Spark DEBUG: It is possible the underlying files have been updated.

Issue context

When reading from and writing to the same location or table simultaneously, Spark throws the following error:

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.

Reproduce the error

Run the following script to reproduce the error:

from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Changes"
master = "local"
path = 'file:///home/kontext/pyspark-examples/data/part-change'
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

data = [{
    'dt': '2022-01-01',
    'id': 1
}, {
    'dt': '2022-01-01',
    'id': 2
}, {
    'dt': '2022-01-01',
    'id': 3
}, {
    'dt': '2022-02-01',
    'id': 1
}]

df = spark.createDataFrame(data)
print(df.schema)
df.show()
df.repartition('dt').write.mode('overwrite').partitionBy('dt').format(
    'parquet').save(path)

df2 = spark.read.format('parquet').load(path)
df2 = df2.where("dt='2022-01-01'")
df2.show()
df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').save(path)

The above script does the following:

  • Creates a Spark DataFrame.
  • Saves it into a local folder, partitioned by column 'dt'.
  • Reads the folder back into memory as df2.
  • Filters df2 to keep only partition dt=2022-01-01.
  • Saves df2 to the same location.

Resolutions or workarounds

There are different approaches to resolve this issue depending on the scenario. I tried the suggested refreshByPath (or refreshTable if the source is a table), but it doesn't resolve the problem.

Save the derived DF to a different location

One workaround is to save the DataFrame to a different location, then delete the old path and rename the new location to the previous name. This is not very efficient.

Cache the derived DF

We can cache the derived DF and then the code should work.

df2 = spark.read.format('parquet').load(path)
df2 = df2.where("dt='2022-01-01'")
df2.cache()
df2.show()
df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').save(path)
*Note: this approach will delete all other partitions, keeping only the ones that meet the filter condition.

Change to dynamic overwrite

If the purpose is to overwrite only the partitions present in df2, consider using dynamic partition overwrite instead. Refer to Spark Dynamic and Static Partition Overwrite for more details.

Do you have other approaches? Feel free to add a comment.

Last modified by Kontext 7 days ago.