Spark Schema Merge (Evolution) for Orc Files
Schema merge (or schema evolution) is a common requirement when implementing data lakes on cloud or enterprise big data platforms. Orc and Parquet are two commonly used storage formats that support schema merge, as schema information is stored together with the data. When reading from Orc files, Spark provides the configuration spark.sql.orc.mergeSchema to control the behavior of schema merging.
About spark.sql.orc.mergeSchema
By default, this configuration is set to false, which means Spark picks an arbitrary Orc file in the folder and uses its schema for the DataFrame. When it is set to true, Spark merges the schemas collected from all the Orc files.
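Since Spark 3.0, the same behavior can also be controlled per read through the Orc data source option mergeSchema, which takes precedence over the session configuration for that single read. A minimal sketch, assuming a Spark session named spark like in the scripts below:

# Enable schema merge for one read via the data source option
# instead of the session-wide spark.sql.orc.mergeSchema config.
df = spark.read.option('mergeSchema', 'true') \
    .format('orc') \
    .load('file:///home/kontext/pyspark-examples/data/orc-schema-merge')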
Schema merge example with Orc
Let's understand this a little better with the following example.
First, let's create a folder of Orc files with different schemas using the following script (orc-schema-data.py). The first DataFrame has two columns and the second has one additional column.
# orc-schema-data.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Orc Schema Merge"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [{
    'col1': 'Category A',
    'col2': 100
}, {
    'col1': 'Category B',
    'col2': 200
}, {
    'col1': 'Category C',
    'col2': 300
}]

data2 = [{
    'col1': 'Category D',
    'col2': 100,
    'col3': 'D - 1'
}, {
    'col1': 'Category E',
    'col2': 200,
    'col3': 'E - 1'
}]

# Save as Orc files
spark.createDataFrame(data).write.format('orc').mode('overwrite').save(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
spark.createDataFrame(data2).write.format('orc').mode('append').save(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
Run the PySpark script using the following command:
spark-submit ./orc-schema-data.py
The script generates a few Orc part files in the target folder.
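If you want to confirm that the two writes produced files with different schemas, you can read each part file on its own and print its schema. This is just an illustrative check that reuses the spark session from the script above; the actual part file names are generated by Spark and will differ in your environment:

import glob

# Read each generated part file individually and print its schema.
# Part file names are generated by Spark and will differ per run.
for path in sorted(glob.glob(
        '/home/kontext/pyspark-examples/data/orc-schema-merge/part-*.orc')):
    print(path)
    spark.read.format('orc').load('file://' + path).printSchema()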
Now let's read the data.
Read with schema merge off
Let's first read the data with schema merge off using the following script:
# orc-schema-merge-off.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Orc Schema Merge"
master = "local"

# Create Spark session with schema merge disabled
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.orc.mergeSchema', 'false') \
    .getOrCreate()

# Read Orc files
df = spark.read.format('orc').load(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
df.show()
Run the above script using the following command:
spark-submit ./orc-schema-merge-off.py
Depending on which file is picked to derive the DataFrame schema, the output will be one of the following:
+----------+----+
| col1|col2|
+----------+----+
|Category D| 100|
|Category E| 200|
|Category A| 100|
|Category B| 200|
|Category C| 300|
+----------+----+
or
+----------+----+-----+
|      col1|col2| col3|
+----------+----+-----+
|Category D| 100|D - 1|
|Category E| 200|E - 1|
|Category A| 100| null|
|Category B| 200| null|
|Category C| 300| null|
+----------+----+-----+
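If you need a deterministic result without enabling schema merge, you can also supply the schema explicitly when reading, so the outcome no longer depends on which file Spark picks. A minimal sketch, assuming the column types inferred from the sample data above:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Pin the DataFrame schema explicitly so the result does not depend
# on which Orc file Spark happens to sample.
schema = StructType([
    StructField('col1', StringType()),
    StructField('col2', LongType()),
    StructField('col3', StringType()),
])
df = spark.read.format('orc').schema(schema).load(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
df.show()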
Read with schema merge on
Now let's read the data with schema merge on using the following script:
# orc-schema-merge-on.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Orc Schema Merge"
master = "local"

# Create Spark session with schema merge enabled
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.orc.mergeSchema', 'true') \
    .getOrCreate()

# Read Orc files
df = spark.read.format('orc').load(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
df.show()
Run the above script using the following command:
spark-submit ./orc-schema-merge-on.py
As schema merge is on, the output consolidates all the columns from both schema versions:
+----------+----+-----+
|      col1|col2| col3|
+----------+----+-----+
|Category D| 100|D - 1|
|Category E| 200|E - 1|
|Category A| 100| null|
|Category B| 200| null|
|Category C| 300| null|
+----------+----+-----+
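You can also verify the merged schema directly. With the sample data above, the merged DataFrame should contain col1 (string), col2 (bigint) and col3 (string):

# Print the merged schema; col3 comes only from the second set of files.
df.printSchema()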