Spark Schema Merge (Evolution) for Orc Files

Kontext Kontext event 2022-06-18 visibility 1,170
more_vert
Spark Schema Merge (Evolution) for Orc Files

Schema merge or evolution might be a common requirement when implementing data lakes on cloud or enterprise big data platforms. Orc and parquet are two of the commonly used data storage format that supports schema merge as schema information is stored together with the data. When reading from Orc files, Spark provides a configuration spark.sql.orc.mergeSchema to control the behaviors of schema merge. 

About spark.sql.orc.mergeSchema

This configuration by default is set to false, which means Spark will randomly pick up an Orc file to decide the schema for your Spark DataFrame.  When it is set as true, Spark will merge the schemas.

Schema merge example with Orc

Let's understand a little bit more with the following example.

First, let's create a folder with Orc files with different schema using the following script (orc-schema-data.py). The first file has two columns and the second has one additional columns.

#orc-schema-data.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Orc Schema Merge"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [{
    'col1': 'Category A',
    'col2': 100
}, {
    'col1': 'Category B',
    'col2': 200
}, {
    'col1': 'Category C',
    'col2': 300
}]

data2 = [{
    'col1': 'Category D',
    'col2': 100,
    'col3': 'D - 1'
}, {
    'col1': 'Category E',
    'col2': 200,
    'col3': 'E - 1'
}]

# Save as Orc files
spark.createDataFrame(data).write.format('orc').mode('overwrite').save(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
spark.createDataFrame(data2).write.format('orc').mode('append').save(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')	

Run the PySpark script using the following command:

spark-submit ./orc-schema-data.py

The script generates a few files like the following screenshot shows:

2022061860232-image.png

Now let's read the data.

Read with schema merge off

Let's first read the data with schema merge off using the following script:

#orc-shcema-merge-off.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Orc Schema Merge"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.orc.mergeSchema', 'false') \
    .getOrCreate()

# Read Orc files
df = spark.read.format('orc').load(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
df.show()
infoThe line to configure schema merge is not necessary as the default value is false.

Run the above script using the following command:

spark-submit ./orc-schema-merge-off.py 

Depends on which file is picked for DataFrame schema, the output can be one of the following:

+----------+----+
|      col1|col2|
+----------+----+
|Category D| 100|
|Category E| 200|
|Category A| 100|
|Category B| 200|
|Category C| 300|
+----------+----+

or 

+----------+----+-----+
|      col1|col2| col3|
+----------+----+-----+
|Category D| 100|D - 1|
|Category E| 200|E - 1|
|Category A| 100| null|
|Category B| 200| null|
|Category C| 300| null|
+----------+----+-----+

Read with schema merge on

Let's first read the data with schema merge on using the following script:

#orc-shcema-merge-on.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Orc Schema Merge"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.orc.mergeSchema', 'true') \
    .getOrCreate()

# Read Orc files
df = spark.read.format('orc').load(
    'file:///home/kontext/pyspark-examples/data/orc-schema-merge')
df.show()

Run the above script using the following command:

spark-submit ./orc-schema-merge-on.py 

As schema merge is on, the output will consolidate all the columns in both schema versions:

+----------+----+-----+
|      col1|col2| col3|
+----------+----+-----+
|Category D| 100|D - 1|
|Category E| 200|E - 1|
|Category A| 100| null|
|Category B| 200| null|
|Category C| 300| null|
+----------+----+-----+

References

Schema Merging (Evolution) with Parquet in Spark and Hive

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts