Spark DEBUG: It is possible the underlying files have been updated.

Kontext · 2022-06-22
Issue context

When a job reads from and writes to the same location or table, Spark throws the following error:

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.

Reproduce the error

Run the following script to reproduce the error:

from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Changes"
master = "local"
path = 'file:///home/kontext/pyspark-examples/data/part-change'
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

data = [{
    'dt': '2022-01-01',
    'id': 1
}, {
    'dt': '2022-01-01',
    'id': 2
}, {
    'dt': '2022-01-01',
    'id': 3
}, {
    'dt': '2022-02-01',
    'id': 1
}]

df = spark.createDataFrame(data)
print(df.schema)
df.show()
# Write the DataFrame to the path, partitioned by column 'dt'
df.repartition('dt').write.mode('overwrite').partitionBy('dt').format(
    'parquet').save(path)

# Read the folder back, keep only partition dt=2022-01-01, and overwrite
# the same location. This second write triggers the error.
df2 = spark.read.format('parquet').load(path)
df2 = df2.where("dt='2022-01-01'")
df2.show()
df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').save(path)

The above script does the following:

  • Creates a Spark DataFrame.
  • Saves it to a local folder, partitioned by column 'dt'.
  • Reads the folder back into memory as df2.
  • Filters df2 to keep only partition dt=2022-01-01.
  • Saves df2 back to the same location.

Resolutions or workarounds

There are different approaches to resolve this issue, depending on the scenario. I tried the commonly suggested refreshByPath (or refreshTable if the source is a table), but it did not work in this case.
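For reference, the refresh I tried looks like the following sketch. refreshByPath and refreshTable are standard methods on spark.catalog; they refresh Spark's cached metadata for a path or table, but did not prevent the error in this scenario:

# Refresh Spark's cached file listing for the path.
# This did not resolve the error here.
spark.catalog.refreshByPath(path)
# If the source were a metastore table instead of a path:
# spark.catalog.refreshTable('my_database.my_table')  # hypothetical table name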

Save the derived DF to a different location

One workaround is to save df2 to a different location, then delete the old path and rename the new location back to the original name, as sketched below. This is not very efficient.
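A minimal sketch of this workaround for the local-filesystem path used above. tmp_path is an illustrative name; for HDFS or cloud storage you would use the Hadoop FileSystem API or storage-specific tooling instead of shutil:

import shutil

# Illustrative local paths; this matches the 'path' used in the script above.
local_path = '/home/kontext/pyspark-examples/data/part-change'
tmp_path = local_path + '_tmp'

# 1. Write the derived DataFrame to a temporary location.
df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').save('file://' + tmp_path)

# 2. Remove the old folder and rename the temporary one back.
shutil.rmtree(local_path)
shutil.move(tmp_path, local_path)

This avoids reading and writing the same files at the same time, but it doubles the write I/O and the delete-and-rename step is not atomic.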

Cache the derived DF

We can cache the derived DataFrame before writing it back, and then the code works:

df2 = spark.read.format('parquet').load(path)
df2 = df2.where("dt='2022-01-01'")
df2.cache()
df2.show()
df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').save(path)
Note: this approach still deletes all other partitions; only the partitions that meet the filter condition remain.

Change to dynamic overwrite

If the purpose is to overwrite only the partitions present in df2, consider changing to dynamic partition overwrite, as sketched below. Refer to Spark Dynamic and Static Partition Overwrite for more details.
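A minimal sketch, assuming the standard Spark SQL setting spark.sql.sources.partitionOverwriteMode. With it set to dynamic, an overwrite write only replaces the partitions present in the output DataFrame:

# Only partitions present in the output (here dt=2022-01-01) are replaced;
# other partitions under the path are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df2 = spark.read.format('parquet').load(path)
df2 = df2.where("dt='2022-01-01'")
df2.cache()  # still caching, since the job reads from and writes to the same path
df2.show()
df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').save(path)

In newer Spark versions, the same mode can also be set per write via .option('partitionOverwriteMode', 'dynamic') on the DataFrameWriter.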

Do you have other approaches? Feel free to add a comment.
