Hive ACID Inserts, Updates and Deletes with ORC
Hive 3 enhances ACID operations for tables stored in the ORC file format, although full ACID semantics have been supported since Hive 0.13 (before that, atomicity, consistency and durability were only provided at the partition level). This article shows how Hive ACID tables work with the ORC format.
About ORC format
ORC stands for Optimized Row Columnar. It is designed to be the smallest and fastest columnar storage format for Hadoop workloads. ORC is still a write-once file format, so updates and deletes are implemented using base and delta files. Small delta files are automatically merged into bigger ones, and delta files are merged into base files once they grow large enough (this is done by Hive's compactor, covered later in this article).
ORC adds a few additional attributes to each record in an ACID table. The operation attribute indicates the action. Its possible values are:
- 0 - INSERT
- 1 - UPDATE
- 2 - DELETE
There is also a triple that uniquely identifies each row: originalTransaction, bucket and rowId. The last attribute, currentTransaction, indicates the current write ID. The data type of each record looks like the following struct:
struct<
  operation: int,
  originalTransaction: bigint,
  bucket: int,
  rowId: bigint,
  currentTransaction: bigint,
  row: struct<...>
>
ORC also has additional features to support streaming writes into Hive tables.
Now let's start creating a Hive table to explore these features.
Create Hive table stored as ORC
In the following sections, I am going to use my local Hive instance (3.1.3) on WSL to explore these features. If you don't have Hive to work with, you can install one by following one of these articles:
Apache Hive 3.1.2 Installation on Linux Guide
Apache Hive 3.1.2 Installation on Windows 10
To use transactions (ACID support), you need to ensure they are enabled on your Hive instance. If they are not enabled by default, you can follow this article to configure them: Configure HiveServer2 to Enable Transactions (ACID Support).
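For reference, the snippet below is a minimal sketch of the properties that are typically required for ACID tables. The first two are the ones used later in this article; the compactor properties are usually set in hive-site.xml on the HiveServer2/metastore side rather than per session, and the right values depend on your deployment.
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-- Compactor settings (normally configured in hive-site.xml, not per session)
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;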
Create a table
Now let's create a Hive table in my test_db database using the following SQL statements:
use test_db;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.support.concurrency=true;
create table crud_table (id INT, value STRING)
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
The two SET statements are not necessary if these properties are already set in the client configuration.
Show the table definition using the SQL statement 'show create table crud_table;'. It prints out the following definition:
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `crud_table`( |
| `id` int, |
| `value` string) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' |
| STORED AS INPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' |
| LOCATION |
| 'hdfs://localhost:9000/user/hive/warehouse/test_db.db/crud_table' |
| TBLPROPERTIES ( |
| 'bucketing_version'='2', |
| 'transactional'='true', |
| 'transactional_properties'='default', |
| 'transient_lastDdlTime'='1660724168') |
+----------------------------------------------------+
As you can see, the table uses the ORC SerDe. We can also use DESCRIBE FORMATTED crud_table to print out more information, like the following screenshot:
At the moment, the table will be empty.
Populate some data
Now let's insert some data into this table using INSERT statements:
insert into crud_table(id, value) values (1,'A'), (2,'B'), (3,'C');
INSERT statements run as MapReduce jobs. Once the job completes, we can display the result using a SELECT statement:
select * from crud_table;
OK
+----------------+-------------------+
| crud_table.id | crud_table.value |
+----------------+-------------------+
| 1 | A |
| 2 | B |
| 3 | C |
+----------------+-------------------+
Now let's have a look into HDFS of this managed table using the following Hadoop CLI command:
hadoop fs -ls -h /user/hive/warehouse/test_db.db/crud_table
Found 1 items
drwxr-xr-x - kontext supergroup 0 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000
The INSERT statement creates a delta folder named 'delta_0000001_0000001_0000'. This folder contains two files:
-rw-r--r-- 1 kontext supergroup 1 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/_orc_acid_version
-rw-r--r-- 1 kontext supergroup 714 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/bucket_00000
The file '_orc_acid_version' always contains the value 2. It is used to differentiate from version 1: for version 2 ACID, the UPDATE operation behaves differently (HIVE-14035).
$ hadoop fs -cat /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/_orc_acid_version
2
The file 'bucket_00000' contains the data for all three records. We can dump its content using the following command:
hive --orcfiledump -d /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/bucket_00000
It prints out the following text:
Processing data file /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/bucket_00000 [length: 714]
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"value":"A"}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"value":"B"}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"value":"C"}}
As mentioned previously, operation 0 means INSERT; the attributes 'originalTransaction', 'bucket', and 'rowId' uniquely identify each record. As mentioned in the article Hive SQL - Virtual Columns, we can use the row__id virtual column to display this information too.
0: jdbc:hive2://> select row__id, id, value from crud_table;
OK
+-----------------------------------------------+-----+--------+
| row__id | id | value |
+-----------------------------------------------+-----+--------+
| {"writeid":1,"bucketid":536870912,"rowid":0} | 1 | A |
| {"writeid":1,"bucketid":536870912,"rowid":1} | 2 | B |
| {"writeid":1,"bucketid":536870912,"rowid":2} | 3 | C |
+-----------------------------------------------+-----+--------+
3 rows selected (1.066 seconds)
Update data
Now let's issue an UPDATE statement:
update crud_table set value='CC' where id=3;
The HDFS folder now looks like the following:
$ hadoop fs -ls -h /user/hive/warehouse/test_db.db/crud_table
Found 3 items
drwxr-xr-x - kontext supergroup 0 2022-08-17 18:52 /user/hive/warehouse/test_db.db/crud_table/delete_delta_0000002_0000002_0000
drwxr-xr-x - kontext supergroup 0 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000
drwxr-xr-x - kontext supergroup 0 2022-08-17 18:52 /user/hive/warehouse/test_db.db/crud_table/delta_0000002_0000002_0000
The previous folder is unchanged, while two new folders have been added: delta_0000002_0000002_0000 and delete_delta_0000002_0000002_0000. Each of these two folders contains two files (one of them being '_orc_acid_version', which stores the ACID format version).
In the delete delta folder, the content of bucket_00000 records the DELETE operation (operation value 2):
{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":2,"row":null}
In the update delta folder, the content is:
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":3,"value":"CC"}}
Note that this new record's originalTransaction is marked as 2. From this we can tell that an UPDATE is in fact implemented as marking the previous record as deleted and inserting a new record. When reading the ORC files, the deleted record is skipped automatically.
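To make this concrete, the UPDATE issued above is logically equivalent to the pair of statements below. This is only a conceptual sketch: Hive performs both steps internally within a single transaction, so you would not run them separately yourself.
-- Conceptual equivalent of: update crud_table set value='CC' where id=3;
delete from crud_table where id = 3;       -- writes a delete event into a delete_delta folder
insert into crud_table values (3, 'CC');   -- writes the new row version into a delta folder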
Delete data
As you might expect, a DELETE statement simply creates a delete delta folder.
delete from crud_table where id=3;
After running the above statement, a new folder named delete_delta_0000003_0000003_0000 is created. Similar to the previous one, it contains two files. The data file records the following information:
{"operation":2,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":null}
The current transaction ID is 3 for this record.
At this point, the table has only two records:
+----------------+-------------------+
| crud_table.id | crud_table.value |
+----------------+-------------------+
| 1 | A |
| 2 | B |
+----------------+-------------------+
Small files compaction
As with any big data framework, reading a large number of small files is bad for performance. In the above examples, each transaction generates at least one delta folder, and each folder can contain a data file for each bucket (in this case the table is not bucketed, so there is only one data file). Soon enough, performance can suffer. To address this issue, we can consolidate delta files into base files using the following SQL statement:
ALTER TABLE crud_table COMPACT 'minor'|'major';
The differences between minor and major compaction are (a concrete example follows this list):
- Minor compaction merges multiple delete and delta folders into one.
- Major compaction merges all folders into a single folder named base_N, where N is the latest write ID (currentTransaction).
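For example, to trigger a major compaction of the table used in this article, the statement below can be issued. The AND WAIT clause, supported in recent Hive releases, makes the statement block until the compaction finishes; without it, the request is simply queued and picked up asynchronously by the compactor threads.
-- Queue a major compaction; compactor worker threads pick it up asynchronously
ALTER TABLE crud_table COMPACT 'major';
-- Optionally block until the compaction completes (supported in recent Hive releases)
-- ALTER TABLE crud_table COMPACT 'major' AND WAIT;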
If your table has partitions, you can also specify it:
ALTER TABLE tablename [PARTITION (partition_key='partition_value' [,...])] COMPACT 'compaction_type'
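For instance, for a hypothetical partitioned transactional table named sales_orders partitioned by order_date, a major compaction of a single partition might look like the following (the table and partition names are made up for illustration):
ALTER TABLE sales_orders PARTITION (order_date='2022-08-17') COMPACT 'major';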
While the compaction is running, we can use the following SQL statement to check the status:
SHOW COMPACTIONS;
The result looks like the following screenshot:
After major compaction, the state will change to succeeded as the following screenshot shows:
The HDFS folder for this table has the following folders:
$ hadoop fs -ls -h /user/hive/warehouse/test_db.db/crud_table
Found 2 items
-rw-r--r-- 1 kontext supergroup 0 2022-08-17 20:44 /user/hive/warehouse/test_db.db/crud_table/_SUCCESS
drwxr-xr-x - kontext supergroup 0 2022-08-20 12:06 /user/hive/warehouse/test_db.db/crud_table/base_0000003
Under base_0000003 folder, there are three files:
/user/hive/warehouse/test_db.db/crud_table/base_0000003/_metadata_acid
/user/hive/warehouse/test_db.db/crud_table/base_0000003/_orc_acid_version
/user/hive/warehouse/test_db.db/crud_table/base_0000003/bucket_00000
The last one is the data file and it includes the following information:
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"value":"A"}} {"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"value":"B"}}
As you can see, record 3 has been removed from the data file.
Summary
Hopefully you now have a good understanding of how transactional tables work in Hive with the ORC file format. Even though Hive supports transactional tables, you should not use it as a transactional database for OLTP purposes: it can handle millions of rows updated in a single transaction, but it cannot handle millions of transactions per hour.
References
For more advanced information about how Hive read those delta and base folders, refer to these articles:
Understanding Hive ACID Transactional Table | Ji ZHANG's Blog
Evolving Draft for ORC Specification v2
Questions and answers
A reader (Ugrite Hita) asked for additional suggestions about the conclusion that Hive transactional tables should not be used for OLTP:
- In what situations should we consider using this feature? Please give an example.
- Since statements execute as MapReduce jobs, can we use Spark (for example PySpark) with the Delta library for better performance?
There can be many different points; here are some typical ones:
- If your use case has no low-latency requirement, you can use this feature, for example to ingest events stored in Kafka into a persistent table in Hive.
- Spark Delta uses a different engine and can usually provide better performance, but it really depends on your exact use case. Delta Lake also has the advantage of not relying on a metastore, hence it is very easy to migrate from one platform to another.