Spark Dynamic and Static Partition Overwrite

2022-06-22
From version 2.3.0, Spark provides two modes for overwriting partitions when saving data: DYNAMIC and STATIC. Static mode overwrites all partitions, or only the partitions that match the partition specification in the INSERT statement (for example, PARTITION=20220101). Dynamic mode overwrites only those partitions that have data written into them at runtime. The default mode is STATIC. The benefit of DYNAMIC overwrite is that it can be more efficient, because it doesn't delete all partitions ahead of the write.
Note: this config doesn't affect Hive serde tables, as they are always overwritten with dynamic mode.
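
To make the difference concrete, here is a minimal pure-Python sketch that models the two modes on an in-memory "table". It is not Spark code: the `overwrite` function and the dictionary layout (partition value mapped to a list of rows) are assumptions made for illustration, and the sketch only covers an overwrite with no PARTITION clause.

```python
def overwrite(existing, incoming, mode="static"):
    """Model a partitioned overwrite without a PARTITION clause.

    Both arguments map a partition value (e.g. '2022-01-01') to the
    list of rows stored in that partition. Hypothetical helper.
    """
    mode = mode.lower()
    if mode == "static":
        # Static: every existing partition is dropped first,
        # then only the incoming partitions are written.
        return {part: list(rows) for part, rows in incoming.items()}
    if mode == "dynamic":
        # Dynamic: partitions that receive no data are left alone;
        # partitions that do receive data are replaced.
        result = {part: list(rows) for part, rows in existing.items()}
        for part, rows in incoming.items():
            result[part] = list(rows)
        return result
    raise ValueError(f"unknown mode: {mode!r}")


existing = {"2022-01-01": [1, 2, 3], "2022-02-01": [1]}
incoming = {"2022-01-01": [10, 20, 30]}

static_result = overwrite(existing, incoming, "static")
dynamic_result = overwrite(existing, incoming, "dynamic")
print(static_result)   # only the incoming partition remains
print(dynamic_result)  # the untouched partition is preserved
```

In this model, a static overwrite with data for a single partition loses the other partition, while a dynamic overwrite keeps it.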

How to enable DYNAMIC overwrite

Dynamic overwrite mode can be enabled when saving a DataFrame:

dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)
Alternatively, we can set the Spark SQL configuration for the whole session (the write option above takes precedence over this setting):
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
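
Since the mode can come from two places, it may help to spell out which one wins. The sketch below is a plain-Python model, not Spark's implementation: `effective_overwrite_mode` is a hypothetical helper, and plain dictionaries stand in for the writer options and the session configuration. It assumes the per-write option takes precedence, values are case-insensitive, and STATIC is the fallback.

```python
OVERWRITE_MODE_KEY = "spark.sql.sources.partitionOverwriteMode"


def effective_overwrite_mode(write_options, session_conf):
    """Return 'STATIC' or 'DYNAMIC' for one write (hypothetical helper)."""
    # The per-write option wins when it is present.
    mode = write_options.get("partitionOverwriteMode")
    if mode is None:
        # Otherwise fall back to the session setting, then to STATIC.
        mode = session_conf.get(OVERWRITE_MODE_KEY, "STATIC")
    mode = mode.upper()
    if mode not in ("STATIC", "DYNAMIC"):
        raise ValueError(f"unsupported partition overwrite mode: {mode}")
    return mode


# Session says STATIC, but this particular write asks for dynamic.
print(effective_overwrite_mode(
    {"partitionOverwriteMode": "dynamic"},
    {OVERWRITE_MODE_KEY: "STATIC"},
))
```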

Let's use some examples to understand more. 

Dynamic overwrite example

Let's first create a partitioned dataset in a local folder using the following code snippet. The script creates a DataFrame in memory, repartitions the data by the 'dt' column and writes it to the local file system.

# dynamic-overwrite-creation.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Dynamic Overwrite"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

data = [{
    'dt': '2022-01-01',
    'id': 1
}, {
    'dt': '2022-01-01',
    'id': 2
}, {
    'dt': '2022-01-01',
    'id': 3
}, {
    'dt': '2022-02-01',
    'id': 1
}]

df = spark.createDataFrame(data)
print(df.schema)
df.show()
df.repartition('dt').write.mode('overwrite').partitionBy('dt').format(
    'parquet').save('file:///home/kontext/pyspark-examples/data/parts')

The above script creates one sub-folder per partition. The tree command prints out the following structure:

$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 21:39]  _SUCCESS
├── [Jun 22 21:39]  dt=2022-01-01
│   └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet
└── [Jun 22 21:39]  dt=2022-02-01
    └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet

Now let's derive a new DataFrame and save it using dynamic overwrite.

Overwrite with dynamic mode

The code snippet looks like the following:

# dynamic-overwrite-mode.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Partition Dynamic Overwrite"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

path = 'file:///home/kontext/pyspark-examples/data/parts'
df = spark.read.format('parquet').load(path)
df = df.where("dt='2022-01-01'")
df = df.withColumn('id', df['id'] * 10)
df.show()
df.write.partitionBy('dt').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(path)

The script first creates a Spark session and then reads the Parquet files from the previously saved location. It then filters the DataFrame to include only the records in the dt=2022-01-01 partition, multiplies the values of column 'id' by 10, and saves the result to the same location using dynamic overwrite.

Now let's run the tree command again:

$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 21:39]  _SUCCESS
├── [Jun 22 21:50]  dt=2022-01-01
│   └── [Jun 22 21:50]  part-00000-68af5e31-ccd6-4ecc-991a-4a8f5c056bde.c000.snappy.parquet
└── [Jun 22 21:39]  dt=2022-02-01
    └── [Jun 22 21:39]  part-00000-789a5277-6d1e-448d-b140-38a52e5a1acc.c000.snappy.parquet

From the timestamps, we can clearly see that only the files in the first partition were updated; the second partition is untouched.
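
If the tree command is not available, the same check can be done from Python. The following is a minimal sketch that lists the partition folders and their last-modified times. `list_partitions` is a hypothetical helper, and it assumes the data sits on the local file system with folders named in the `column=value` style shown above.

```python
import os
from datetime import datetime


def list_partitions(root):
    """Return (folder_name, last_modified) pairs for 'key=value' folders."""
    entries = []
    for name in sorted(os.listdir(root)):
        full_path = os.path.join(root, name)
        # Skip marker files such as _SUCCESS; keep only partition folders.
        if os.path.isdir(full_path) and "=" in name:
            modified = datetime.fromtimestamp(os.path.getmtime(full_path))
            entries.append((name, modified))
    return entries


if __name__ == "__main__":
    # Local path used in this article, without the file:// scheme.
    root = "/home/kontext/pyspark-examples/data/parts"
    if os.path.isdir(root):
        for name, modified in list_partitions(root):
            print(f"{modified:%b %d %H:%M}  {name}")
```

A partition whose timestamp did not change between the two runs was not rewritten.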

About static overwrite mode

By default, the partition overwrite mode is STATIC whenever the save mode is overwrite. Thus no additional code is required unless your Spark default configuration is set to DYNAMIC and you want to change it to STATIC.
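
When the session default differs from what a particular write needs, one option is to switch the setting temporarily and restore it afterwards. The sketch below is one possible way to do that, not an official Spark utility: `partition_overwrite_mode` is a hypothetical context manager that relies only on `spark.conf.get` and `spark.conf.set`.

```python
from contextlib import contextmanager

OVERWRITE_MODE_KEY = "spark.sql.sources.partitionOverwriteMode"


@contextmanager
def partition_overwrite_mode(spark, mode):
    """Temporarily set the session's partition overwrite mode."""
    # Remember the current value so that it can be restored later.
    previous = spark.conf.get(OVERWRITE_MODE_KEY, "STATIC")
    spark.conf.set(OVERWRITE_MODE_KEY, mode)
    try:
        yield spark
    finally:
        # Restore the original value even if the write fails.
        spark.conf.set(OVERWRITE_MODE_KEY, previous)


# Usage with an existing SparkSession named `spark` (not created here):
# with partition_overwrite_mode(spark, "STATIC"):
#     df.write.mode("overwrite").partitionBy("dt").parquet(path)
```

For a single write, passing the partitionOverwriteMode option directly to the writer is usually simpler; the context manager is mainly useful when several writes in a row need the same non-default mode.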

If we change the code in the previous example to use STATIC overwrite mode, the second partition (dt=2022-02-01) will be deleted.

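path = 'file:///home/kontext/pyspark-examples/data/parts'
df = spark.read.format('parquet').load(path)
df = df.where("dt='2022-01-01'")
df = df.withColumn('id', df['id'] * 10)
# Cache and materialize the rows first, because the path we read from
# is also the path we are about to overwrite
df.cache()
df.show()
# Explicitly request static mode (this is also the default)
df.repartition('dt').write.partitionBy('dt').mode('overwrite').format(
    'parquet').option("partitionOverwriteMode", "static").save(path)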

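This happens because static mode deletes all existing partitions before writing, and the filtered DataFrame no longer contains any rows for dt=2022-02-01, so that partition is never written back.

We can verify this using the tree command again: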
$ tree -D /home/kontext/pyspark-examples/data/parts
/home/kontext/pyspark-examples/data/parts
├── [Jun 22 22:17]  _SUCCESS
└── [Jun 22 22:17]  dt=2022-01-01
    └── [Jun 22 22:17]  part-00000-22d4538a-9213-45a4-821a-339d45e57004.c000.snappy.parquet

1 directory, 2 files
Hopefully by now you understand the differences between these two modes. Feel free to leave a comment if you have any questions.
Last modified by Kontext.