Spark Dynamic and Static Partition Overwrite
How to enable DYNAMIC overwrite
Dynamic overwrite mode can be enabled per write when saving a DataFrame:

dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)

Alternatively, it can be enabled for the whole Spark session via configuration (this must be set before df.write is called):

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
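The session-wide setting can also be supplied when constructing the Spark session. Below is a minimal sketch (the app name and master values are placeholders):

# Sketch: enable dynamic partition overwrite for the whole session at
# creation time instead of setting it per write.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("partition-overwrite-config") \
    .master("local") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .getOrCreate()

# Every subsequent df.write.mode('overwrite').partitionBy(...).save(...)
# in this session now only replaces the partitions present in the DataFrame.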
Let's walk through some examples to understand how it works.
Dynamic overwrite example
Let's first create a partitioned dataset in a local folder using the following code snippet. The script creates a DataFrame in memory, repartitions the data by the 'dt' column, and writes it into the local file system.
# dynamic-overwrite-creation.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Dynamic Overwrite"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

data = [{'dt': '2022-01-01', 'id': 1},
        {'dt': '2022-01-01', 'id': 2},
        {'dt': '2022-01-01', 'id': 3},
        {'dt': '2022-02-01', 'id': 1}]

df = spark.createDataFrame(data)
print(df.schema)
df.show()

df.repartition('dt').write.mode('overwrite').partitionBy('dt').format(
    'parquet').save('file:///home/kontext/pyspark-examples/data/parts')
The tree command prints out the following structure:
$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 21:39]  _SUCCESS
├── [Jun 22 21:39]  dt=2022-01-01
│   └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet
└── [Jun 22 21:39]  dt=2022-02-01
    └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet
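As a quick sanity check (a sketch, assuming the Spark session from the script above is still active), we can read the folder back and count rows per partition. Based on the input data, we expect 3 rows for dt=2022-01-01 and 1 row for dt=2022-02-01:

# Sketch: read the partitioned folder back; Spark reconstructs the 'dt'
# column from the directory names (dt=2022-01-01, dt=2022-02-01).
check_df = spark.read.format('parquet').load(
    'file:///home/kontext/pyspark-examples/data/parts')
check_df.groupBy('dt').count().show()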
Now let's derive a new DataFrame and save it using dynamic overwrite.
Overwrite with dynamic mode
The code snippet looks like the following:
# dynamic-overwrite-mode.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Dynamic Overwrite"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

path = 'file:///home/kontext/pyspark-examples/data/parts'
df = spark.read.format('parquet').load(path)
df = df.where("dt='2022-01-01'")
df = df.withColumn('id', df['id'] * 10)
df.show()

df.write.partitionBy('dt').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(path)
The script first creates a Spark session and reads the parquet files from the previously saved location. It then filters the DataFrame to only include records for the dt=2022-01-01 partition, multiplies the values of column 'id' by 10, and saves the result back to the same location using dynamic overwrite.
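To confirm the behavior, here is a small verification sketch; given the input data, the ids in dt=2022-01-01 should now be 10, 20 and 30, while dt=2022-02-01 should still contain id 1:

# Sketch: verify that only the first partition's values changed.
result = spark.read.format('parquet').load(path)
result.orderBy('dt', 'id').show()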
Now let's run the tree command again:
$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 21:39]  _SUCCESS
├── [Jun 22 21:50]  dt=2022-01-01
│   └── [Jun 22 21:50]  part-00000-68af5e31-ccd6-4ecc-991a-4a8f5c056bde.c000.snappy.parquet
└── [Jun 22 21:39]  dt=2022-02-01
    └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet
From the timestamps, we can clearly see that only the files in the first partition were updated, while the second partition remains untouched.
About static overwrite mode
By default, the mode is STATIC when overwrite mode is specified. Thus no additional code is required unless your Spark default configuration is set to DYNAMIC and you want to change it back to STATIC.
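In other words, assuming no session-level override, the following two writes behave identically (a sketch reusing df and path from the examples above):

# Static overwrite is the default, so these two statements are equivalent.
df.write.partitionBy('dt').mode('overwrite').format('parquet').save(path)

df.write.partitionBy('dt').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "static").save(path)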
If we change the code in the previous example to use STATIC overwrite mode, the dt=2022-02-01 partition will be deleted:
path = 'file:///home/kontext/pyspark-examples/data/parts'
df = spark.read.format('parquet').load(path)
df = df.where("dt='2022-01-01'")
df = df.withColumn('id', df['id'] * 10)
# Cache the DataFrame so the data can be written back after the target
# folder is cleared; df.show() triggers an action to materialize it.
df.cache()
df.show()

df.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').option("partitionOverwriteMode", "static").save(path)
This happens because static overwrite deletes all existing partitions under the target path before writing, and the dt=2022-02-01 partition is not present in the DataFrame at runtime.
Running the tree command again:

$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 22:17]  _SUCCESS
└── [Jun 22 22:17]  dt=2022-01-01
    └── [Jun 22 22:17]  part-00000-22d4538a-9213-45a4-821a-339d45e57004.c000.snappy.parquet

1 directory, 2 files
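If you need to keep the untouched partitions while staying in STATIC mode, one approach is to write the complete dataset back rather than only the modified slice. The following is a sketch using the same path as above; as in the example, caching plus an action materializes the rows before the target folder is cleared:

# Sketch: preserve untouched partitions under STATIC overwrite by writing
# the full dataset back, not just the updated slice.
full_df = spark.read.format('parquet').load(path)
updated = full_df.where("dt = '2022-01-01'").withColumn('id', full_df['id'] * 10)
untouched = full_df.where("dt <> '2022-01-01'")
out = updated.unionByName(untouched).cache()
out.count()  # force materialization before the source path is overwritten
out.write.partitionBy('dt').mode('overwrite').format('parquet').save(path)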
Comments

Question: I tried this, however the data is getting completely overwritten, i.e. even the partitions that are not changed are removed. I am running PySpark in AWS Glue with Spark version 3.1.

Reply: Sorry for the late reply. Can you try adding partitionOverwriteMode to your write options:

df.write.partitionBy('dt').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(path)

The one you used needs to be specified separately, before calling df.write:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")