Schema Merging (Evolution) with Parquet in Spark and Hive


Schema evolution is supported by many frameworks and data serialization systems such as Avro, ORC, Protocol Buffers and Parquet. With schema evolution, one set of data can be stored in multiple files with different but compatible schemas. In Spark, the Parquet data source can detect and merge the schemas of those files automatically. Without automatic schema merging, the typical way of handling schema evolution is through a historical data reload, which requires much work.

In this article, I am going to demonstrate how to use Spark to support schema merging scenarios such as adding or deleting columns. I will also briefly touch on Hive metastore schemas and Parquet schemas.

Schema merging is turned off by default starting from Spark 1.5.0, as it is a relatively expensive operation. To enable it, set the data source option mergeSchema to true when reading Parquet files, or set the global SQL option spark.sql.parquet.mergeSchema to true.

The scenario

The following sections are based on this scenario.

A dataframe df1 is created with the following attributes:

Schema version 0

1) id bigint

2) attr0 string

df1 is saved in Parquet format to data/partition-date=2020-01-01.

A new dataframe df2 is created with the following attributes:

Schema version 1

1) id bigint

2) attr0 string

3) attr1 string

Compared with schema version 0, one new attribute attr1 is added. df2 is saved in Parquet format to data/partition-date=2020-01-02.

Similarly, a new dataframe df3 is created with attr0 removed:

Schema version 2

1) id bigint

2) attr1 string

The data is saved in Parquet format to data/partition-date=2020-01-03.

The Spark application will need to read data from these three folders with schema merging.

The solution

First, let's create these three dataframes and save them into the corresponding locations using the following code:

from pyspark import SparkConf
from pyspark.sql import SparkSession

appName = "Python Example - Parquet Schema Merge"
master = 'local'

# Create Spark session
conf = SparkConf().setMaster(master)
spark = SparkSession.builder.config(conf=conf) \
    .appName(appName) \
    .enableHiveSupport() \
    .getOrCreate()

data1 = [{"id": 1, "attr0": "Attr 0"},
         {"id": 2, "attr0": "Attr 0"}]
df1 = spark.createDataFrame(data1)
df1.write.mode("overwrite").parquet("data/partition-date=2020-01-01")

data2 = [{"id": 1, "attr0": "Attr 0", "attr1": "Attr 1"},
         {"id": 2, "attr0": "Attr 0", "attr1": "Attr 1"}]
df2 = spark.createDataFrame(data2)
df2.write.mode("overwrite").parquet("data/partition-date=2020-01-02")

data3 = [{"id": 1, "attr1": "Attr 1"},
         {"id": 2, "attr1": "Attr 1"}]
df3 = spark.createDataFrame(data3)
df3.write.mode("overwrite").parquet("data/partition-date=2020-01-03")

The above code snippet creates three dataframes from Python dictionary lists and saves each one to its own folder. The schema of each dataframe is inferred automatically, though the recommended approach is to specify the schema explicitly.

Run this application and the logs will print out schema information like the following:
INFO parquet.ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "attr0",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "id",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional binary attr0 (UTF8);
  optional int64 id;
}

Run the following HDFS command and we can see that the three directories were created in HDFS:

$ hadoop fs -ls data
Found 3 items
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-01
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-02
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-03

Read parquet with schema merge

Now let's read the data back with schema merging enabled using the following code:

df = spark.read.option("mergeSchema", "true").parquet("data")
df.show()

The output looks like the following:

+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+

In the result, the values will be null if the column doesn't exist in the partition. 

Column partition-date is added because it is a partition column, and Spark's partition discovery feature automatically derives partition columns from file paths. For more information about partitioning, refer to Data Partitioning in Spark (PySpark) In-depth Walkthrough.
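Conceptually, schema merging computes the union of fields across all file schemas. The following is a simplified, hypothetical sketch of that idea in plain Python, not Spark's actual implementation (which also reconciles field types):

```python
def merge_schemas(schemas):
    """Union of fields across schemas, keeping first-seen order.

    Each schema is a list of (name, type) pairs; a simplified
    illustration of what Parquet schema merging does."""
    merged = []
    seen = set()
    for schema in schemas:
        for name, dtype in schema:
            if name not in seen:
                seen.add(name)
                merged.append((name, dtype))
    return merged

# The three schema versions from the scenario above.
v0 = [("id", "bigint"), ("attr0", "string")]
v1 = [("id", "bigint"), ("attr0", "string"), ("attr1", "string")]
v2 = [("id", "bigint"), ("attr1", "string")]

# merge_schemas([v0, v1, v2]) returns id, attr0 and attr1
# in first-seen order.
```

Rows from a file whose schema lacks one of the merged columns get null for that column, matching the output above.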

Without schema merging

If we don't specify mergeSchema option, the new attributes will not be picked up.

df = spark.read.parquet("data")
df.show()

The result looks like the following:

+------+---+--------------+
| attr0| id|partition-date|
+------+---+--------------+
|Attr 0|  1|    2020-01-02|
|Attr 0|  2|    2020-01-02|
|  null|  1|    2020-01-03|
|  null|  2|    2020-01-03|
|Attr 0|  1|    2020-01-01|
|Attr 0|  2|    2020-01-01|
+------+---+--------------+

Without schema merging, the resulting schema is taken from one of the partition files, effectively arbitrarily.

Use Spark SQL

Alternatively, we can also use Spark SQL option to enable schema merge.

spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("data")
df.show()
The output looks like the following:
+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+

The result is the same as using the mergeSchema option. The advantage of this approach is that the setting takes effect for the whole Spark session instead of having to be specified in every read function.

Hive metastore Parquet table conversion

When interacting with Hive metastore Parquet tables, Spark SQL will try to use its own Parquet support instead of the Hive SerDe for better performance. This behavior is controlled by the spark.sql.hive.convertMetastoreParquet configuration, which is turned on by default.
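If you ever need the Hive SerDe behavior instead (for example, to compare results), the conversion can be toggled per session; a sketch:

```python
# Disable Spark's built-in Parquet reader for Hive metastore Parquet
# tables; Spark falls back to the Hive SerDe (usually slower).
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

# Re-enable the default behavior.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
```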

When Spark SQL reads a Hive metastore Parquet table, schema reconciliation happens due to the following differences (from the official documentation):

  1. Hive is case insensitive, while Parquet is not
  2. Hive considers all columns nullable, while nullability in Parquet is significant

Create Hive table

Let's create a Hive table using the following command:

hive> use test_db;
Time taken: 0.029 seconds
hive> create external table `parquet_merge` (id bigint, attr0 string) partitioned by (`partition-date` string) stored as parquet location 'data';
Time taken: 0.144 seconds
hive> MSCK REPAIR TABLE `parquet_merge`;
Partitions not in metastore:    parquet_merge:partition-date=2020-01-01 parquet_merge:partition-date=2020-01-02 parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-01
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-02
Time taken: 0.387 seconds, Fetched: 4 row(s)
hive> select * from `parquet_merge`;
1       Attr 0  2020-01-01
2       Attr 0  2020-01-01
1       Attr 0  2020-01-02
2       Attr 0  2020-01-02
1       NULL    2020-01-03
2       NULL    2020-01-03
Time taken: 0.303 seconds, Fetched: 6 row(s)

The above commands create the Hive external table in the test_db database and register its partitions.

The 'MSCK REPAIR TABLE tablename' SQL statement is used to recover partitions and the data associated with them.

Read from Hive table in Spark

Use the following code to read from Hive table directly:

df = spark.sql("select * from test_db.parquet_merge")

The output looks like the following:

2020-02-02 19:08:16,126 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-02/part-00000-14963203-5d95-44be-811f-061efecd2711-c000.snappy.parquet, range: 0-946, partition values: [2020-01-02]
2020-02-02 19:08:16,147 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-01/part-00000-3d7d5e79-4ce2-48d7-b1e5-f9ff28d89ee5-c000.snappy.parquet, range: 0-692, partition values: [2020-01-01]
2020-02-02 19:08:16,165 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-03/part-00000-84b2f787-61ed-4e4b-9bb2-67af5f7c5933-c000.snappy.parquet, range: 0-692, partition values: [2020-01-03]
+---+------+--------------+
| id| attr0|partition-date|
+---+------+--------------+
|  1|Attr 0|    2020-01-02|
|  2|Attr 0|    2020-01-02|
|  1|Attr 0|    2020-01-01|
|  2|Attr 0|    2020-01-01|
|  1|  null|    2020-01-03|
|  2|  null|    2020-01-03|
+---+------+--------------+

From the above result, we can see that the Hive metastore schema won't be refreshed automatically, though Spark can reconcile the schema based on the Hive table definition. You'll need to update the Hive table schema manually if required.
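One way to bring the Hive table definition up to date is to add the new column and then refresh Spark's cached metadata; a sketch (assuming the session has Hive support enabled and a reachable metastore):

```python
# Add the new column to the Hive table definition so it shows up
# in SELECT * results.
spark.sql("ALTER TABLE test_db.parquet_merge ADD COLUMNS (attr1 string)")

# Refresh Spark's cached metadata for the table.
spark.sql("REFRESH TABLE test_db.parquet_merge")
```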
