Hive ACID Inserts, Updates and Deletes with ORC

Raymond Raymond event 2022-08-17 visibility 2,251 comment 2
more_vert
Hive ACID Inserts, Updates and Deletes with ORC

From Hive 3, ACID operations are enhanced with Orc file format though full ACID semantics are supported from Hive 0.13 (before that it can only be done at partition level). This article shows you how Hive ACID tables work with ORC format.

About ORC format

ORC means optimized row columnar. It is the smallest and fastest columnar storage for Hadoop workloads. It is still a write-once file format and updates and deletes are implemented using base and delta files. ORC will automatically merge small delta files into big ones and then merge them into base files when delta files grow big enough.

ORC adds a few additional attributes to each table. 'operation' attribute indicates the action. Its possible values are:

  • 0 - INSERT
  • 1 - UPDATE
  • 2 - DELETE

And the triple that uniquely identify each row (originalTransactionbucketrowId). The last attribute is the currentTransaction which indicates the current write ID. The data type of each record looks like the following struct:

struct<
  operation: int,
  originalTransaction: bigInt,
  bucket: int,
  rowId: bigInt,
  currentTransaction: bigInt,
  row: struct<...>
>

ORC also has additional features to support streaming writing into Hive tables. 

Now let's start creating a Hive table to explore these features.

Create Hive table stored as ORC

In the following sections, I am going to use my local Hive instance (3.1.3) on WSL to explore these features. If you don't have Hive to work with, you can install one by following one of these articles:

Apache Hive 3.1.2 Installation on Linux Guide
Apache Hive 3.1.2 Installation on Windows 10
infoFor simplicity, you may just use Derby as metastore for Hive instead of a remote relational database.

For enabling transactions (ACID support), you need to ensure your Hive instance has enabled it. You can follow this article to configure it if not enabled by default: Configure HiveServer2 to Enable Transactions (ACID Support).

Create a table

Now let's create a Hive table in my test_db database using the following SQL statements:

use test_db;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.support.concurrency=true;
create table crud_table (id INT, value STRING) STORED AS ORC TBLPROPERTIES ('transactional' = 'true');

The two SET statements are not necessary if they are added into client configurations. 

Show the table definition using SQL 'show create table crud_table;'. It prints out the following definition:

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `crud_table`(                         |
|   `id` int,                                        |
|   `value` string)                                  |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' |
| LOCATION                                           |
|   'hdfs://localhost:9000/user/hive/warehouse/test_db.db/crud_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transactional'='true',                          |
|   'transactional_properties'='default',            |
|   'transient_lastDdlTime'='1660724168')            |
+----------------------------------------------------+

As you can see, the SerDe is using ORC format. We can also use DESCRIBE FORMATTED crud_table to print out more information like the following screenshot:

2022081782029-image.png

At the moment, the table will be empty.

Populate some data

Now let's insert some data into this table using INSERT statements:

insert into crud_table(id, value) values
(1,'A'),
(2,'B'),
(3,'C');

INSERT statements run as map-reduce jobs. Once it is completed, we can display the result using SELECT statement:

select * from crud_table;
OK
+----------------+-------------------+
| crud_table.id  | crud_table.value  |
+----------------+-------------------+
| 1              | A                 |
| 2              | B                 |
| 3              | C                 |
+----------------+-------------------+

Now let's have a look into HDFS of this managed table using the following Hadoop CLI command:

hadoop fs -ls -h /user/hive/warehouse/test_db.db/crud_table

Found 1 items
drwxr-xr-x   - kontext supergroup          0 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000

The INSERT statement creates a delta folder named 'delta_0000001_0000001_0000'. Within this folder, it contains two files:

-rw-r--r--   1 kontext supergroup          1 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/_orc_acid_version
-rw-r--r--   1 kontext supergroup        714 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/bucket_00000
infoInfo - Both UPDATE and INSERT statements will create a delta folder for each transaction; DELETE statement will create a delete folder prefixed with delete_delta.

For file '_orc_acid_version', it always contains value 2. It is used to differentiate from version 1. For version 2 ACID, the UPDATE operation behaves differently (HIVE-14035). 

$ hadoop fs -cat /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/_orc_acid_version
2

For file 'bucket_00000', it includes all the data of those three records. We can get the result of the file using content:

hive --orcfiledump -d /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/bucket_00000

It prints out these text:

Processing data file /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000/bucket_00000 [length: 714]
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"value":"A"}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"value":"B"}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"value":"C"}}

As mentioned previously, operation 0 means INSERT; attributes 'originalTransaction', 'bucket', and 'rowId' uniquely identifies each record.  As mentioned in article Hive SQL - Virtual Columns, we can use row__id virtual column to display these information too.

0: jdbc:hive2://> select row__id, id, value from crud_table;
OK
+-----------------------------------------------+-----+--------+
|                    row__id                    | id  | value  |
+-----------------------------------------------+-----+--------+
| {"writeid":1,"bucketid":536870912,"rowid":0}  | 1   | A      |
| {"writeid":1,"bucketid":536870912,"rowid":1}  | 2   | B      |
| {"writeid":1,"bucketid":536870912,"rowid":2}  | 3   | C      |
+-----------------------------------------------+-----+--------+
3 rows selected (1.066 seconds)

