By using this site, you acknowledge that you have read and understand our Cookie policy, Privacy policy and Terms .
close

Apache Spark installation guides, performance tuning tips, general tutorials, etc.

rss_feed Subscribe RSS

Schema evolution is supported by many frameworks or data serialization systems such as Avro, Orc, Protocol Buffer and Parquet. With schema evolution, one set of data can be stored in multiple files with different but compatible schema. In Spark, Parquet data source can detect and merge schema of those files automatically. Without automatic schema merging, the typical way of handling schema evolution is through historical data reload that requires much work. 

In this article, I am going to demo how to use Spark to support schema merging scenarios such as adding or deleting columns. I will also touch a little bit about Hive metastore schema and Parquet schema. 


infoSchema merge is turned off by default starting from Spark 1.5.0 as it is a relatively expensive operation. To enable it, we can set mergeSchema option to true or set global SQL option spark.sql.parquet.mergeSchema to true.

The scenario

The following sections are based on this scenario.

A dataframe df1 is created with the following attributes:

Schema version 0

1) id bigint

2) attr0 string

df1 is saved as parquet format in data/partition-date=2020-01-01.

A new dataframe df2 is created with the following attributes:

Schema version 1

1) id bigint

2) attr0 string

3) attr1 string

Compared with schema version 0, one new attribute attr1 is added. df2 is saved as parquet format in data/partition-date=2020-01-02.

Similarly, a new dataframe df3 is created with attr0 removed:

Schema version 2

1) id bigint

2) attr1 string

The data is saved as parquet format in data/partition-date=2020-01-03.

The Spark application will need to read data from these three folders with schema merging.

The solution

First, let's create these three dataframes and save them into the corresponded locations using the following code:

from pyspark import SparkConf
from pyspark.sql import SparkSession

appName = "Python Example - Parquet Schema Merge"
master = 'local'

# Create Spark session
conf = SparkConf().setMaster(master)
spark = SparkSession.builder.config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

data1 = [{"id": 1, "attr0": "Attr 0"}, 
{"id": 2, "attr0": "Attr 0"}]
df1 = spark.createDataFrame(data1)

data2 = [{"id": 1, "attr0": "Attr 0", "attr1": "Attr 1"}, 
{"id": 2, "attr0": "Attr 0", "attr1": "Attr 1"}]
df2 = spark.createDataFrame(data2)

data3= [{"id": 1, "attr1": "Attr 1"}, 
{"id": 2, "attr1": "Attr 1"}]
df3 = spark.createDataFrame(data3)

df1.write.mode('overwrite').parquet('data/partition-date=2020-01-01')
df2.write.mode('overwrite').parquet('data/partition-date=2020-01-02')
df3.write.mode('overwrite').parquet('data/partition-date=2020-01-03')
The above code snippet simply create three dataframes from Python dictionary list. The schema for the data frame will be inferred automatically though the recommended approach is to specify the schema manually. 

Run this application and the logs will print out schema information like the following:
INFO parquet.ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "attr0",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "id",
    "type" : "long",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional binary attr0 (UTF8);
  optional int64 id;
}

Run HDFS command and we can see the following directories are created in HDFS.

$ hadoop fs -ls data
Found 3 items
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-01
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-02
drwxr-xr-x   - tangr supergroup          0 2020-02-02 18:24 data/partition-date=2020-01-03

Read parquet with schema merge

Now let's read the schema using the following code:

df = spark.read.option("mergeSchema","true").parquet("data")
df.show()

The output looks like the following:

+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+

In the result, the values will be null if the column doesn't exist in the partition. 

infoColumn partition-date is added as it is a partition column and partition discovery feature of Spark can automatically add the partition columns in the file paths. For more information about partition, refer to Data Partitioning in Spark (PySpark) In-depth Walkthrough.

Without schema merging

If we don't specify mergeSchema option, the new attributes will not be picked up.

df = spark.read.parquet("data")
df.show()

The result looks like the following:

+------+---+--------------+
| attr0| id|partition-date|
+------+---+--------------+
|Attr 0|  1|    2020-01-02|
|Attr 0|  2|    2020-01-02|
|  null|  1|    2020-01-03|
|  null|  2|    2020-01-03|
|Attr 0|  1|    2020-01-01|
|Attr 0|  2|    2020-01-01|
+------+---+--------------+

Without schema merge, the schema will be decided randomly based on on of the partition files. 

Use Spark SQL

Alternatively, we can also use Spark SQL option to enable schema merge.

spark.conf.set("spark.sql.parquet.mergeSchema", "true")
df = spark.read.parquet("data")
df.show()
The output looks like the following:
+------+---+------+--------------+
| attr0| id| attr1|partition-date|
+------+---+------+--------------+
|Attr 0|  1|Attr 1|    2020-01-02|
|Attr 0|  2|Attr 1|    2020-01-02|
|  null|  1|Attr 1|    2020-01-03|
|  null|  2|Attr 1|    2020-01-03|
|Attr 0|  1|  null|    2020-01-01|
|Attr 0|  2|  null|    2020-01-01|
+------+---+------+--------------+

