Spark Dynamic and Static Partition Overwrite

Kontext, 2022-06-22
Starting with version 2.3.0, Spark provides two modes for overwriting partitions when saving data: DYNAMIC and STATIC. Static mode overwrites all partitions, or only the partition specified in the INSERT statement (for example, PARTITION=20220101). Dynamic mode overwrites only those partitions that have data written into them at runtime. The default mode is STATIC. The benefit of DYNAMIC overwrite is that it can be more efficient because it doesn't delete all partitions ahead of the write.
Note: This config doesn't affect Hive serde tables.
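
To make the difference concrete, here is a minimal pure-Python sketch of the rule described above. It is a simplified model, not Spark's implementation: the function names `partitions_removed` and `partitions_after` are made up for illustration, and the static case assumes no PARTITION clause is given.

```python
def partitions_removed(existing, incoming, mode="static"):
    """Return the existing partitions that an overwrite clears before writing.

    existing: partition values already on disk
    incoming: partition values present in the data being written
    mode: "static" or "dynamic" (case-insensitive)
    """
    existing, incoming = set(existing), set(incoming)
    mode = mode.lower()
    if mode == "static":
        # Static mode (no PARTITION clause): every existing partition is cleared.
        return existing
    if mode == "dynamic":
        # Dynamic mode: only partitions that receive new data are replaced.
        return existing & incoming
    raise ValueError(f"unknown mode: {mode!r}")


def partitions_after(existing, incoming, mode="static"):
    """Return the partitions present once the overwrite has finished."""
    kept = set(existing) - partitions_removed(existing, incoming, mode)
    return kept | set(incoming)


existing = {"2022-01-01", "2022-02-01"}
incoming = {"2022-01-01"}
print(sorted(partitions_after(existing, incoming, "static")))
print(sorted(partitions_after(existing, incoming, "dynamic")))
```

With one incoming partition and two existing ones, the static result contains only the incoming partition, while the dynamic result keeps both.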

How to enable DYNAMIC overwrite

Dynamic overwrite mode can be enabled when saving DataFrame:

dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)

Alternatively, we can set the Spark SQL configuration for the whole session:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
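
When both settings are present, the Spark configuration documentation states that the write option takes precedence over the session configuration. The sketch below models that lookup order in plain Python. It is an illustration only: `effective_overwrite_mode` is a hypothetical helper, and the two dictionaries stand in for the writer options and the session configuration.

```python
SESSION_KEY = "spark.sql.sources.partitionOverwriteMode"


def effective_overwrite_mode(write_options, session_conf):
    """Resolve which mode a single write would use in this simplified model."""
    # Writer option keys are treated as case-insensitive in this model.
    options = {key.lower(): value for key, value in write_options.items()}
    mode = options.get("partitionoverwritemode")
    if mode is None:
        # Fall back to the session setting, then to the STATIC default.
        mode = session_conf.get(SESSION_KEY, "STATIC")
    mode = mode.upper()
    if mode not in ("STATIC", "DYNAMIC"):
        raise ValueError(f"unsupported partition overwrite mode: {mode}")
    return mode


print(effective_overwrite_mode({}, {}))
print(effective_overwrite_mode({}, {SESSION_KEY: "DYNAMIC"}))
print(effective_overwrite_mode({"partitionOverwriteMode": "static"},
                               {SESSION_KEY: "DYNAMIC"}))
```

The three calls cover the default, a session-level setting, and a per-write option that overrides the session value.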

Let's use some examples to understand more. 

Dynamic overwrite example

Let's first create a partitioned dataset in a local folder using the following code snippet. The script creates a DataFrame in memory, repartitions the data by the 'dt' column, and writes it to the local file system.

# dynamic-overwrite-creation.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Dynamic Overwrite"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

data = [{
    'dt': '2022-01-01',
    'id': 1
}, {
    'dt': '2022-01-01',
    'id': 2
}, {
    'dt': '2022-01-01',
    'id': 3
}, {
    'dt': '2022-02-01',
    'id': 1
}]

df = spark.createDataFrame(data)
print(df.schema)
df.show()
df.repartition('dt').write.mode('overwrite').partitionBy('dt').format(
    'parquet').save('file:///home/kontext/pyspark-examples/data/parts')

The above script creates the following folder structure, as printed by the tree command:

$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 21:39]  _SUCCESS
├── [Jun 22 21:39]  dt=2022-01-01
│   └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet
└── [Jun 22 21:39]  dt=2022-02-01
    └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet
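
If the tree command is not available, the same `dt=<value>` folder layout can be inspected with a few lines of standard-library Python. This is a small sketch: `list_partitions` is a hypothetical helper, and the demo builds a throwaway directory that mimics the layout above rather than reading the real output folder. Note that it expects a plain local path, without the `file://` scheme.

```python
import tempfile
from pathlib import Path


def list_partitions(root, column):
    """Return the sorted values found in `column=value` subfolders of root."""
    prefix = f"{column}="
    return sorted(
        entry.name[len(prefix):]
        for entry in Path(root).iterdir()
        if entry.is_dir() and entry.name.startswith(prefix)
    )


# Build a temporary folder that mimics the structure shown above.
with tempfile.TemporaryDirectory() as tmp:
    for value in ("2022-01-01", "2022-02-01"):
        part_dir = Path(tmp) / f"dt={value}"
        part_dir.mkdir()
        (part_dir / "part-00000.snappy.parquet").touch()
    (Path(tmp) / "_SUCCESS").touch()  # marker file, ignored by the helper
    demo_partitions = list_partitions(tmp, "dt")

print(demo_partitions)
```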

Now let's derive a new DataFrame and save it using dynamic overwrite.

Overwrite with dynamic mode

The code snippet looks like the following:

# dynamic-overwrite-mode.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Dynamic Overwrite"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

path = 'file:///home/kontext/pyspark-examples/data/parts'
df = spark.read.format('parquet').load(path)
df = df.where("dt='2022-01-01'")
df = df.withColumn('id', df['id'] * 10)
df.show()
df.write.partitionBy('dt').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(path)

The script first creates a Spark session and then reads the Parquet files from the previously saved location. It then filters the DataFrame to include only records in the dt=2022-01-01 partition and multiplies the values of column 'id' by 10 before saving to the same location using dynamic overwrite.

Now let's run the tree command again:

$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 21:39]  _SUCCESS
├── [Jun 22 21:50]  dt=2022-01-01
│   └── [Jun 22 21:50]  part-00000-68af5e31-ccd6-4ecc-991a-4a8f5c056bde.c000.snappy.parquet
└── [Jun 22 21:39]  dt=2022-02-01
    └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet

From the timestamps, we can clearly see that only the files in the first partition were updated; the second partition is untouched.
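
Timestamps are one way to check this. Another is to compare file names, since the listings above show a different identifier in the part file name after the rewrite. The sketch below takes a snapshot of each partition folder before and after a write and classifies the partitions. The helper names `snapshot` and `diff_partitions` are made up for illustration, and the demo simulates the rewrite in a temporary folder with placeholder file names instead of running Spark.

```python
import tempfile
from pathlib import Path


def snapshot(root):
    """Map each partition folder name to the set of file names inside it."""
    return {
        entry.name: {f.name for f in entry.iterdir() if f.is_file()}
        for entry in Path(root).iterdir()
        if entry.is_dir()
    }


def diff_partitions(before, after):
    """Classify partitions as removed, added, rewritten or untouched."""
    result = {"removed": [], "added": [], "rewritten": [], "untouched": []}
    for name in sorted(before.keys() | after.keys()):
        if name not in after:
            result["removed"].append(name)
        elif name not in before:
            result["added"].append(name)
        elif before[name] != after[name]:
            result["rewritten"].append(name)
        else:
            result["untouched"].append(name)
    return result


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for value in ("2022-01-01", "2022-02-01"):
        (root / f"dt={value}").mkdir()
        (root / f"dt={value}" / "part-00000-aaaa.parquet").touch()
    before = snapshot(root)
    # Simulate the dynamic overwrite: replace the file in one partition only.
    (root / "dt=2022-01-01" / "part-00000-aaaa.parquet").unlink()
    (root / "dt=2022-01-01" / "part-00000-bbbb.parquet").touch()
    after = snapshot(root)
    demo_diff = diff_partitions(before, after)

print(demo_diff)
```

In the simulated run, `dt=2022-01-01` is reported as rewritten and `dt=2022-02-01` as untouched. A static overwrite of the same data would instead report `dt=2022-02-01` as removed.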

About static overwrite mode

By default, the mode is STATIC when overwrite mode is specified. Thus no additional code is required unless your Spark default configuration is set to DYNAMIC and you want to change it to STATIC.

If we change the code in the previous example to use STATIC overwrite mode, then the second partition (dt=2022-02-01) will be deleted.

path = 'file:///home/kontext/pyspark-examples/data/parts'
df = spark.read.format('parquet').load(path)
df = df.where("dt='2022-01-01'")
df = df.withColumn('id', df['id'] * 10)
df.cache()
df.show()
df.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').option("partitionOverwriteMode", "static").save(path)

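This happens because static mode clears all existing partitions before writing, and the second partition (dt=2022-02-01) was filtered out of the DataFrame at runtime, so nothing is written back for it.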

We can verify this using the tree command again:

$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 22:17]  _SUCCESS
└── [Jun 22 22:17]  dt=2022-01-01
    └── [Jun 22 22:17]  part-00000-22d4538a-9213-45a4-821a-339d45e57004.c000.snappy.parquet

1 directory, 2 files
Hopefully by now you understand the differences between these two modes. Feel free to leave a comment if you have any questions.

Comments

Jatin Kakani, 3 years ago

I tried this; however, the data is getting completely overwritten, i.e. even the partitions that are not changed are removed. I am running PySpark in AWS Glue with Spark version 3.1. Below is the code snippet:

[Image: code snippet screenshot, 2022112050127-code.png]

Kontext, 2 years ago

Sorry for the late reply. Can you try adding partitionOverwriteMode to your write options:

df.write.partitionBy('dt').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(path)

The one you used needs to be specified separately before calling df.write:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
