Schema Merging (Evolution) with Parquet in Spark and Hive

Raymond Raymond event 2020-02-02 visibility 26,609
more_vert
Schema Merging (Evolution) with Parquet in Spark and Hive

Schema evolution is supported by many frameworks or data serialization systems such as Avro, Orc, Protocol Buffer and Parquet. With schema evolution, one set of data can be stored in multiple files with different but compatible schema. In Spark, Parquet data source can detect and merge schema of those files automatically. Without automatic schema merging, the typical way of handling schema evolution is through historical data reload that requires much work. 

In this article, I am going to demo how to use Spark to support schema merging scenarios such as adding or deleting columns. I will also touch a little bit about Hive metastore schema and Parquet schema. 

infoSchema merge is turned off by default starting from Spark 1.5.0 as it is a relatively expensive operation. To enable it, we can set mergeSchema option to true or set global SQL option spark.sql.parquet.mergeSchema to true.

The scenario

The following sections are based on this scenario.

A dataframe df1 is created with the following attributes:

Schema version 0

1) id bigint

2) attr0 string

df1 is saved as parquet format in data/partition-date=2020-01-01.

A new dataframe df2 is created with the following attributes:

Schema version 1

1) id bigint

2) attr0 string

3) attr1 string

Compared with schema version 0, one new attribute attr1 is added. df2 is saved as parquet format in data/partition-date=2020-01-02.

Similarly, a new dataframe df3 is created with attr0 removed:

Schema version 2

1) id bigint

2) attr1 string

The data is saved as parquet format in data/partition-date=2020-01-03.

The Spark application will need to read data from these three folders with schema merging.

The solution

First, let's create these three dataframes and save them into the corresponded locations using the following code:

from pyspark import SparkConf
from pyspark.sql import SparkSession

appName = "Python Example - Parquet Schema Merge"
master = 'local'

# Create Spark session
conf = SparkConf().setMaster(master)
spark = SparkSession.builder.config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

data1 = [{"id": 1, "attr0": "Attr 0"}, 
{"id": 2, "attr0": "Attr 0"}]
df1 = spark.createDataFrame(data1)

data2 = [{"id": 1, "attr0": "Attr 0", "attr1": "Attr 1"}, 
{"id": 2, "attr0": "Attr 0", "attr1": "Attr 1"}]
df2 = spark.createDataFrame(data2)

data3= [{"id": 1, "attr1": "Attr 1"}, 
{"id": 2, "attr1": "Attr 1"}]
df3 = spark.createDataFrame(data3)

df1.write.mode('overwrite').parquet('data/partition-date=2020-01-01')
df2.write.mode('overwrite').parquet('data/partition-date=2020-01-02')
df3.write.mode('overwrite').parquet('data/partition-date=2020-01-03')
The above code snippet simply create three dataframes from Python dictionary list. The schema for the data frame will be inferred automatically though the recommended approach is to specify the schema manually. 

Run this application and the logs will print out schema information like the following:
INFO parquet.ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "attr0",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "id",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional binary attr0 (UTF8);
  optional int64 id;
}

Run HDFS command and we can see the following directories are created in HDFS.

$ hadoop fs -ls data
Found 3 items
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-01
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-02
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-03

Read parquet with schema merge

Now let's read the schema using the following code:

df = spark.read.option("mergeSchema","true").parquet("data")
df.show()

The output looks like the following:

+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+

In the result, the values will be null if the column doesn't exist in the partition. 

infoColumn partition-date is added as it is a partition column and partition discovery feature of Spark can automatically add the partition columns in the file paths. For more information about partition, refer to Data Partitioning in Spark (PySpark) In-depth Walkthrough.

Without schema merging

If we don't specify mergeSchema option, the new attributes will not be picked up.

df = spark.read.parquet("data")
df.show()

The result looks like the following:

+------+---+--------------+
| attr0| id|partition-date|
+------+---+--------------+
|Attr 0|  1|    2020-01-02|
|Attr 0|  2|    2020-01-02|
|  null|  1|    2020-01-03|
|  null|  2|    2020-01-03|
|Attr 0|  1|    2020-01-01|
|Attr 0|  2|    2020-01-01|
+------+---+--------------+

Without schema merge, the schema will be decided randomly based on on of the partition files. 

Use Spark SQL

Alternatively, we can also use Spark SQL option to enable schema merge.

spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("data")
df.show()
The output looks like the following:
+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+

The result is same as using mergeSchema option. The advantage of using this option is that it is effective in the whole Spark session instead of specifying it in all read functions. 

Hive metastore Parquet table conversion

Spark SQL will try to use its own Parquet support instead of Hive SerDe for better performance when interacting with Hive metastore Parquet tables. It is controlled by spark.sql.hive.convertMetastoreParquet Spark configuration. By default it is turned on.

When reading from Hive Parquet table to Spark SQL Parquet table, schema reconciliation happens due the follow differences (referred from official documentation):

  1. Hive is case insensitive, while Parquet is not
  2. Hive considers all columns nullable, while nullability in Parquet is significant

Create Hive table

Let's create a Hive table using the following command:

hive> use test_db;
OK
Time taken: 0.029 seconds
hive> create external table `parquet_merge` (id bigint, attr0 string) partitioned by (`partition-date` string) stored as parquet location 'data';
OK
Time taken: 0.144 seconds
hive> MSCK REPAIR TABLE `parquet_merge`;
OK
Partitions not in metastore:    parquet_merge:partition-date=2020-01-01 parquet_merge:partition-date=2020-01-02 parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-01
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-02
Time taken: 0.387 seconds, Fetched: 4 row(s)
hive> select * from `parquet_merge`;
OK
1       Attr 0  2020-01-01
2       Attr 0  2020-01-01
1       Attr 0  2020-01-02
2       Attr 0  2020-01-02
1       NULL    2020-01-03
2       NULL    2020-01-03
Time taken: 0.303 seconds, Fetched: 6 row(s)
hive> 

The above command create the Hive external table in test_db database. 

info'MSCK REPAIR TABLE tablename' SQL statement is used to recover partitions and data associated with partitions. 

Read from Hive table in Spark

Use the following code to read from Hive table directly:

df = spark.sql("select * from test_db.parquet_merge")
df.show()

The output looks like the following:

2020-02-02 19:08:16,126 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-02/part-00000-14963203-5d95-44be-811f-061efecd2711-c000.snappy.parquet, range: 0-946, partition values: [2020-01-02]
2020-02-02 19:08:16,147 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-01/part-00000-3d7d5e79-4ce2-48d7-b1e5-f9ff28d89ee5-c000.snappy.parquet, range: 0-692, partition values: [2020-01-01]
2020-02-02 19:08:16,165 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-03/part-00000-84b2f787-61ed-4e4b-9bb2-67af5f7c5933-c000.snappy.parquet, range: 0-692, partition values: [2020-01-03]
......
+---+------+--------------+
| id| attr0|partition-date|
+---+------+--------------+
|  1|Attr 0|    2020-01-02|
|  2|Attr 0|    2020-01-02|
|  1|Attr 0|    2020-01-01|
|  2|Attr 0|    2020-01-01|
|  1|  null|    2020-01-03|
|  2|  null|    2020-01-03|
+---+------+--------------+

So from the above result, we can see Hive metastore won't be automatically refreshed through Spark can automatically reconcile schema based on Hive table definition. You'll need to manually refresh Hive table schema if required. 

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts