Spark Bucketing and Bucket Pruning Explained
Spark provides the bucketBy API to split a dataset into smaller chunks (buckets). The Murmur3 hash function is used to calculate the bucket number based on the specified bucket columns. Buckets differ from partitions in that the bucket columns are still stored in the data files, while partition column values are usually stored as part of the file system paths. The main purpose of bucketing is to avoid data shuffling when performing joins. With less data shuffling, fewer stages are required for a job, so performance is usually better. Spark also supports bucket pruning (similar to partition pruning).
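As a rough illustration of how rows are assigned to buckets, the snippet below recomputes the bucket number using Spark's built-in hash function (which implements Murmur3) together with the SQL pmod function. This is a sketch for intuition only; the column name id and the bucket count of 100 are assumptions chosen to match the examples later in this article, not part of any public bucketing API.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").appName("BucketIdSketch").getOrCreate()

num_buckets = 100  # assumed bucket count, matching the examples below
df = spark.range(1, 10 ** 6)

# Spark's bucket number is effectively pmod(murmur3_hash(bucket columns), num_buckets).
# The built-in hash() SQL function implements Murmur3; pmod keeps the result non-negative.
df_with_bucket = df.withColumn(
    "bucket_id", F.expr(f"pmod(hash(id), {num_buckets})"))
df_with_bucket.show(5)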
Let's jump into the details with some examples.
Spark join without buckets
Let's first look at an example of an INNER JOIN of two non-bucketed tables in Spark SQL. The following is the code snippet:
from pyspark.sql import SparkSession

appName = "PySpark Non-Bucketing Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df1 = spark.range(1, 10 ** 6)
df1.write.format('parquet').mode('overwrite').saveAsTable(
    'test_db.spark_no_bucket_table1')
df2 = spark.range(1, 10 ** 5)
df2.write.format('parquet').mode('overwrite').saveAsTable(
    'test_db.spark_no_bucket_table2')

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql("""
select * from test_db.spark_no_bucket_table1 t1
inner join test_db.spark_no_bucket_table2 t2
on t1.id=t2.id
""")
df.show()
df.explain(extended=True)
The script creates two DataFrame objects and then saves them as tables into the Hive database test_db. The two tables are then joined together via Spark SQL.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) is used to turn off broadcast joins so that we can compare the differences between bucketed and non-bucketed tables. Otherwise, you may find this text in the execution plan: Bucketed: false (disabled by query planner). The text version of the physical plan looks like the following:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#6L], [id#7L], Inner
   :- Sort [id#6L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#6L, 200), ENSURE_REQUIREMENTS, [id=#180]
   :     +- Filter isnotnull(id#6L)
   :        +- FileScan parquet test_db.spark_no_bucket_table1[id#6L] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_no_bucket_t..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   +- Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, [id=#181]
         +- Filter isnotnull(id#7L)
            +- FileScan parquet test_db.spark_no_bucket_table2[id#7L] Batched: true, DataFilters: [isnotnull(id#7L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_no_bucket_t..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
As the Exchange operators show, data shuffling occurs to perform the join. This is also reflected in the job DAG, as the following screenshot shows:
Now let's run the same query using bucketed tables.
Spark join with bucketed tables
Let's create a similar script that uses the bucketBy API when saving into Hive tables.
from pyspark.sql import SparkSession

appName = "PySpark Bucketing Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df1 = spark.range(1, 10 ** 6)
df1.write.format('parquet').mode('overwrite').bucketBy(
    100, 'id').saveAsTable('test_db.spark_bucket_table1')
df2 = spark.range(1, 10 ** 5)
df2.write.format('parquet').mode('overwrite').bucketBy(
    100, 'id').saveAsTable('test_db.spark_bucket_table2')
And then create a third script file to read data directly using Spark SQL:
from pyspark.sql import SparkSession

appName = "PySpark Bucketing Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql("""
select * from test_db.spark_bucket_table1 t1
inner join test_db.spark_bucket_table2 t2
on t1.id=t2.id
""")
df.show()
df.explain(extended=True)
Compared with the previous version, there is one main change: bucketBy(100, 'id') is called before saving the DataFrame as a Hive table. This splits the data into 100 buckets (files) based on column id.
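As a side note, bucketBy can be combined with sortBy so that each bucket is also sorted by the bucket column. The snippet below is only a sketch of that variant and is not used in the rest of this article; the table name spark_bucket_sorted_table1 is made up for illustration, and whether the extra sort lets Spark skip the Sort step in the join plan depends on the Spark version and on how many files each bucket contains.

# Sketch only: same bucketing as above, but each bucket is additionally
# sorted by 'id'. sortBy must be used together with bucketBy.
df1.write.format('parquet') \
    .mode('overwrite') \
    .bucketBy(100, 'id') \
    .sortBy('id') \
    .saveAsTable('test_db.spark_bucket_sorted_table1')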
bucketBy has to be used together with saveAsTable; we cannot directly save bucketed data into the file system using the save API.
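To illustrate this constraint, the sketch below attempts a path-based write with bucketBy; in recent Spark versions this typically fails with an AnalysisException. The output path is made up for illustration.

# Expected to fail: bucketing metadata has to be stored in a metastore table,
# so path-based writes cannot be bucketed.
df1.write.format('parquet') \
    .mode('overwrite') \
    .bucketBy(100, 'id') \
    .save('/tmp/spark_bucket_demo')  # hypothetical path; raises AnalysisException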
Data layout in HDFS
One thing to notice is that we now have 100 bucket files for each of these two tables. Let's have a look at them using HDFS commands:
$ hadoop fs -ls /user/hive/warehouse/test_db.db/spark_bucket_table1
Found 101 items
-rw-r--r--   1 kontext supergroup          0 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/_SUCCESS
-rw-r--r--   1 kontext supergroup      43655 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00000.c000.snappy.parquet
-rw-r--r--   1 kontext supergroup      44514 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00001.c000.snappy.parquet
-rw-r--r--   1 kontext supergroup      44323 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00002.c000.snappy.parquet
... ...
-rw-r--r--   1 kontext supergroup      44641 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00098.c000.snappy.parquet
-rw-r--r--   1 kontext supergroup      44310 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00099.c000.snappy.parquet
The layout for table spark_bucket_table2 is similar. This bucketing information is also captured in the Hive metastore, and Spark can utilize it to optimize query plans.
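One way to verify this is to ask the metastore for the table details, which include the number of buckets and the bucket columns. This is a sketch; the exact DESCRIBE FORMATTED output varies slightly between Spark versions.

# Show table metadata, including the 'Num Buckets' and 'Bucket Columns' rows
spark.sql(
    "DESCRIBE FORMATTED test_db.spark_bucket_table1").show(50, truncate=False)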
Execution plan
Now let's have a look at the job in the Spark web UI. The following screenshot shows the DAG of the job:
There is only one stage involved (Stage 0) as there is only one action (triggered by df.show()). Stage 0's DAG looks like the following screenshot:
As the DAG shows, no data exchange (data shuffling) is involved, as both relations have exactly the same number of buckets and were bucketed by the same column (the join key id).
The SQL/DataFrame tab in the Spark web UI shows more granular information about the stages and tasks:
With this example, we can see that data exchange is not required, so the performance is better. From the physical execution plan, we can also tell that bucketed file scans are enabled:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#1L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#0L)
   :     +- FileScan parquet test_db.spark_bucket_table1[id#0L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 100 out of 100
   +- Sort [id#1L ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#1L)
         +- FileScan parquet test_db.spark_bucket_table2[id#1L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#1L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 100 out of 100
Bucket pruning
Finally, let's explore the bucket pruning feature. Bucket pruning selects only the required buckets when we add filters on the bucket columns.
Let's change the Spark SQL query slightly to add a filter on the id column:
df = spark.sql("""
select * from test_db.spark_bucket_table1 t1
inner join test_db.spark_bucket_table2 t2
on t1.id=t2.id
where t1.id in (100, 1000)
""")
Run the script again and the following physical plan will be printed out:
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#1L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Filter (id#0L IN (100,1000) AND isnotnull(id#0L))
   :     +- FileScan parquet test_db.spark_bucket_table1[id#0L] Batched: true, Bucketed: true, DataFilters: [id#0L IN (100,1000), isnotnull(id#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table1], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100
   +- Sort [id#1L ASC NULLS FIRST], false, 0
      +- Filter (id#1L IN (100,1000) AND isnotnull(id#1L))
         +- FileScan parquet test_db.spark_bucket_table2[id#1L] Batched: true, Bucketed: true, DataFilters: [id#1L IN (100,1000), isnotnull(id#1L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table2], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100
Only two buckets out of 100 are selected. The same information appears in the graphical plan too, as the following screenshot shows:
Supported predicate expressions
One thing to notice is that bucket pruning only supports the following predicate expressions:
=
<=>
IN
AND and OR combining multiple =, <=> and IN expressions
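For example, a range predicate is not in this list, so a query like the sketch below would be expected to scan all buckets (SelectedBucketsCount: 100 out of 100) even though it filters on the bucket column. The filter values are arbitrary, chosen only for illustration.

# Range filters on the bucket column do not match the supported predicate
# expressions, so no bucket pruning is expected here.
df_range = spark.sql("""
select * from test_db.spark_bucket_table1 t1
inner join test_db.spark_bucket_table2 t2
on t1.id=t2.id
where t1.id between 100 and 1000
""")
df_range.explain()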
Summary
I hope you now have a good understanding of Spark bucketing and bucket pruning features. If you have any questions, feel free to post a comment.
References
pyspark.sql.DataFrameWriter.bucketBy — PySpark 3.3.0 documentation
Bucketing · The Internals of Spark SQL