PySpark partitionBy with Examples
The PySpark DataFrameWriter.partitionBy method can be used to partition the data set by the given columns on the file system. The data layout in the file system is similar to that of Hive's partitioned tables. Spark can take advantage of this layout through partition pruning, which improves performance when queries filter on the partition columns.
Syntax
DataFrameWriter.partitionBy(*cols: Union[str, List[str]])
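More than one column can be passed in, which produces nested sub-folders in the output. The following is a minimal sketch (the df variable, the year and month columns, and the output path are hypothetical and not part of the example below):

# Partition by two hypothetical columns; output folders look like
# .../year=2022/month=8/part-....parquet
df.write.format('parquet') \
    .mode('overwrite') \
    .partitionBy('year', 'month') \
    .save('/data/partitionBy-multi')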
One thing to notice is that this function is very different from the DataFrame.repartition function. repartition redistributes the data across partitions in memory, usually to change parallelism, whereas partitionBy only controls how the output is laid out on the file system. A short sketch of the difference follows. Refer to Spark repartition Function Internals for more details.
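As a rough sketch of the difference (assuming df already has an id_bucket column, as in the example below, and using illustrative output paths):

# repartition: shuffles the data into 8 in-memory partitions, which affects
# parallelism and the number of output files, but creates no sub-folders
df.repartition(8).write.format('parquet').save('/data/repartitioned')

# partitionBy: keeps the in-memory partitioning as-is, but writes the rows for
# each distinct id_bucket value into their own sub-folder (id_bucket=0, id_bucket=1, ...)
df.write.format('parquet').partitionBy('id_bucket').save('/data/partitionBy')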
Example
Let's create a code snippet that uses the partitionBy method. The following snippet provides an example of writing data into partitioned files.
from pyspark.sql import SparkSession

appName = "PySpark partitionBy Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = spark.range(1, 10 ** 5)
df = df.withColumn('id_bucket', df.id % 5)
df.write.format('parquet').mode('overwrite').partitionBy(
    'id_bucket').save('/data/partitionBy')
The script creates a DataFrame in memory and then derives a new column id_bucket from the id column. All records in the DataFrame land in one of 5 buckets (0, 1, 2, 3 and 4). The DataFrame is then saved into HDFS with the data partitioned by the id_bucket column.
Output
In the file system (HDFS in this case), 5 partition folders are created based on the id_bucket column. The file layout is very similar to that of a Hive partitioned table.
$ hadoop fs -ls -R /data/partitionBy
-rw-r--r--   1 kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/_SUCCESS
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=0
-rw-r--r--   1 kontext supergroup      80892 2022-08-25 22:39 /data/partitionBy/id_bucket=0/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=1
-rw-r--r--   1 kontext supergroup      80896 2022-08-25 22:39 /data/partitionBy/id_bucket=1/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=2
-rw-r--r--   1 kontext supergroup      80888 2022-08-25 22:39 /data/partitionBy/id_bucket=2/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=3
-rw-r--r--   1 kontext supergroup      80898 2022-08-25 22:39 /data/partitionBy/id_bucket=3/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
drwxr-xr-x   - kontext supergroup          0 2022-08-25 22:39 /data/partitionBy/id_bucket=4
-rw-r--r--   1 kontext supergroup      80895 2022-08-25 22:39 /data/partitionBy/id_bucket=4/part-00000-7c889b61-0c5b-4027-8aa4-158a80a8502f.c000.snappy.parquet
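A single partition folder can also be loaded directly. One behavior worth noting (a hedged sketch, not part of the original example): the partition column is not stored inside the Parquet files themselves, so when only one partition folder is read, id_bucket is absent from the schema unless the basePath option is supplied so that Spark can still discover the partition column from the folder name:

# Reading the parent folder: id_bucket is inferred from the folder names
df_all = spark.read.format('parquet').load('/data/partitionBy')

# Reading one partition folder directly: supply basePath so that id_bucket
# is still recognized as a partition column
df_bucket1 = spark.read.format('parquet') \
    .option('basePath', '/data/partitionBy') \
    .load('/data/partitionBy/id_bucket=1')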
Partition pruning in Spark
Now let's create another script to read from the above location with a filter on the id_bucket column.
from pyspark.sql import SparkSession

appName = "PySpark partitionBy Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = spark.read.format('parquet').load('/data/partitionBy')
df = df.where('id_bucket=1')
df.show()
df.explain(extended=True)
The printed physical plan looks like the following:
== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [id#0L,id_bucket#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/data/partitionBy], PartitionFilters: [isnotnull(id_bucket#1), (id_bucket#1 = 1)], PushedFilters: [], ReadSchema: struct<id:bigint>
As we can see, a partition filter is applied so that only the matching partition folder is scanned instead of all 5 Parquet files. This is also reflected in the graphical query plan in the Spark web UI.
This generally leads to better performance in Spark SQL queries.
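To see the contrast, we can filter on a non-partition column instead. The following is a hedged sketch (the filter value is arbitrary): when the predicate is on id rather than id_bucket, it shows up under PushedFilters/DataFilters instead of PartitionFilters, so all 5 partition folders still have to be scanned:

df = spark.read.format('parquet').load('/data/partitionBy')
# Filter on a non-partition column: no partition pruning happens here
df.where('id = 12345').explain(extended=True)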