
Introduction to Hive Bucketed Table


In the article Hive SQL - Cluster By and Distribute By, we showed examples of using CLUSTER BY and DISTRIBUTE BY in SELECT queries to potentially improve the performance of downstream MapReduce tasks. In fact, we can also create bucketed tables directly in Hive. Bucketing can improve query performance when filtering, sampling, or joining tables on the same bucket columns. This article provides a summary of Hive bucketing.

About bucketed Hive tables

A bucketed table splits the table's data into smaller chunks based on the columns specified in the CLUSTERED BY clause. Bucketing can be used with or without partitions. If a table is partitioned, each partition folder in storage will contain bucket files.

With a bucketed table, rows with the same bucket key are written into the same file, which makes joining tables and reading data more efficient. This usually leads to better performance, especially if your table is huge.

Create bucketed table

A Hive bucketed table can be created by adding a CLUSTERED BY clause. The following is an example of creating a partitioned and bucketed table.

create table test_db.bucket_table(user_id int, key string) 
comment 'A bucketed table' 
partitioned by(country string) 
clustered by(user_id) sorted by (key) into 10 buckets
stored as ORC;

The above DDL creates bucketed table bucket_table, which is partitioned by the country column and clustered by the user_id column into 10 buckets. The SORTED BY clause is optional.

Refer to Create Bucketed Sorted Table - Hive SQL for more examples. 
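If you want to confirm that the bucketing metadata was stored as expected, you can describe the table. The sketch below uses Spark SQL through a Hive-enabled SparkSession (the same kind used in the examples later in this article); the exact labels in the output, such as Num Buckets and Bucket Columns, can vary by version.

from pyspark.sql import SparkSession

# A Hive-enabled SparkSession, as in the later examples in this article.
spark = SparkSession.builder \
    .appName("Check Hive bucketing metadata") \
    .enableHiveSupport() \
    .getOrCreate()

# Print the table metadata; look for the bucketing details
# (e.g. "Num Buckets" and "Bucket Columns") in the output.
spark.sql("describe formatted test_db.bucket_table").show(100, truncate=False)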

How does bucketing work?

When inserting records into a Hive bucketed table, the bucket number is calculated using the following algorithm:

hash_function(bucketing_column) mod num_buckets

For the example table above, the algorithm is:

hash_function(user_id) mod 10

The hash function varies depending on the data type. Murmur3 is the hash algorithm used in Hive. You can find the details in the source code on GitHub: public static int hashCodeMurmur(Object o, ObjectInspector objIns, ByteBuffer byteBuffer).

One thing to remember is that the same input value with the same data type always generates the same bucket number.
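To make the formula concrete, here is a tiny illustrative Python sketch. Note that it uses a trivial placeholder hash (the integer value itself) rather than Hive's actual Murmur3 implementation, so real bucket assignments may differ; it only demonstrates the mod num_buckets step.

NUM_BUCKETS = 10  # matches "into 10 buckets" in the DDL above

def bucket_number(user_id: int, num_buckets: int = NUM_BUCKETS) -> int:
    # Placeholder hash for illustration only; Hive uses Murmur3 internally,
    # so the real bucket assignment can differ.
    hash_value = user_id
    return hash_value % num_buckets

# The same input with the same data type always maps to the same bucket.
for uid in (1, 11, 21, 101, 999):
    print(f"user_id={uid} -> bucket {bucket_number(uid)}")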

Insert sample data

Now let's create a sample PySpark script to insert data into table bucket_table.

from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# prepare sample data for inserting into hive table
data = []
countries = ['CN', 'AU']
for i in range(0, 1000):
    data.append([int(i),  'U'+str(i), countries[i % 2]])

df = spark.createDataFrame(data, ['user_id', 'key', 'country'])
df.show()

# Save df to Hive table test_db.bucket_table

df.write.mode('append').insertInto('test_db.bucket_table')

The above script creates 1,000 rows in memory and then saves the DataFrame into the Hive table using the insertInto API.
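As a quick sanity check (reusing the SparkSession created in the script above), you can list the partitions that were written; both country=AU and country=CN should appear.

# List the partitions of the bucketed table.
spark.sql("show partitions test_db.bucket_table").show(truncate=False)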

When selecting data from the Hive table in Beeline, the output looks like the following:

0: jdbc:hive2://> select * from test_db.bucket_table limit 10;
OK
+-----------------------+-------------------+-----------------------+
| bucket_table.user_id  | bucket_table.key  | bucket_table.country  |
+-----------------------+-------------------+-----------------------+
| 1                     | U1                | AU                    |
| 11                    | U11               | AU                    |
| 21                    | U21               | AU                    |
| 31                    | U31               | AU                    |
| 41                    | U41               | AU                    |
| 51                    | U51               | AU                    |
| 61                    | U61               | AU                    |
| 71                    | U71               | AU                    |
| 81                    | U81               | AU                    |
| 91                    | U91               | AU                    |
+-----------------------+-------------------+-----------------------+

HDFS structure for this bucket table

Now let's have a look at the folder structure of this table in HDFS using the following command:

hadoop fs -ls -R /user/hive/warehouse/test_db.db/bucket_table

The output shows the partition folders and bucket files under the table directory (screenshot of the HDFS listing omitted).

As expected, there are two partition subfolders named country=AU and country=CN. Under each partition folder, there are multiple bucket files. Because our sample data volume is small, not all 10 bucket files are created; instead, there are 5 bucket files in each partition. From the file name, we can pretty much tell the bucket number:

00001_0_part-00000-a2ea270a-326d-4c29-b16b-dab43512309f_00001.c000.snappy.orc
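For example, a small illustrative Python snippet can pull the bucket id out of the file name shown above:

import re

file_name = "00001_0_part-00000-a2ea270a-326d-4c29-b16b-dab43512309f_00001.c000.snappy.orc"

# The digits before the first underscore encode the bucket id.
match = re.match(r"^(\d+)_", file_name)
if match:
    print("bucket id:", int(match.group(1)))  # prints: bucket id: 1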

Query from bucket table in Spark

Now let's have a look at how bucketed tables can improve query performance.

Run the following Spark script, which uses Spark SQL to query the table with filters.

from pyspark.sql import SparkSession

appName = "PySpark Hive Bucketing Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("""select * from test_db.bucket_table t1
where country='AU' and user_id=101
""")
df.show()
df.explain(extended=True)

From the execution plan below, we can see that the partition filter on country is pushed down (partition pruning), but the filter on user_id is not used to prune bucket files.

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('country = AU) AND ('t1.user_id = 101))
   +- 'SubqueryAlias t1
      +- 'UnresolvedRelation [test_db, bucket_table], [], false

== Analyzed Logical Plan ==
user_id: int, key: string, country: string
Project [user_id#20, key#21, country#22]
+- Filter ((country#22 = AU) AND (user_id#20 = 101))
   +- SubqueryAlias t1
      +- SubqueryAlias spark_catalog.test_db.bucket_table
         +- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Optimized Logical Plan ==
Filter (((isnotnull(country#22) AND isnotnull(user_id#20)) AND (country#22 = AU)) AND (user_id#20 = 101))
+- Relation test_db.bucket_table[user_id#20,key#21,country#22] orc

== Physical Plan ==
*(1) Filter (isnotnull(user_id#20) AND (user_id#20 = 101))
+- *(1) ColumnarToRow
   +- FileScan orc test_db.bucket_table[user_id#20,key#21,country#22] Batched: true, DataFilters: [isnotnull(user_id#20), (user_id#20 = 101)], Format: ORC, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/bucket_table/coun..., PartitionFilters: [isnotnull(country#22), (country#22 = AU)], PushedFilters: [IsNotNull(user_id), EqualTo(user_id,101)], ReadSchema: struct<user_id:int,key:string>

TODO: figure out why predicate pushdown does not prune bucket files even though partition pruning works.

References

LanguageManual DDL BucketedTables - Apache Hive - Apache Software Foundation

Comments
Kontext #1716 (2 years ago)

I have not figured out why bucket pruning is not being used. I would expect behavior similar to Spark bucketed table pruning; see Spark Bucketing and Bucket Pruning Explained.

AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#0L], [id#1L], Inner
   :- Sort [id#0L ASC NULLS FIRST], false, 0
   :  +- Filter (id#0L IN (100,1000) AND isnotnull(id#0L))
   :     +- FileScan parquet test_db.spark_bucket_table1[id#0L] Batched: true, Bucketed: true, DataFilters: [id#0L IN (100,1000), isnotnull(id#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table1], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100
   +- Sort [id#1L ASC NULLS FIRST], false, 0
      +- Filter (id#1L IN (100,1000) AND isnotnull(id#1L))
         +- FileScan parquet test_db.spark_bucket_table2[id#1L] Batched: true, Bucketed: true, DataFilters: [id#1L IN (100,1000), isnotnull(id#1L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://localhost:9000/user/hive/warehouse/test_db.db/spark_bucket_table2], PartitionFilters: [], PushedFilters: [In(id, [100,1000]), IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 2 out of 100

I have also raised this with the Spark user community and will update once I get feedback or figure it out myself.

Quoting Michał (2 years ago): "What about TODO: ? :)"

Michał ŚWIATOWY #1715 (2 years ago)

What about TODO: ? :)
