Schema Merging (Evolution) with Parquet in Spark and Hive
Schema evolution is supported by many frameworks and data serialization systems such as Avro, ORC, Protocol Buffers and Parquet. With schema evolution, one set of data can be stored across multiple files, each with a different but compatible schema. In Spark, the Parquet data source can detect and merge the schemas of those files automatically. Without automatic schema merging, the typical way of handling schema evolution is to reload historical data, which requires a lot of work.
In this article, I am going to demonstrate how to use Spark to handle schema merging scenarios such as adding or deleting columns. I will also touch briefly on how the Hive metastore schema relates to the Parquet schema.
The scenario
The following sections are based on this scenario.
A dataframe df1 is created with the following attributes:
Schema version 0
1) id bigint
2) attr0 string
df1 is saved in Parquet format to data/partition-date=2020-01-01.
A new dataframe df2 is created with the following attributes:
Schema version 1
1) id bigint
2) attr0 string
3) attr1 string
Compared with schema version 0, a new attribute attr1 is added. df2 is saved in Parquet format to data/partition-date=2020-01-02.
Similarly, a new dataframe df3 is created with attr0 removed:
Schema version 2
1) id bigint
2) attr1 string
The data is saved in Parquet format to data/partition-date=2020-01-03.
The Spark application will need to read data from these three folders with schema merging.
The solution
First, let's create these three dataframes and save them to the corresponding locations using the following code:
from pyspark import SparkConf
from pyspark.sql import SparkSession

appName = "Python Example - Parquet Schema Merge"
master = 'local'

# Create Spark session with Hive support
conf = SparkConf().setMaster(master)
spark = SparkSession.builder.appName(appName).config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

# Schema version 0: id, attr0
data1 = [{"id": 1, "attr0": "Attr 0"}, {"id": 2, "attr0": "Attr 0"}]
df1 = spark.createDataFrame(data1)

# Schema version 1: id, attr0, attr1
data2 = [{"id": 1, "attr0": "Attr 0", "attr1": "Attr 1"},
         {"id": 2, "attr0": "Attr 0", "attr1": "Attr 1"}]
df2 = spark.createDataFrame(data2)

# Schema version 2: id, attr1
data3 = [{"id": 1, "attr1": "Attr 1"}, {"id": 2, "attr1": "Attr 1"}]
df3 = spark.createDataFrame(data3)

# Write each schema version into its own partition folder
df1.write.mode('overwrite').parquet('data/partition-date=2020-01-01')
df2.write.mode('overwrite').parquet('data/partition-date=2020-01-02')
df3.write.mode('overwrite').parquet('data/partition-date=2020-01-03')
When writing df1, Spark logs the Catalyst schema and the corresponding Parquet message type:
INFO parquet.ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "attr0",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "id",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional binary attr0 (UTF8);
  optional int64 id;
}
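As a quick sanity check (my addition, not part of the original scenario), we can print the schema of each folder separately to confirm that the three versions really differ:
# Print the schema written to each folder: expect id and attr0 in the first,
# attr1 added in the second, and attr0 gone in the third
for path in ['data/partition-date=2020-01-01',
             'data/partition-date=2020-01-02',
             'data/partition-date=2020-01-03']:
    print(path)
    spark.read.parquet(path).printSchema()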
Run the following HDFS command and we can see that three directories were created in HDFS:
$ hadoop fs -ls data
Found 3 items
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-01
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-02
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-03
Read Parquet with schema merging
Now let's read the data with schema merging enabled using the following code:
df = spark.read.option("mergeSchema", "true").parquet("data")
df.show()
The output looks like the following:
+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+
In the result, a value is null if the corresponding column doesn't exist in that partition's files.
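To confirm what the merged schema looks like, we can also print it (a small check on top of the read above):
# The merged schema should contain attr0, id, attr1 plus the partition-date column
df = spark.read.option("mergeSchema", "true").parquet("data")
df.printSchema()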
Without schema merging
If we don't specify the mergeSchema option, the new attribute (attr1) will not be picked up.
df = spark.read.parquet("data")
df.show()
The result looks like the following:
+------+---+--------------+
| attr0| id|partition-date|
+------+---+--------------+
|Attr 0|  1|    2020-01-02|
|Attr 0|  2|    2020-01-02|
|  null|  1|    2020-01-03|
|  null|  2|    2020-01-03|
|Attr 0|  1|    2020-01-01|
|Attr 0|  2|    2020-01-01|
+------+---+--------------+
Without schema merging, the resulting schema is decided by one of the partition files, picked more or less arbitrarily, so columns that only exist in other partitions are silently dropped.
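If you cannot (or do not want to) enable schema merging, one common workaround is to supply an explicit schema when reading so the result no longer depends on which file Spark happens to sample. The sketch below is my own addition, not part of the original example:
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Explicit schema covering all known versions; columns missing from a file
# come back as null. The partition-date column is normally still appended
# from the directory names.
explicit_schema = StructType([
    StructField("id", LongType(), True),
    StructField("attr0", StringType(), True),
    StructField("attr1", StringType(), True),
])
df = spark.read.schema(explicit_schema).parquet("data")
df.show()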
Use Spark SQL
Alternatively, we can use the Spark SQL configuration option spark.sql.parquet.mergeSchema to enable schema merging.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("data")
df.show()
+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+
The result is the same as using the mergeSchema option. The advantage of this approach is that it takes effect for the whole Spark session instead of having to be specified in every read call.
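The option can also be set when the session is built; the following variation of the earlier builder code is only a sketch (it reuses the SparkSession import and the conf object from the first snippet):
# Enable Parquet schema merging for the whole session at creation time
spark = SparkSession.builder \
    .config(conf=conf) \
    .config("spark.sql.parquet.mergeSchema", "true") \
    .enableHiveSupport() \
    .getOrCreate()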
Hive metastore Parquet table conversion
When interacting with Hive metastore Parquet tables, Spark SQL will try to use its own Parquet support instead of the Hive SerDe for better performance. This behavior is controlled by the spark.sql.hive.convertMetastoreParquet configuration, which is turned on by default.
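If you ever need Spark to fall back to the Hive SerDe instead (for example, to rule out the conversion as the cause of an issue), the configuration can be switched off for the session; this is just an illustration of the key mentioned above:
# Disable the conversion so Hive metastore Parquet tables are read via the Hive SerDe
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")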
When Spark SQL reads a Hive metastore Parquet table, it has to reconcile the Hive metastore schema with the Parquet schema because of the following differences (referenced from the official documentation):
- Hive is case insensitive, while Parquet is not
- Hive considers all columns nullable, while nullability in Parquet is significant
Create Hive table
Let's create a Hive table using the following commands:
hive> use test_db;
OK
Time taken: 0.029 seconds
hive> create external table `parquet_merge` (id bigint, attr0 string) partitioned by (`partition-date` string) stored as parquet location 'data';
OK
Time taken: 0.144 seconds
hive> MSCK REPAIR TABLE `parquet_merge`;
OK
Partitions not in metastore:    parquet_merge:partition-date=2020-01-01    parquet_merge:partition-date=2020-01-02    parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-01
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-02
Time taken: 0.387 seconds, Fetched: 4 row(s)
hive> select * from `parquet_merge`;
OK
1       Attr 0  2020-01-01
2       Attr 0  2020-01-01
1       Attr 0  2020-01-02
2       Attr 0  2020-01-02
1       NULL    2020-01-03
2       NULL    2020-01-03
Time taken: 0.303 seconds, Fetched: 6 row(s)
hive>
The above commands create the Hive external table in the test_db database and register its partitions in the metastore.
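As a quick check (my addition, not part of the original walkthrough), the metastore schema for this table can also be inspected from the Spark side; note that attr1 is not part of it:
# Show the columns registered in the Hive metastore for the new table
spark.sql("DESCRIBE TABLE test_db.parquet_merge").show(truncate=False)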
Read from Hive table in Spark
Use the following code to read from the Hive table directly:
df = spark.sql("select * from test_db.parquet_merge")
df.show()
The output looks like the following:
2020-02-02 19:08:16,126 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-02/part-00000-14963203-5d95-44be-811f-061efecd2711-c000.snappy.parquet, range: 0-946, partition values: [2020-01-02]
2020-02-02 19:08:16,147 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-01/part-00000-3d7d5e79-4ce2-48d7-b1e5-f9ff28d89ee5-c000.snappy.parquet, range: 0-692, partition values: [2020-01-01]
2020-02-02 19:08:16,165 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-03/part-00000-84b2f787-61ed-4e4b-9bb2-67af5f7c5933-c000.snappy.parquet, range: 0-692, partition values: [2020-01-03]
......
+---+------+--------------+
| id| attr0|partition-date|
+---+------+--------------+
|  1|Attr 0|    2020-01-02|
|  2|Attr 0|    2020-01-02|
|  1|Attr 0|    2020-01-01|
|  2|Attr 0|    2020-01-01|
|  1|  null|    2020-01-03|
|  2|  null|    2020-01-03|
+---+------+--------------+
From the above result, we can see that the Hive metastore schema is not refreshed automatically (attr1 does not show up), even though Spark can automatically reconcile the schema based on the Hive table definition. You'll need to manually update the Hive table schema if required.
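For completeness, one possible way to refresh the table definition is to add the new column and re-read the table. This is only a sketch: ALTER TABLE ... ADD COLUMNS is supported by both Hive and recent Spark versions, but whether you run it from Spark or the Hive shell (and whether existing partitions need extra handling such as CASCADE in Hive) depends on your environment, so verify it against your setup.
# Add attr1 to the Hive table definition so it becomes visible to readers
spark.sql("ALTER TABLE test_db.parquet_merge ADD COLUMNS (attr1 string)")

# Invalidate cached metadata and read the table again
spark.catalog.refreshTable("test_db.parquet_merge")
spark.sql("select * from test_db.parquet_merge").show()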