The result is same as using mergeSchema option. The advantage of using this option is that it is effective in the whole Spark session instead of specifying it in all read functions. 

Hive metastore Parquet table conversion

Spark SQL will try to use its own Parquet support instead of Hive SerDe for better performance when interacting with Hive metastore Parquet tables. It is controlled by spark.sql.hive.convertMetastoreParquet Spark configuration. By default it is turned on.

When reading from Hive Parquet table to Spark SQL Parquet table, schema reconciliation happens due the follow differences (referred from official documentation):

  1. Hive is case insensitive, while Parquet is not
  2. Hive considers all columns nullable, while nullability in Parquet is significant

Create Hive table

Let's create a Hive table using the following command:

hive> use test_db;
OK
Time taken: 0.029 seconds
hive> create external table `parquet_merge` (id bigint, attr0 string) partitioned by (`partition-date` string) stored as parquet location 'data';
OK
Time taken: 0.144 seconds
hive> MSCK REPAIR TABLE `parquet_merge`;
OK
Partitions not in metastore:    parquet_merge:partition-date=2020-01-01 parquet_merge:partition-date=2020-01-02 parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-01
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-03
Repair: Added partition to metastore parquet_merge:partition-date=2020-01-02
Time taken: 0.387 seconds, Fetched: 4 row(s)
hive> select * from `parquet_merge`;
OK
1       Attr 0  2020-01-01
2       Attr 0  2020-01-01
1       Attr 0  2020-01-02
2       Attr 0  2020-01-02
1       NULL    2020-01-03
2       NULL    2020-01-03
Time taken: 0.303 seconds, Fetched: 6 row(s)
hive> 

The above command create the Hive external table in test_db database. 

info'MSCK REPAIR TABLE tablename' SQL statement is used to recover partitions and data associated with partitions. 

Read from Hive table in Spark

Use the following code to read from Hive table directly:

df = spark.sql("select * from test_db.parquet_merge")
df.show()

The output looks like the following:

2020-02-02 19:08:16,126 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-02/part-00000-14963203-5d95-44be-811f-061efecd2711-c000.snappy.parquet, range: 0-946, partition values: [2020-01-02]
2020-02-02 19:08:16,147 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-01/part-00000-3d7d5e79-4ce2-48d7-b1e5-f9ff28d89ee5-c000.snappy.parquet, range: 0-692, partition values: [2020-01-01]
2020-02-02 19:08:16,165 INFO datasources.FileScanRDD: Reading File path: hdfs://localhost:19000/user/tangr/data/partition-date=2020-01-03/part-00000-84b2f787-61ed-4e4b-9bb2-67af5f7c5933-c000.snappy.parquet, range: 0-692, partition values: [2020-01-03]
......
+---+------+--------------+
| id| attr0|partition-date|
+---+------+--------------+
|  1|Attr 0|    2020-01-02|
|  2|Attr 0|    2020-01-02|
|  1|Attr 0|    2020-01-01|
|  2|Attr 0|    2020-01-01|
|  1|  null|    2020-01-03|
|  2|  null|    2020-01-03|
+---+------+--------------+

So from the above result, we can see Hive metastore won't be automatically refreshed through Spark can automatically reconcile schema based on Hive table definition. You'll need to manually refresh Hive table schema if required. 

info Last modified by Raymond at 15 days ago

info About author

info License/Terms

More from Kontext

Spark Read from SQL Server Source using Windows/Kerberos Authentication

local_offer pyspark local_offer SQL Server local_offer spark-2-x

visibility 26
thumb_up 0
access_time 14 days ago

In this article, I am going to show you how to use JDBC Kerberos authentication to connect to SQL Server sources in Spark (PySpark). I will use  Kerberos connection with principal names and password directly that requires  ...

open_in_new View

local_offer windows10 local_offer hadoop local_offer hdfs

visibility 52
thumb_up 0
access_time 23 days ago

Issue When installing Hadoop 3.2.1 on Windows 10,  you may encounter the following error when trying to format HDFS  namnode: ERROR namenode.NameNode: Failed to start namenode. The error happens when running the following comm...

open_in_new View

local_offer pyspark local_offer spark-2-x local_offer python

visibility 83
thumb_up 0
access_time 2 months ago

This articles show you how to convert a Python dictionary list to a Spark DataFrame. The code snippets runs on Spark 2.x environments. Input The input data (dictionary list looks like the following): data = [{"Category": 'Category A', 'ItemID': 1, 'Amount': 12.40}, ...

open_in_new View

Improve PySpark Performance using Pandas UDF with Apache Arrow

local_offer pyspark local_offer spark local_offer spark-2-x local_offer pandas

visibility 162
thumb_up 4
access_time 2 months ago

Apache Arrow is an in-memory columnar data format that can be used in Spark to efficiently transfer data between JVM and Python processes. This currently is most beneficial to Python users that work with Pandas/NumPy data. In this article, ...

open_in_new View

info Follow us on Twitter to get the latest article updates. Follow us