Implement SCD Type 2 Full Merge via Spark Data Frames

Overview

SQL developers who are familiar with slowly changing dimensions (SCD) and MERGE statements may wonder how to implement the same logic on big data platforms, given that databases and storage systems in Hadoop are not designed or optimised for record-level updates and inserts.

In this post, I'm going to demonstrate how to implement the same logic as a SQL MERGE statement using Spark. All of these operations are done in memory after reading the source and target data. The code is written in Python, but you can implement similar logic in Scala or R too.

Objective

Source data:

src_id | src_attr
1      | Hello World!
2      | Hello PySpark!
4      | Hello Scala!

Target data (existing data, key is column id):

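id | attr             | is_current | is_deleted | start_date | end_date
1  | Hello!           | false      | false      | 2018-01-01 | 2018-12-31
1  | Hello World!     | true       | false      | 2019-01-01 | 9999-12-31
2  | Hello Spark!     | true       | false      | 2019-02-01 | 9999-12-31
3  | Hello Old World! | true       | false      | 2019-02-01 | 9999-12-31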

The purpose is to merge the source data into the target data set following a FULL Merge pattern.

Step by step

Import the required packages and create the Spark context

Use the code below to import the required packages and to create a SparkContext and a SQLContext object.

from datetime import datetime

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import date_sub, lit, when
from pyspark.sql.types import (BooleanType, DateType, IntegerType,
                               StringType, StructField, StructType)

appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)


def quiet_logs(sc):
    # Raise the level of the noisy "org" and "akka" loggers to ERROR
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)


# Hide info logs
quiet_logs(sc)

In this example, Spark runs in local mode; you can change the master to yarn or any other supported cluster manager. I also hide the INFO logs by setting the log level to ERROR.

Create the target data frame

Use the following code to create the target Spark data frame. In practice, you would read this data from HDFS, Hive or another storage system.

# Target data set
data_target = [
    Row(1, "Hello!", False, False,
        datetime.strptime('2018-01-01', '%Y-%m-%d'),
        datetime.strptime('2018-12-31', '%Y-%m-%d')),
    Row(1, "Hello World!", True, False,
        datetime.strptime('2019-01-01', '%Y-%m-%d'),
        datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(2, "Hello Spark!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'),
        datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(3, "Hello Old World!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'),
        datetime.strptime('9999-12-31', '%Y-%m-%d'))
]
schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True)
])
df_target = sqlContext.createDataFrame(
    sc.parallelize(data_target),
    schema_target
)
df_target.show()
df_target.printSchema()

Create source data frame

Use a similar approach to create a data frame that holds the source data.

# Source data set
data_source = [
    Row(1, "Hello World!"),
    Row(2, "Hello PySpark!"),
    Row(4, "Hello Scala!")
]
schema_source = StructType([
    StructField("src_id", IntegerType(), True),
    StructField("src_attr", StringType(), True)
])
df_source = sqlContext.createDataFrame(
    sc.parallelize(data_source),
    schema_source
)
df_source.show()
df_source.printSchema()

Now, we can do a full join with these two data frames.

Implement full join between source and target data frames

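As shown in the following code snippet, the fullouter join type is used, and the join condition matches both the key column (src_id to id) and the end date (src_end_date to end_date), so that source rows only join to the latest target records. A new column, action, is also added to work out which action needs to be applied to each record.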
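
To see why the end date is part of the join condition, here is a plain-Python sketch of the same full outer join on the sample rows, with no Spark involved. The helper name full_outer_join and the dict-based rows are made up for illustration, and the sketch assumes each src_id appears at most once in the source.

```python
from datetime import date

HIGH_DATE = date(9999, 12, 31)

target = [
    {"id": 1, "attr": "Hello!", "end_date": date(2018, 12, 31)},
    {"id": 1, "attr": "Hello World!", "end_date": HIGH_DATE},
    {"id": 2, "attr": "Hello Spark!", "end_date": HIGH_DATE},
    {"id": 3, "attr": "Hello Old World!", "end_date": HIGH_DATE},
]
source = [
    {"src_id": 1, "src_attr": "Hello World!"},
    {"src_id": 2, "src_attr": "Hello PySpark!"},
    {"src_id": 4, "src_attr": "Hello Scala!"},
]


def full_outer_join(target_rows, source_rows):
    """Pair target and source rows on (id, end_date), keeping unmatched rows."""
    # Every source row is treated as carrying src_end_date = HIGH_DATE.
    src_by_key = {(s["src_id"], HIGH_DATE): s for s in source_rows}
    matched = set()
    pairs = []
    for t in target_rows:
        key = (t["id"], t["end_date"])
        s = src_by_key.get(key)
        if s is not None:
            matched.add(key)
        pairs.append((t, s))  # s is None when the target row has no match
    for key, s in src_by_key.items():
        if key not in matched:
            pairs.append((None, s))  # source row with no current target row
    return pairs


pairs = full_outer_join(target, source)
for t, s in pairs:
    print(t and (t["id"], t["attr"]), "<->", s and (s["src_id"], s["src_attr"]))
```

The historical row for id 1 (end date 2018-12-31) finds no partner, because its end date is not the high date; only the current version of id 1 is paired with the source row.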

There are four possibilities for the actions:

  • UPSERT: attributes have changed in the source, so the existing records need to be expired and new records need to be inserted.
  • DELETE: business keys no longer exist in the source table, so the records in the target table need to be deleted logically.
  • INSERT: new business keys exist in the source and need to be inserted into the target table directly.
  • NOACTION: the attributes have not changed, or the records in the target table are not current.

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)

# Prepare for merge: add effective (start) and end dates to the source
df_source_new = df_source.withColumn('src_start_date', lit(current_date)) \
    .withColumn('src_end_date', lit(high_date))

# FULL merge: join on the key column and also on the high date column,
# so that source rows only join to the latest target records
df_merge = df_target.join(
    df_source_new,
    (df_source_new.src_id == df_target.id) &
    (df_source_new.src_end_date == df_target.end_date),
    how='fullouter')

# Derive a new column to indicate the action
df_merge = df_merge.withColumn(
    'action',
    when(df_merge.attr != df_merge.src_attr, 'UPSERT')
    .when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
    .when(df_merge.id.isNull(), 'INSERT')
    .otherwise('NOACTION'))
df_merge.show()
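
The order of the when branches matters, so it can help to read the rule as an ordinary function. The sketch below restates it in plain Python for a single joined pair; classify_action and the dict representation are hypothetical, and None stands for the NULLs that the full outer join produces on the unmatched side.

```python
def classify_action(target, source):
    """Return the SCD action for one (target, source) pair of dicts or None."""
    attr = target["attr"] if target is not None else None
    src_attr = source["src_attr"] if source is not None else None
    # In Spark, != evaluates to NULL when either side is NULL, so the
    # UPSERT branch only fires when both attributes are present and differ.
    if attr is not None and src_attr is not None and attr != src_attr:
        return "UPSERT"
    # No source row, and the target row is the current version.
    if source is None and target is not None and target["is_current"]:
        return "DELETE"
    # No target row: a new business key.
    if target is None:
        return "INSERT"
    return "NOACTION"


examples = [
    ({"id": 1, "attr": "Hello!", "is_current": False}, None),
    ({"id": 1, "attr": "Hello World!", "is_current": True},
     {"src_id": 1, "src_attr": "Hello World!"}),
    ({"id": 2, "attr": "Hello Spark!", "is_current": True},
     {"src_id": 2, "src_attr": "Hello PySpark!"}),
    ({"id": 3, "attr": "Hello Old World!", "is_current": True}, None),
    (None, {"src_id": 4, "src_attr": "Hello Scala!"}),
]
actions = [classify_action(t, s) for t, s in examples]
print(actions)
```

One consequence worth noting: if attr is NULL on one side only, the comparison is NULL and the row falls through to NOACTION. If nullable attributes matter in your data, a null-safe comparison such as Column.eqNullSafe may be worth considering.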

Implement the SCD type 2 actions

Now we can implement all the actions by generating different data frames:

# Generate the new data frames based on the action code
column_names = ['id', 'attr', 'is_current',
                'is_deleted', 'start_date', 'end_date']

# Records that need no action
df_merge_p1 = df_merge.filter(
    df_merge.action == 'NOACTION').select(column_names)

# Records that need to be inserted only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))

# Records that need to be deleted logically
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE') \
    .select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))

# Records that need to be expired and then inserted
# Part 1: the new, current version taken from the source
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))

# Part 2: the old version, end-dated the day before the new one starts
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)
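
UPSERT is the only action that turns one joined row into two output rows. As a rough illustration of that step outside Spark, the plain-Python sketch below expires the old version and builds the new one; expire_and_insert and the load_date parameter are hypothetical names, with load_date playing the role of src_start_date.

```python
from datetime import date, timedelta

HIGH_DATE = date(9999, 12, 31)


def expire_and_insert(current_row, new_attr, load_date):
    """Return (expired_row, inserted_row) for a changed record."""
    # Old version: closed the day before the new version becomes effective.
    expired = dict(current_row,
                   is_current=False,
                   is_deleted=False,
                   end_date=load_date - timedelta(days=1))
    # New version: effective from load_date and open-ended.
    inserted = {
        "id": current_row["id"],
        "attr": new_attr,
        "is_current": True,
        "is_deleted": False,
        "start_date": load_date,
        "end_date": HIGH_DATE,
    }
    return expired, inserted


old = {"id": 2, "attr": "Hello Spark!", "is_current": True,
       "is_deleted": False, "start_date": date(2019, 2, 1),
       "end_date": HIGH_DATE}
expired, inserted = expire_and_insert(old, "Hello PySpark!", date(2019, 6, 1))
print(expired["end_date"], inserted["start_date"])  # prints: 2019-05-31 2019-06-01
```

Ending the old version one day before the new start date keeps the two validity ranges from overlapping, which is what date_sub(src_start_date, 1) does in the Spark code.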

Union the data frames

Finally, we can union the data frames into one.

# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(
    df_merge_p3).unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)
df_merge_final.orderBy(['id', 'start_date']).show()

# Finally, you can overwrite the existing data with this new data frame.
# ...

Complete code example

from datetime import datetime

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import date_sub, lit, when
from pyspark.sql.types import (BooleanType, DateType, IntegerType,
                               StringType, StructField, StructType)

appName = "Spark SCD Merge Example"
master = "local"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)


def quiet_logs(sc):
    # Raise the level of the noisy "org" and "akka" loggers to ERROR
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)


# Hide info logs
quiet_logs(sc)

# Target data set
data_target = [
    Row(1, "Hello!", False, False,
        datetime.strptime('2018-01-01', '%Y-%m-%d'),
        datetime.strptime('2018-12-31', '%Y-%m-%d')),
    Row(1, "Hello World!", True, False,
        datetime.strptime('2019-01-01', '%Y-%m-%d'),
        datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(2, "Hello Spark!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'),
        datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(3, "Hello Old World!", True, False,
        datetime.strptime('2019-02-01', '%Y-%m-%d'),
        datetime.strptime('9999-12-31', '%Y-%m-%d'))
]
schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True)
])
df_target = sqlContext.createDataFrame(
    sc.parallelize(data_target),
    schema_target
)
df_target.show()
df_target.printSchema()

# Source data set
data_source = [
    Row(1, "Hello World!"),
    Row(2, "Hello PySpark!"),
    Row(4, "Hello Scala!")
]
schema_source = StructType([
    StructField("src_id", IntegerType(), True),
    StructField("src_attr", StringType(), True)
])
df_source = sqlContext.createDataFrame(
    sc.parallelize(data_source),
    schema_source
)
df_source.show()
df_source.printSchema()

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)

# Prepare for merge: add effective (start) and end dates to the source
df_source_new = df_source.withColumn('src_start_date', lit(current_date)) \
    .withColumn('src_end_date', lit(high_date))

# FULL merge: join on the key column and also on the high date column,
# so that source rows only join to the latest target records
df_merge = df_target.join(
    df_source_new,
    (df_source_new.src_id == df_target.id) &
    (df_source_new.src_end_date == df_target.end_date),
    how='fullouter')

# Derive a new column to indicate the action
df_merge = df_merge.withColumn(
    'action',
    when(df_merge.attr != df_merge.src_attr, 'UPSERT')
    .when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
    .when(df_merge.id.isNull(), 'INSERT')
    .otherwise('NOACTION'))
df_merge.show()

# Generate the new data frames based on the action code
column_names = ['id', 'attr', 'is_current',
                'is_deleted', 'start_date', 'end_date']

# Records that need no action
df_merge_p1 = df_merge.filter(
    df_merge.action == 'NOACTION').select(column_names)

# Records that need to be inserted only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))

# Records that need to be deleted logically
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE') \
    .select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))

# Records that need to be expired and then inserted
# Part 1: the new, current version taken from the source
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date'))

# Part 2: the old version, end-dated the day before the new one starts
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)

# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(
    df_merge_p3).unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)
df_merge_final.orderBy(['id', 'start_date']).show()

# Finally, you can overwrite the existing data with this new data frame.
# ...

Other considerations

In this demo, I read all of the target data into memory. In a real-world scenario you would avoid this for performance reasons; a better approach is to partition your data properly. For example, if the target data is stored in Parquet format, you can partition the data by end_date. That way, you only need to read the active partition into memory to merge with the source data, and you can overwrite just that single partition to save I/O. You will achieve much better performance with this approach.

Refer to this document for more details:

https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files
