Spark Bucketing and Bucket Pruning Explained

Kontext · 2022-08-24

Spark provides an API (bucketBy) to split a dataset into smaller chunks (buckets). The Murmur3 hash function is used to calculate the bucket number based on the specified bucket columns. Buckets are different from partitions: the bucket column values are still stored in the data files, while partition column values are usually stored as part of the file system paths. The main purpose of bucketing is to avoid data shuffling when performing joins. With less data shuffling, fewer stages are required for a job, so performance is usually better. Spark also supports bucket pruning (similar to partition pruning).
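
To illustrate how the bucket number is derived, below is a minimal sketch (not Spark's internal code) that approximates the bucket assignment with PySpark's built-in hash function, which is also Murmur3-based. The column name and bucket count are just placeholders.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").appName("BucketIdSketch").getOrCreate()

num_buckets = 100  # same value that would be passed to bucketBy

# hash() in Spark SQL is Murmur3-based; taking a non-negative modulo against
# the bucket count approximates how each row maps to a bucket file.
df = spark.range(1, 1000).withColumn(
    "bucket_id", F.expr(f"pmod(hash(id), {num_buckets})"))
df.show(5)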

Info: A common misconception is that Spark bucketing is the same as Hive bucketing. They are not compatible, as they use different algorithms to decide the bucket number. For Hive native bucketed tables, refer to Introduction to Hive Bucketed Table.

Let's jump into the details with some examples.

Spark join without buckets

Let's first look at an example of an INNER JOIN of two non-bucketed tables in Spark SQL. The following is the code snippet:

from pyspark.sql import SparkSession

appName = "PySpark Non-Bucketing Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df1 = spark.range(1, 10 ** 6)
df1.write.format('parquet').mode('overwrite').saveAsTable(
    'test_db.spark_no_bucket_table1')

df2 = spark.range(1, 10 ** 5)
df2.write.format('parquet').mode('overwrite').saveAsTable(
    'test_db.spark_no_bucket_table2')

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql("""
    select * from test_db.spark_no_bucket_table1 t1
    inner join test_db.spark_no_bucket_table2 t2 
    on t1.id=t2.id
    """)

df.show()
df.explain(extended=True)

The script creates two DataFrame objects and then saves them as tables into the Hive database test_db. The two tables are then joined together via Spark SQL.

Info: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) is used to turn off broadcast joins so that we can compare the differences between bucketed and non-bucketed tables. Otherwise, you may find this text in the execution plan: Bucketed: false (disabled by query planner).
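
On a related note, bucketed reads themselves are controlled by the spark.sql.sources.bucketing.enabled configuration (true by default in recent Spark versions); if it were turned off, the plan should show a similar "disabled" remark. A quick check, assuming the SparkSession created above:

# Bucketed table reads are governed by this flag (default: true).
print(spark.conf.get("spark.sql.sources.bucketing.enabled"))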

The text version of the physical plan looks like the following:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#6L], [id#7L], Inner
   :- Sort [id#6L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#6L, 200), ENSURE_REQUIREMENTS, [id=#180]
   :     +- Filter isnotnull(id#6L)
   :        +- FileScan parquet test_db.spark_no_bucket_table1[id#6L] Batched: true, DataFilters: [isnotnull(id#6L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_no_bucket_t..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   +- Sort [id#7L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#7L, 200), ENSURE_REQUIREMENTS, [id=#181]
         +- Filter isnotnull(id#7L)
            +- FileScan parquet test_db.spark_no_bucket_table2[id#7L] Batched: true, DataFilters: [isnotnull(id#7L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_no_bucket_t..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

As the Exchange (hashpartitioning) operators show, data shuffling occurs to perform the join. This is also reflected in the job DAG, as the following screenshot shows:

20220824122255-image.png
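
The 200 in Exchange hashpartitioning(id#6L, 200) comes from spark.sql.shuffle.partitions, which defaults to 200. To experiment with the shuffle width, the setting can be changed before running the join:

# Controls how many partitions each shuffle (Exchange) produces; default is 200.
spark.conf.set("spark.sql.shuffle.partitions", 50)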

Now let's run the same query using bucketed tables.

Spark join with bucketed tables

Let's create a similar script that uses the bucketBy API when saving into Hive tables.

from pyspark.sql import SparkSession

appName = "PySpark Bucketing Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df1 = spark.range(1, 10 ** 6)
df1.write.format('parquet').mode('overwrite').bucketBy(
    100, 'id').saveAsTable('test_db.spark_bucket_table1')

df2 = spark.range(1, 10 ** 5)
df2.write.format('parquet').mode('overwrite').bucketBy(
    100, 'id').saveAsTable('test_db.spark_bucket_table2')

And then create a third script file to read data directly using Spark SQL:

from pyspark.sql import SparkSession

appName = "PySpark Bucketing Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df = spark.sql("""
    select * from test_db.spark_bucket_table1 t1
    inner join test_db.spark_bucket_table2 t2 
    on t1.id=t2.id
    """)

df.show()
df.explain(extended=True)

Compared with the previous version, there is one main change: bucketBy(100, 'id') is called before saving the table into Hive. This splits the data into 100 buckets (here, 100 files) based on column id.

Info: At the moment, bucketBy has to be used with saveAsTable. We cannot directly save into the file system using the save API.
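
As a side note, bucketBy can be combined with sortBy so that rows are also sorted by the join key within each bucket, which can let Spark skip the sort step in some cases (for example, when each bucket maps to a single file). A minimal variation of the write above, reusing df1 from that script and a hypothetical table name:

# Hypothetical variation: rows inside each bucket file are kept sorted by 'id'.
df1.write.format('parquet').mode('overwrite') \
    .bucketBy(100, 'id').sortBy('id') \
    .saveAsTable('test_db.spark_bucket_table1_sorted')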

Data layout in HDFS

One thing to notice is that we now have 100 bucket files for each of these two tables. Let's have a look at them using HDFS commands:

$ hadoop fs -ls /user/hive/warehouse/test_db.db/spark_bucket_table1
Found 101 items
-rw-r--r--   1 kontext supergroup          0 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/_SUCCESS
-rw-r--r--   1 kontext supergroup      43655 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00000.c000.snappy.parquet
-rw-r--r--   1 kontext supergroup      44514 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00001.c000.snappy.parquet
-rw-r--r--   1 kontext supergroup      44323 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00002.c000.snappy.parquet
...
...
-rw-r--r--   1 kontext supergroup      44641 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00098.c000.snappy.parquet
-rw-r--r--   1 kontext supergroup      44310 2022-08-24 21:32 /user/hive/warehouse/test_db.db/spark_bucket_table1/part-00000-8c7bc19f-ab35-4211-9cd6-7ea1d408781a_00099.c000.snappy.parquet

The layout for table 'spark_bucket_table2' is similar. In the Hive metastore, this information is also captured, and Spark can utilize it to optimize query plans.
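
To confirm what the metastore recorded, we can inspect the table definition from the existing SparkSession. For tables written with bucketBy, the output should include entries such as 'Num Buckets' and 'Bucket Columns':

# Inspect the bucketing metadata stored in the Hive metastore.
spark.sql("DESCRIBE FORMATTED test_db.spark_bucket_table1").show(50, truncate=False)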

Execution plan

Now let's have a look at the job in the Spark Web UI. The following screenshot shows the DAG of the job:

20220824123549-image.png

There is only one job (triggered by the single action df.show()) and, because no shuffle is needed, only one stage (Stage 0). Stage 0's DAG looks like the following screenshot:

20220824124225-image.png

As the DAG shows, there is no data exchange (shuffling) involved, as both relations have the exact same number of buckets and were bucketed by the same column (the join key 'id').
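
The shuffle-free join relies on both tables having the same bucket count on the join key. If the bucket counts differ, recent Spark versions (3.1+, as far as I know) can still avoid a full shuffle by coalescing buckets on the larger side, though this is off by default. A hedged sketch of the relevant settings (verify the config names against your Spark version):

# Assumption: these configs exist in Spark 3.1+; double-check the names
# in the documentation for your version.
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", True)
spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio", 4)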

The SQL/DataFrame tab in the Spark web UI shows more granular information about the stages and tasks:

20220824124927-image.png

With this example, we can see that data exchange is not required, so performance is better. From the physical execution plan, we can also tell that the bucketed file scan is enabled:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#1L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#0L)
   :     +- FileScan parquet test_db.spark_bucket_table1[id#0L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 100 out of 100
   +- Sort [id#1L ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#1L)
         +- FileScan parquet test_db.spark_bucket_table2[id#1L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#1L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 100 out of 100

Bucket pruning

Finally, let's explore the bucket pruning feature. Bucket pruning selects only the required buckets when we add filters on the bucket columns.
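
Bucket pruning is not limited to joins: a filter on the bucket column of a single table scan can also prune buckets. A quick hedged check against the table created above; the plan should report a SelectedBucketsCount lower than 100:

# Filtering on the bucket column should prune buckets even without a join;
# look for something like 'SelectedBucketsCount: 1 out of 100' in the plan.
spark.sql(
    "select * from test_db.spark_bucket_table1 where id = 100"
).explain()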

Let's change the Spark SQL query slightly to add a filter on the id column:

df = spark.sql("""
    select * from test_db.spark_bucket_table1 t1
    inner join test_db.spark_bucket_table2 t2 
    on t1.id=t2.id
    where t1.id in (100, 1000)
    """)

Run the script again and the following physical plan will be printed out:

AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#1L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Filter (id#0L IN (100,1000) AND isnotnull(id#0L))
   :     +- FileScan parquet test_db.spark_bucket_table1[id#0L] Batched: true, Bucketed: true, DataFilters: [id#0L IN (100,1000), isnotnull(id#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table1], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100
   +- Sort [id#1L ASC NULLS FIRST], false, 0
      +- Filter (id#1L IN (100,1000) AND isnotnull(id#1L))
         +- FileScan parquet test_db.spark_bucket_table2[id#1L] Batched: true, Bucketed: true, DataFilters: [id#1L IN (100,1000), isnotnull(id#1L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table2], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100

Only two buckets out of 100 are selected. The same information appears in the graphical plan too, as the following screenshot shows:

20220824133949-image.png

Supported predicate expressions

Note that bucket pruning only supports the following predicate expressions (see the sketch after this list for a predicate that does not prune):

  • =
  • <=>
  • IN
  • AND and OR combining multiple =, <=>, and IN expressions
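
For contrast, a predicate outside this list is not expected to prune; a hedged sketch using the same table:

# A range predicate is not in the supported list, so no pruning is expected:
# the plan should still show 'SelectedBucketsCount: 100 out of 100'.
spark.sql(
    "select * from test_db.spark_bucket_table1 where id > 100"
).explain()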

Summary

I hope you now have a good understanding of Spark bucketing and bucket pruning features. If you have any questions, feel free to post a comment.

References

pyspark.sql.DataFrameWriter.bucketBy — PySpark 3.3.0 documentation

Bucketing · The Internals of Spark SQL
