Introduction to Hive Bucketed Tables
The article Hive SQL - Cluster By and Distribute By shows examples of using CLUSTER BY and DISTRIBUTE BY in SELECT queries, which can potentially improve the performance of downstream MapReduce tasks. In fact, we can also create bucketed tables directly in Hive. Bucketing can improve query performance when selecting with filters, sampling the table, or joining tables on the same bucket columns. This article provides a summary of Hive bucketing.
About bucketed Hive tables
A bucketed table splits the table's data into smaller chunks (buckets) based on the columns specified in the CLUSTERED BY clause. Bucketing works with or without partitions. If a table is partitioned, each partition folder in storage will contain bucket files.
With a bucketed table, rows with the same bucket key are written into the same file, which makes it easier to join tables or read data. This usually leads to better performance, especially if your table is huge.
Create bucketed table
A Hive bucketed table can be created by adding a CLUSTERED BY clause to the CREATE TABLE statement. The following is an example of creating a partitioned and bucketed table.
create table test_db.bucket_table(user_id int, key string)
comment 'A bucketed table'
partitioned by (country string)
clustered by (user_id) sorted by (key) into 10 buckets
stored as ORC;
The above DDL creates a bucketed table bucket_table that is partitioned by the country column and clustered by the user_id column into 10 buckets. The SORTED BY clause is optional.
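To double check the bucketing metadata after creating the table, you can describe it, for example from a Hive-enabled Spark session. This is just a quick, optional check:

from pyspark.sql import SparkSession

# Assumes a Hive-enabled Spark session, as used in the scripts later in this article.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# 'Num Buckets', 'Bucket Columns' and 'Sort Columns' should match the DDL above.
spark.sql("DESCRIBE FORMATTED test_db.bucket_table").show(100, truncate=False)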
Refer to Create Bucketed Sorted Table - Hive SQL for more examples.
How does bucketing work?
When inserting records into a Hive bucketed table, the bucket number for each record is calculated using the following algorithm:
hash_function(bucketing_column) mod num_buckets
For the example table above, the calculation is:
hash_function(user_id) mod 10
The hash function varies depending on the data type. Murmur3 is the hash algorithm used in Hive. You can find the details in the source code on GitHub: public static int hashCodeMurmur(Object o, ObjectInspector objIns, ByteBuffer byteBuffer).
One thing to remember is that the same input with the same data type will always generate the same bucket number.
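To illustrate the calculation, here is a minimal conceptual sketch; note that Python's built-in hash() is not the Murmur3 function Hive actually uses, only the mod-based bucket assignment is the same shape:

# Conceptual sketch: mirrors hash_function(bucketing_column) mod num_buckets,
# but with Python's built-in hash() instead of Hive's Murmur3 implementation.
def bucket_number(bucketing_value, num_buckets):
    return hash(bucketing_value) % num_buckets

# The same user_id always maps to the same bucket out of 10.
print(bucket_number(101, 10))
print(bucket_number(101, 10))  # identical input -> identical bucket number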
Insert sample data
Now let's create a sample PySpark script to insert data into table bucket_table.
from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Prepare sample data for inserting into the Hive table.
data = []
countries = ['CN', 'AU']
for i in range(0, 1000):
    data.append([int(i), 'U' + str(i), countries[i % 2]])

df = spark.createDataFrame(data, ['user_id', 'key', 'country'])
df.show()

# Save df to Hive table test_db.bucket_table.
df.write.mode('append').insertInto('test_db.bucket_table')
The above script creates 1,000 rows in memory and then saves the DataFrame into the Hive table using the insertInto API.
When selecting data from the Hive table in Beeline, the output looks like the following:
0: jdbc:hive2://> select * from test_db.bucket_table limit 10;
OK
+----------------------+-------------------+-----------------------+
| bucket_table.user_id | bucket_table.key  | bucket_table.country  |
+----------------------+-------------------+-----------------------+
| 1                    | U1                | AU                    |
| 11                   | U11               | AU                    |
| 21                   | U21               | AU                    |
| 31                   | U31               | AU                    |
| 41                   | U41               | AU                    |
| 51                   | U51               | AU                    |
| 61                   | U61               | AU                    |
| 71                   | U71               | AU                    |
| 81                   | U81               | AU                    |
| 91                   | U91               | AU                    |
+----------------------+-------------------+-----------------------+
HDFS structure for this bucket table
Now let's have a look at the folder structure of this table in HDFS using the following command:
hadoop fs -ls -R /user/hive/warehouse/test_db.db/bucket_table
The output looks like the following screenshot:
As expected, there are two partition subfolders named country=AU and country=CN. Under each partition folder, there are multiple bucket files. Because our sample data volume is small, 10 bucket files were not created; instead, there are 5 bucket files in each partition folder. From the file name we can pretty much tell the bucket number:
00001_0_part-00000-a2ea270a-326d-4c29-b16b-dab43512309f_00001.c000.snappy.orc
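For example, the leading digits of the file name above indicate bucket 1. Here is a minimal sketch of extracting that prefix, assuming bucket file names follow the pattern shown above:

def bucket_id_from_file_name(file_name):
    # Assumes names like '00001_0_part-..._00001.c000.snappy.orc' as shown above,
    # where the leading digits encode the bucket number.
    return int(file_name.split('_')[0])

print(bucket_id_from_file_name(
    '00001_0_part-00000-a2ea270a-326d-4c29-b16b-dab43512309f_00001.c000.snappy.orc'))
# 1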
Query from bucket table in Spark
Now let's have a look at how the bucketed table can improve query performance.
Run the following Spark script, which uses Spark SQL to query this table with filters.
from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("""select * from test_db.bucket_table t1
    where country='AU' and t1.user_id=101""")
df.show()
df.explain(extended=True)
From the execution plan below, we can see that partition pruning is applied, but the filter on the bucket column user_id is not pushed down to the bucket level.
== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('country = AU) AND ('t1.user_id = 101))
   +- 'SubqueryAlias t1
      +- 'UnresolvedRelation [test_db, bucket_table], [], false

== Analyzed Logical Plan ==
user_id: int, key: string, country: string
Project [user_id#20, key#21, country#22]
+- Filter ((country#22 = AU) AND (user_id#20 = 101))
   +- SubqueryAlias t1
      +- SubqueryAlias spark_catalog.test_db.bucket_table
         +- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Optimized Logical Plan ==
Filter (((isnotnull(country#22) AND isnotnull(user_id#20)) AND (country#22 = AU)) AND (user_id#20 = 101))
+- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Physical Plan ==
*(1) Filter (isnotnull(user_id#20) AND (user_id#20 = 101))
+- *(1) ColumnarToRow
   +- FileScan orc test_db.bucket_table[user_id#20,key#21,country#22] Batched: true, DataFilters: [isnotnull(user_id#20), (user_id#20 = 101)], Format: ORC, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/bucket_table/coun..., PartitionFilters: [isnotnull(country#22), (country#22 = AU)], PushedFilters: [IsNotNull(user_id), EqualTo(user_id,101)], ReadSchema: struct<user_id:int,key:string>
TODO: figure out why predicate pushdown is not utilizing bucket files even though partition pruning is working.
References
LanguageManual DDL BucketedTables - Apache Hive - Apache Software Foundation
I have not figured out why bucket-level pruning is not being used. I would expect behavior similar to Spark's own bucket pruning, as described in Spark Bucketing and Bucket Pruning Explained:
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#1L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Filter (id#0L IN (100,1000) AND isnotnull(id#0L))
   :     +- FileScan parquet test_db.spark_bucket_table1[id#0L] Batched: true, Bucketed: true, DataFilters: [id#0L IN (100,1000), isnotnull(id#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table1], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100
   +- Sort [id#1L ASC NULLS FIRST], false, 0
      +- Filter (id#1L IN (100,1000) AND isnotnull(id#1L))
         +- FileScan parquet test_db.spark_bucket_table2[id#1L] Batched: true, Bucketed: true, DataFilters: [id#1L IN (100,1000), isnotnull(id#1L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table2], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100
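For reference, Spark-bucketed tables like those in the plan above can be created with Spark's bucketBy API. The following is only a rough sketch: the table names, bucket count and id column are taken from the plan, while the sample data and the exact query are assumptions.

from pyspark.sql import SparkSession

# Assumes a Hive-enabled Spark session, as in the earlier scripts.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Create two Spark-bucketed (not Hive-bucketed) tables with 100 buckets on column id.
# The default data source format is Parquet, matching the plan above.
df = spark.range(0, 100000)
df.write.mode('overwrite').bucketBy(100, 'id') \
    .saveAsTable('test_db.spark_bucket_table1')
df.write.mode('overwrite').bucketBy(100, 'id') \
    .saveAsTable('test_db.spark_bucket_table2')

# Join on the bucket column with an IN filter; for a query like this the plan above
# reports 'SelectedBucketsCount: 2 out of 100', i.e. only two buckets are scanned.
spark.sql("""select t1.id, t2.id
    from test_db.spark_bucket_table1 t1
    join test_db.spark_bucket_table2 t2 on t1.id = t2.id
    where t1.id in (100, 1000)""").explain()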
I have also raised this with the Spark user community and will update this article once I get feedback or figure it out myself.