Update data

Now let's issue an UPDATE statement:

update crud_table set value='CC' where id=3;

The HDFS folder now looks like the following:

$ hadoop fs -ls -h /user/hive/warehouse/test_db.db/crud_table
Found 3 items
drwxr-xr-x   - kontext supergroup          0 2022-08-17 18:52 /user/hive/warehouse/test_db.db/crud_table/delete_delta_0000002_0000002_0000
drwxr-xr-x   - kontext supergroup          0 2022-08-17 18:23 /user/hive/warehouse/test_db.db/crud_table/delta_0000001_0000001_0000
drwxr-xr-x   - kontext supergroup          0 2022-08-17 18:52 /user/hive/warehouse/test_db.db/crud_table/delta_0000002_0000002_0000

The previous folder is not changed while two new folders added: delta_0000002_0000002_0000 and delete_delta_0000002_0000002_0000. Each of these two folders include two files (one as '_orc_acid_version' to store ACID format version).

In the delete delta folder, the content of bucket_00000 records the DELETE operation (value 2):

{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":2,"row":null}

In the update delta folder, the content is:

{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":3,"value":"CC"}}

Now this new record's originalTransaction is marked as 2. So from this we can tell that: an UPDATE action in fact is to mark the previous record as DELETED and then insert an new record. When reading from ORC files, it will be skipped automatically.

Delete data

As we can imagine, any DELETE statement will just simply create a delete delta folder.

delete from crud_table where id=3;

Run the above statement, a new folder named delete_delta_0000003_0000003_0000 is created. Similar as the previous one, there are two files. The data file records the following information:

{"operation":2,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":3,"row":null}

The current transaction ID is 3 for this record. 

Till now, the table has only two records:

+----------------+-------------------+
| crud_table.id  | crud_table.value  |
+----------------+-------------------+
| 1              | A                 |
| 2              | B                 |
+----------------+-------------------+

Small files compaction

As for any big data frameworks, large volume of small files read are not good for performance. In the above example, each transaction generates at least one delta folder and each folder can contain many files for each bucket (for this case I am not bucketizing the table thus there is only one data file). Soon enough the performance can suffer. To address this issue, we can consolidate delta files into base files. We can do this using the following SQL statement:

ALTER TABLE crud_table compact ['minor'|'major'];

The differences between minor and major compact are:

  • minor compact will merge multiple delete and delta folders into one
  • major compact will merge all folders into a single one named base_N where N is the latest write ID (currentTransaction).

If your table has partitions, you can also specify it:

ALTER TABLE tablename [PARTITION (partition_key='partition_value' [,...])] COMPACT 'compaction_type'

While the compaction is running, we can use the following SQL statement to check the status:

SHOW COMPACTIONS;

The result looks like the following screenshot:

2022081794253-image.png

After major compaction, the state will change to succeeded as the following screenshot shows:

2022082021553-image.png

The HDFS folder for this table has the following folders:

$ hadoop fs -ls -h /user/hive/warehouse/test_db.db/crud_table
Found 2 items
-rw-r--r--   1 kontext supergroup          0 2022-08-17 20:44 /user/hive/warehouse/test_db.db/crud_table/_SUCCESS
drwxr-xr-x   - kontext supergroup          0 2022-08-20 12:06 /user/hive/warehouse/test_db.db/crud_table/base_0000003

Under base_0000003 folder, there are three files:

/user/hive/warehouse/test_db.db/crud_table/base_0000003/_metadata_acid
/user/hive/warehouse/test_db.db/crud_table/base_0000003/_orc_acid_version
 /user/hive/warehouse/test_db.db/crud_table/base_0000003/bucket_00000

The last one is the data file and it includes the following information:

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"value":"A"}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"value":"B"}}

As you can see, record 3 was deleted from data file.

Summary

Hopefully you now have a good understanding about how transaction tables work in Hive with Orc file format. Even Hive supports transaction tables, you should not use it as a transactional database for OLTP purpose as it can support millions of rows updated per a transaction, but it cannot support millions of transactions an hour. 

References

For more advanced information about how Hive read those delta and base folders, refer to these articles:

Understanding Hive ACID Transactional Table | Ji ZHANG's Blog

Evolving Draft for ORC Specification v2

More from Kontext
comment Comments
U ugrite hita

ugrite access_time 2 years ago link more_vert

Thank you very much for your article, it is very useful. As you mentioned in the conclusion that it should not use as the OLTP. Could you provide additional suggestions for the below questions?

  1. What is the situation that we should consider to use this feature?  Please give me an example.

  2. As it executes as Map-Reduce, So if we want the better performance. Can we use Spark (Ex. PySpark) with the Delta library?

BRs,

Ugrite Hita

Raymond Raymond

Raymond access_time 2 years ago link more_vert

There are can be many different points. I will just provide some typical ones here:

  1. If your use case has no low latency requirement, you can use it. For example, ingest events stored in Kafka into a persistent table in Hive. 

  2. Spark Delta uses different engine and usually can provide better performance but it really depends on your exact use cases. Delta Lake has the advantage of not relying on a metastore hence it is very easy to migrate from one platform to another.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts