
PySpark partitionBy with Examples


The PySpark DataFrameWriter.partitionBy method partitions the data set by the given columns on the file system. The resulting data layout is similar to Hive's partitioned tables, and Spark's partition pruning can take advantage of this layout to improve performance when queries filter on partition columns.

Syntax

DataFrameWriter.partitionBy(*cols: Union[str, List[str]]) 
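
cols can be a single column name or several. Here is a minimal sketch of partitioning by more than one column; it assumes the Spark session created in the example below, and the year/month columns are illustrative, not part of the example data set.

from pyspark.sql import functions as F

# Illustrative DataFrame with two partition columns.
sales = spark.range(1, 1000) \
    .withColumn('year', F.lit(2022)) \
    .withColumn('month', (F.col('id') % 12) + 1)

# Folders are laid out as .../year=2022/month=1, .../year=2022/month=2, ...
sales.write.mode('overwrite').partitionBy('year', 'month') \
    .parquet('/data/partitionBy_multi')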

One thing to notice is that this function is very different from the Spark DataFrame.repartition function. repartition reshuffles the underlying RDD in memory, usually to change parallelism; it does not control the folder layout on the file system. Refer to Spark repartition Function Internals for more details.
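
The following sketch illustrates the difference, reusing the df from the example below:

# repartition: shuffles the data into 10 in-memory partitions; it changes
# parallelism but does not create any folder structure on disk.
df_repartitioned = df.repartition(10)

# partitionBy: does not shuffle; it writes one sub-folder per distinct
# value of id_bucket on the file system.
df.write.mode('overwrite').partitionBy('id_bucket').parquet('/data/partitionBy')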

Example

Let's create a code snippet that uses the partitionBy method. The following snippet provides an example of writing data into partitioned files.

from pyspark.sql import SparkSession

appName = "PySpark partitionBy Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = spark.range(1, 10 ** 5)
df = df.withColumn('id_bucket', df.id % 5)
df.write.format('parquet').mode('overwrite').partitionBy(
    'id_bucket').save('/data/partitionBy')

The script creates a DataFrame in memory and then derives a new column id_bucket from the id column. Every record in the DataFrame lands in one of 5 buckets (0, 1, 2, 3 and 4). The DataFrame is then saved into HDFS with data partitioned by the id_bucket column.
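
To verify the bucket distribution before writing, you could optionally add a quick aggregation (not part of the original script):

# Optional check: each of the 5 buckets should contain roughly 20,000 records.
df.groupBy('id_bucket').count().orderBy('id_bucket').show()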

Output

In the file system (HDFS in this case), there are 5 partition folders created based on the id_bucket column. The file layout is very similar to a Hive partitioned table.

$ hadoop fs -ls -R /data/partitionBy
-rw-r--r--   1 kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/_SUCCESS
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=0
-rw-r--r--   1 kontext supergroup      80892 2022-08-25 22:39 /data/partitionBy/id_bucket=0/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=1
-rw-r--r--   1 kontext supergroup      80896 2022-08-25 22:39 /data/partitionBy/id_bucket=1/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=2
-rw-r--r--   1 kontext supergroup      80888 2022-08-25 22:39 /data/partitionBy/id_bucket=2/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=3
-rw-r--r--   1 kontext supergroup      80898 2022-08-25 22:39 /data/partitionBy/id_bucket=3/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=4
-rw-r--r--   1 kontext supergroup      80895 2022-08-25 22:39 /data/partitionBy/id_bucket=4/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
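
In this run every partition folder contains a single Parquet file because the whole DataFrame was written by one task (master is local, so the range has a single partition). In general, each write task produces its own file in every folder it has data for, so a DataFrame with many in-memory partitions can leave many small files per folder. One common approach (not part of the original script) to keep one file per folder is to repartition by the partition column before writing:

# Shuffle so that all rows with the same id_bucket end up in the same task,
# which results in one output file per partition folder.
df.repartition('id_bucket') \
    .write.format('parquet').mode('overwrite') \
    .partitionBy('id_bucket').save('/data/partitionBy')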

Partition pruning in Spark

Now let's create another script to read from the above location with a filter on the id_bucket column.

from pyspark.sql import SparkSession

appName = "PySpark partitionBy Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = spark.read.format('parquet').load('/data/partitionBy')
df = df.where('id_bucket=1')
df.show()
df.explain(extended=True)

The physical plan is printed out as follows:

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#0L,id_bucket#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/data/partitionBy], PartitionFilters: [isnotnull(id_bucket#1), (id_bucket#1 = 1)], PushedFilters: [], ReadSchema: struct<id:bigint>

As we can see, a partition filter (PartitionFilters: [isnotnull(id_bucket#1), (id_bucket#1 = 1)]) is applied instead of scanning all 5 parquet files. This is also reflected in the graphical plan in the Spark web UI, as the following screenshot shows:

(Screenshot: the graphical plan in the Spark web UI, showing the partition filter.)

This generally leads to better performance in Spark SQL queries.
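
For comparison, a filter on a non-partition column such as id cannot be pruned at the folder level; the plan should then show an empty PartitionFilters list, with the predicate pushed down to the Parquet reader under PushedFilters instead. A small sketch:

# Filtering on a non-partition column: every id_bucket folder is still scanned.
df = spark.read.format('parquet').load('/data/partitionBy')
df.where('id = 12345').explain()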
