Spark DEBUG: It is possible the underlying files have been updated.
Issue context
When reading and writing into the same location or table simultaneously, Spark throws out the following error:
It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
Reproduce the error
Run the following script to reproduce the error:
from pyspark.sql import SparkSession appName = "PySpark Example - Partition Changes" master = "local" path = 'file:///home/kontext/pyspark-examples/data/part-change' # # Create Spark session spark = SparkSession.builder \ .appName(appName) \ .master(master) \ .getOrCreate() data = [{ 'dt': '2022-01-01', 'id': 1 }, { 'dt': '2022-01-01', 'id': 2 }, { 'dt': '2022-01-01', 'id': 3 }, { 'dt': '2022-02-01', 'id': 1 }] df = spark.createDataFrame(data) print(df.schema) df.show() df.repartition('dt').write.mode('overwrite').partitionBy('dt').format( 'parquet').save(path) df2 = spark.read.format('parquet').load(path) df2 = df2.where("dt='2022-01-01'") df2.show() df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format( 'parquet').save(path)
The above script does the following:
- Create a Spark DataFrame
- Save it into local folder with partitions by column '
dt
' - Read the folder into memory as
df2
. - Filter to only keep partition
dt=2022-01-01
- Save it to the same location
Resolutions or workarounds
There are different approaches to resolve this issue depends on the scenario. I tried the suggested refreshByPath
(or refreshTable
if source is table), it doesn't work.
Save the derived DF to a different location
One workaround is to save the DF to a different location and then delete the old path and rename the new location back to the previous name. This is not very efficient.
Cache the derived DF
We can cache the derived DF and then the code should work.
df2 = spark.read.format('parquet').load(path) df2 = df2.where("dt='2022-01-01'") df2.cache() df2.show() df2.repartition('dt').write.partitionBy('dt').mode('overwrite').format( 'parquet').save(path)
Change to dynamic overwrite
If the purpose is to only overwrite the specified partitions in df2, consider changing to use dynamic partition overwrite. Refer to Spark Dynamic and Static Partition Overwrite for more details.
Do you have other approaches? Feel free to add a comment.