Implement SCD Type 2 Full Merge via Spark Data Frames
Overview
SQL developers familiar with SCD and MERGE statements may wonder how to implement the same logic on big data platforms, considering that databases and storage layers in Hadoop are not designed or optimised for record-level updates and inserts.
In this post, I’m going to demonstrate how to implement the same logic as a SQL MERGE statement using Spark. All of these operations are done in memory after reading the source and target data. The code is implemented in Python, but you can implement similar logic in Scala or R too.
Objective
Source data:

src_id | src_attr
-------|---------------
1      | Hello World!
2      | Hello PySpark!
4      | Hello Scala!
Target data (existing data, key is column id):

id | attr             | is_current | is_deleted | start_date | end_date
---|------------------|------------|------------|------------|-----------
1  | Hello!           | false      | false      | 2018-01-01 | 2018-12-31
1  | Hello World!     | true       | false      | 2019-01-01 | 9999-12-31
2  | Hello Spark!     | true       | false      | 2019-02-01 | 9999-12-31
3  | Hello Old World! | true       | false      | 2019-02-01 | 9999-12-31
The purpose is to merge the source data into the target data set following a FULL Merge pattern.
Step by step
Import the required packages and create the Spark context
Use the code below to import the required packages and to create a SparkContext and a SQLContext object.
from pyspark.sql.functions import lit, when, date_sub
from pyspark.sql.types import IntegerType, StructType, StructField, StringType, BooleanType, DateType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime
appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
def quiet_logs(sc):
    # Hide info logs by raising the log level to ERROR
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
quiet_logs(sc)
In this example, we run Spark in local mode; you can change the master to yarn or any other cluster manager. I also hide the info logs by setting the log level to ERROR.
Create the target data frame
Use the following code to create a Spark data frame. In reality, you would read this data from HDFS, Hive or another store.
# Target data set
data_target = [
Row(1, "Hello!", False, False, datetime.strptime(
'2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),
Row(1, "Hello World!", True, False, datetime.strptime(
'2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
Row(2, "Hello Spark!", True, False, datetime.strptime(
'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
Row(3, "Hello Old World!", True, False, datetime.strptime(
'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))
]
schema_target = StructType([
StructField("id", IntegerType(), True),
StructField("attr", StringType(), True),
StructField("is_current", BooleanType(), True),
StructField("is_deleted", BooleanType(), True),
StructField("start_date", DateType(), True),
StructField("end_date", DateType(), True)
])
df_target = sqlContext.createDataFrame(
sc.parallelize(data_target),
schema_target
)
df_target.show()
df_target.printSchema()
Create the source data frame
Use a similar approach to create a data frame that contains the source data.
# Source data set
data_source = [
Row(1, "Hello World!"),
Row(2, "Hello PySpark!"),
Row(4, "Hello Scala!")
]
schema_source = StructType([
StructField("src_id", IntegerType(), True),
StructField("src_attr", StringType(), True)
])
df_source = sqlContext.createDataFrame(
sc.parallelize(data_source),
schema_source
)
df_source.show()
df_source.printSchema()
Now we can do a full join between these two data frames.
Implement full join between source and target data frames
As shown in the following code snippet, a fullouter join type is used, with join keys on the id and end_date columns. A new column, action, is also added to indicate what action needs to be taken for each record.
There are four possibilities for the actions:
- UPSERT: attributes have changed in the source and the existing records need to be expired and new records need to be inserted.
- DELETE: business keys no longer exist in source table and the records in target table need to be deleted logically.
- INSERT: new business keys exist in source that need to be inserted into the target table directly.
- NOACTION: the attributes are unchanged, or the record in the target table is not current.
high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)
# Prepare for merge - Added effective and end date
df_source_new = df_source.withColumn('src_start_date', lit(
current_date)).withColumn('src_end_date', lit(high_date))
# FULL merge: join on the key column and also on the high date column so that only the latest records are joined
df_merge = df_target.join(df_source_new, (df_source_new.src_id == df_target.id) &
(df_source_new.src_end_date == df_target.end_date), how='fullouter')
# Derive new column to indicate the action
df_merge = df_merge.withColumn('action',
when(df_merge.attr != df_merge.src_attr, 'UPSERT')
.when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
.when(df_merge.id.isNull(), 'INSERT')
.otherwise('NOACTION')
)
df_merge.show()
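Before moving on, it can help to trace the when/otherwise chain by hand. The sketch below re-implements the same decision logic in plain Python against the five joined rows produced by the sample data; derive_action and the row dictionaries are illustrative stand-ins, not Spark API. The explicit None checks mirror how Spark's null comparisons fall through the when conditions.

```python
# Plain-Python rendition of the action derivation after the full outer join.
# Each dict stands for one joined row; None means that side did not match.
def derive_action(row):
    # Attribute changed on a matched record -> expire old row, insert new one
    if row["attr"] is not None and row["src_attr"] is not None \
            and row["attr"] != row["src_attr"]:
        return "UPSERT"
    # Key no longer in the source and target record is current -> logical delete
    if row["src_id"] is None and row["is_current"]:
        return "DELETE"
    # Brand new key in the source -> plain insert
    if row["id"] is None:
        return "INSERT"
    return "NOACTION"

joined = [
    {"id": 1, "attr": "Hello World!", "is_current": True,  "src_id": 1,    "src_attr": "Hello World!"},
    {"id": 1, "attr": "Hello!",       "is_current": False, "src_id": None, "src_attr": None},
    {"id": 2, "attr": "Hello Spark!", "is_current": True,  "src_id": 2,    "src_attr": "Hello PySpark!"},
    {"id": 3, "attr": "Hello Old World!", "is_current": True, "src_id": None, "src_attr": None},
    {"id": None, "attr": None, "is_current": None, "src_id": 4, "src_attr": "Hello Scala!"},
]
print([derive_action(r) for r in joined])
# → ['NOACTION', 'NOACTION', 'UPSERT', 'DELETE', 'INSERT']
```

Note that the historical id=1 row classifies as NOACTION because it is not current, which is exactly why historical rows are never re-expired by the merge.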
Implement the SCD type 2 actions
Now we can implement all the actions by generating different data frames:
# Generate the new data frames based on action code
column_names = ['id', 'attr', 'is_current',
'is_deleted', 'start_date', 'end_date']
# For records that need no action
df_merge_p1 = df_merge.filter(
df_merge.action == 'NOACTION').select(column_names)
# For records that only need to be inserted
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)
# For records that need to be deleted logically
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE') \
    .select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))
# For records that need to be expired and then re-inserted
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)
Union the data frames
Finally, we can union the data frames into one.
# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2) \
    .unionAll(df_merge_p3) \
    .unionAll(df_merge_p4_1) \
    .unionAll(df_merge_p4_2)
df_merge_final.orderBy(['id', 'start_date']).show()
# Finally, you can overwrite the existing target data using this new data frame.
# ...
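To make the combined effect of the four branches concrete, here is a plain-Python sketch of the same merge over lists of tuples. The fixed run date (2019-03-01) and all helper names are illustrative stand-ins, not part of the Spark code above; the Spark version uses today's date instead.

```python
from datetime import date, timedelta

HIGH_DATE = "9999-12-31"
TODAY = date(2019, 3, 1)                  # illustrative run date
YESTERDAY = str(TODAY - timedelta(days=1))

# (id, attr, is_current, is_deleted, start_date, end_date)
target = [
    (1, "Hello!", False, False, "2018-01-01", "2018-12-31"),
    (1, "Hello World!", True, False, "2019-01-01", HIGH_DATE),
    (2, "Hello Spark!", True, False, "2019-02-01", HIGH_DATE),
    (3, "Hello Old World!", True, False, "2019-02-01", HIGH_DATE),
]
source = {1: "Hello World!", 2: "Hello PySpark!", 4: "Hello Scala!"}

result = []
seen = set()
for (id_, attr, cur, dele, start, end) in target:
    # Only current (high-date) rows can match the source, as in the join above
    src_attr = source.get(id_) if end == HIGH_DATE else None
    if src_attr is not None:
        seen.add(id_)
    if src_attr is not None and src_attr != attr:
        # UPSERT: expire the old row and insert the new version
        result.append((id_, attr, False, False, start, YESTERDAY))
        result.append((id_, src_attr, True, False, str(TODAY), HIGH_DATE))
    elif src_attr is None and cur and end == HIGH_DATE:
        # DELETE: key vanished from the source, mark logically deleted
        result.append((id_, attr, False, True, start, end))
    else:
        # NOACTION: unchanged or historical row passes through untouched
        result.append((id_, attr, cur, dele, start, end))
for id_, attr in source.items():
    # INSERT: brand new keys
    if id_ not in seen:
        result.append((id_, attr, True, False, str(TODAY), HIGH_DATE))
```

After this runs, result holds six rows: the two untouched id=1 rows, the expired and the new id=2 rows, the logically deleted id=3 row, and the newly inserted id=4 row.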
Complete code example
from pyspark.sql.functions import lit, when, date_sub
from pyspark.sql.types import IntegerType, StructType, StructField, StringType, BooleanType, DateType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime
appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
def quiet_logs(sc):
    # Hide info logs by raising the log level to ERROR
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
quiet_logs(sc)
# Target data set
data_target = [
Row(1, "Hello!", False, False, datetime.strptime(
'2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),
Row(1, "Hello World!", True, False, datetime.strptime(
'2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
Row(2, "Hello Spark!", True, False, datetime.strptime(
'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
Row(3, "Hello Old World!", True, False, datetime.strptime(
'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))
]
schema_target = StructType([
StructField("id", IntegerType(), True),
StructField("attr", StringType(), True),
StructField("is_current", BooleanType(), True),
StructField("is_deleted", BooleanType(), True),
StructField("start_date", DateType(), True),
StructField("end_date", DateType(), True)
])
df_target = sqlContext.createDataFrame(
sc.parallelize(data_target),
schema_target
)
df_target.show()
df_target.printSchema()
# Source data set
data_source = [
Row(1, "Hello World!"),
Row(2, "Hello PySpark!"),
Row(4, "Hello Scala!")
]
schema_source = StructType([
StructField("src_id", IntegerType(), True),
StructField("src_attr", StringType(), True)
])
df_source = sqlContext.createDataFrame(
sc.parallelize(data_source),
schema_source
)
df_source.show()
df_source.printSchema()
high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)
# Prepare for merge - Added effective and end date
df_source_new = df_source.withColumn('src_start_date', lit(
current_date)).withColumn('src_end_date', lit(high_date))
# FULL merge: join on the key column and also on the high date column so that only the latest records are joined
df_merge = df_target.join(df_source_new, (df_source_new.src_id == df_target.id) &
(df_source_new.src_end_date == df_target.end_date), how='fullouter')
# Derive new column to indicate the action
df_merge = df_merge.withColumn('action',
when(df_merge.attr != df_merge.src_attr, 'UPSERT')
.when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
.when(df_merge.id.isNull(), 'INSERT')
.otherwise('NOACTION')
)
df_merge.show()
# Generate the new data frames based on action code
column_names = ['id', 'attr', 'is_current',
'is_deleted', 'start_date', 'end_date']
# For records that need no action
df_merge_p1 = df_merge.filter(
df_merge.action == 'NOACTION').select(column_names)
# For records that only need to be inserted
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)
# For records that need to be deleted logically
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE') \
    .select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))
# For records that need to be expired and then re-inserted
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)
# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2) \
    .unionAll(df_merge_p3) \
    .unionAll(df_merge_p4_1) \
    .unionAll(df_merge_p4_2)
df_merge_final.orderBy(['id', 'start_date']).show()
# Finally, you can overwrite the existing target data using this new data frame.
# ...
Other considerations
In this demo, I read all of the target data into memory. In the real world you would not do this, for performance reasons; a better approach is to partition your data properly. For example, if the target data is stored in Parquet format, you can partition it by end_date. That way, you only need to read the active partition into memory to merge with the source data, and you can overwrite just that single partition instead of the whole data set, saving IO operations. You will achieve much better performance with this approach.
Refer to this document for more details:
https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files
Thanks for reading through, bro.
My comment in the last paragraph is based on the assumption that the last partition includes all the latest records. Thus, once the SCD 2 merge is done, there will be two new partitions in your output:
- Expired records, with the current business date as the end date
- The latest records, with the high date (end date = 9999-12-31)
Assuming the data is partitioned by end date, when writing the results into HDFS you can specify the partition keys for each of these instead of overwriting the whole data set.
*Note: this solution may lead to high skewness unless roughly the same number of records needs to be expired or inserted each day.
Hope this makes sense.
Makes sense. Thanks a lot for the reply :-)
Nice article bro. Just a doubt: to optimize the full outer join, if we take only the active data set, might it create a problem at the end, when we replace the original target with the final data frame? Please clarify. Thanks.