
Implement SCD Type 2 Full Merge via Spark Data Frames

Raymond Tang

Overview

SQL developers who are familiar with SCD (slowly changing dimensions) and MERGE statements may wonder how to implement the same thing on big data platforms, considering that databases and storage in Hadoop are not designed or optimised for record-level updates and inserts.

In this post, I’m going to demonstrate how to implement the same logic as a SQL MERGE statement using Spark. All these operations are done in memory after reading the source and target data. The code is implemented in Python, but you can implement similar logic in Scala or R too.

Objective

Source data:

src_id  src_attr
1       Hello World!
2       Hello PySpark!
4       Hello Scala!

Target data (existing data, key is column id):

id  attr              is_current  is_deleted  start_date  end_date
1   Hello!            false       false       2018-01-01  2018-12-31
1   Hello World!      true        false       2019-01-01  9999-12-31
2   Hello Spark!      true        false       2019-02-01  9999-12-31
3   Hello Old World!  true        false       2019-02-01  9999-12-31

The purpose is to merge the source data into the target data set following a FULL Merge pattern.

After the full merge, the expected result is: the two existing records for id 1 are kept unchanged, the current record for id 2 is expired and a new current record with the updated attribute is inserted, the record for id 3 is marked as logically deleted, and a new current record for id 4 is inserted.

Step by step

Import the required packages and create a Spark context

Follow the code below to import the required packages and also create a Spark context and a SQLContext object.

from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
import json
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime

appName = "Spark SCD Merge Example"
master = "local"

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

# hide info logs
quiet_logs(sc)

In this example, we run Spark in local mode; you can change the master to yarn or any other cluster manager. I also hide the INFO logs by setting the log level to ERROR.
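If you are on Spark 2.x or later, you would typically create a SparkSession instead of the SparkContext/SQLContext pair used here. A minimal sketch, assuming the same application name and local master:

from pyspark.sql import SparkSession

# Spark 2.x+ entry point (sketch only; the rest of this post uses sqlContext)
spark = SparkSession.builder \
    .appName("Spark SCD Merge Example") \
    .master("local") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")  # hide INFO logs
# spark.createDataFrame(...) then replaces sqlContext.createDataFrame(...)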

Create the target data frame

Use the following code to create a Spark data frame. In reality, you would read these data from HDFS, Hive or any other storage system.

# Target data set
data_target = [
    Row(1, "Hello!", False, False, datetime.strptime(
        '2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),
    Row(1, "Hello World!", True, False, datetime.strptime(
        '2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(2, "Hello Spark!", True, False, datetime.strptime(
        '2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(3, "Hello Old World!", True, False, datetime.strptime(
        '2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))
]

schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True)
])

df_target = sqlContext.createDataFrame(sc.parallelize(data_target), schema_target)
df_target.show()
df_target.printSchema()
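For reference, here is a hedged sketch of reading the target from storage instead of building it in memory; the Parquet path and Hive table name below are hypothetical placeholders:

# Read the target from storage instead of creating it in memory
# (sketch only; the path and table name are hypothetical placeholders)
df_target = sqlContext.read.parquet('/data/scd2/target')
# or, from a Hive table:
# df_target = sqlContext.table('mydb.scd2_target')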

Create the source data frame

Use a similar approach to create a data frame that contains the source data.

# Source data set
data_source = [
    Row(1, "Hello World!"),
    Row(2, "Hello PySpark!"),
    Row(4, "Hello Scala!")
]

schema_source = StructType([
    StructField("src_id", IntegerType(), True),
    StructField("src_attr", StringType(), True)
])

df_source = sqlContext.createDataFrame(sc.parallelize(data_source), schema_source)
df_source.show()
df_source.printSchema()

Now, we can do a full join with these two data frames.

Implement full join between source and target data frames

As shown in the following code snippet, a fullouter join is used, and the join keys are the id and end_date columns. A new column action is also added to work out which action needs to be performed for each record.

There are four possibilities for the actions:

  • UPSERT: attributes have changed in the source, so the existing record needs to be expired and a new record inserted.
  • DELETE: the business key no longer exists in the source, so the record in the target table needs to be deleted logically.
  • INSERT: a new business key exists in the source and needs to be inserted into the target table directly.
  • NOACTION: the attributes are unchanged, or the record in the target table is not current.

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)

# Prepare for merge - add effective (start) and end dates to the source
df_source_new = df_source.withColumn('src_start_date', lit(current_date)) \
    .withColumn('src_end_date', lit(high_date))

# FULL merge: join on the key column and also on the high (end) date column
# so that we only join to the latest (current) target records
df_merge = df_target.join(df_source_new,
                          (df_source_new.src_id == df_target.id) &
                          (df_source_new.src_end_date == df_target.end_date),
                          how='fullouter')

# Derive a new column to indicate the action for each record
df_merge = df_merge.withColumn('action',
                               when(df_merge.attr != df_merge.src_attr, 'UPSERT')
                               .when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
                               .when(df_merge.id.isNull(), 'INSERT')
                               .otherwise('NOACTION'))

df_merge.show()
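As a quick sanity check (not part of the original merge logic), you can count how many records fall into each action bucket before applying the SCD transformations:

# Optional check: number of records per action
df_merge.groupBy('action').count().show()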

Implement the SCD Type 2 actions

Now we can implement all the actions by generating different data frames:

# Generate the new data frames based on action code
column_names = ['id', 'attr', 'is_current', 'is_deleted', 'start_date', 'end_date']

# Records that need no action
df_merge_p1 = df_merge.filter(df_merge.action == 'NOACTION').select(column_names)

# Records that need to be inserted only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)

# Records that need to be deleted logically
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE').select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))

# Records that need to be expired and then re-inserted:
# new (current) version of the changed records
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)

# expired version of the existing records (end date set to the day before the new start date)
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)

Union the data frames

Finally, we can union all the data frames into one.

# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(
    df_merge_p3).unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)

df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite existing data using this new data frame.
# ...
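For example, here is a hedged sketch of persisting the merged result; the output path is a hypothetical placeholder, and in practice you would write back to wherever your target data lives:

# Overwrite the target with the merged result (sketch only; hypothetical path)
df_merge_final.write.mode('overwrite').parquet('/data/scd2/target_new')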

Complete code example

from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
import json
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
from datetime import datetime

appName = "Spark SCD Merge Example"
master = "local"

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

# hide info logs
quiet_logs(sc)

# Target data set
data_target = [
    Row(1, "Hello!", False, False, datetime.strptime(
        '2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),
    Row(1, "Hello World!", True, False, datetime.strptime(
        '2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(2, "Hello Spark!", True, False, datetime.strptime(
        '2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),
    Row(3, "Hello Old World!", True, False, datetime.strptime(
        '2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))
]

schema_target = StructType([
    StructField("id", IntegerType(), True),
    StructField("attr", StringType(), True),
    StructField("is_current", BooleanType(), True),
    StructField("is_deleted", BooleanType(), True),
    StructField("start_date", DateType(), True),
    StructField("end_date", DateType(), True)
])

df_target = sqlContext.createDataFrame(sc.parallelize(data_target), schema_target)
df_target.show()
df_target.printSchema()

# Source data set
data_source = [
    Row(1, "Hello World!"),
    Row(2, "Hello PySpark!"),
    Row(4, "Hello Scala!")
]

schema_source = StructType([
    StructField("src_id", IntegerType(), True),
    StructField("src_attr", StringType(), True)
])

df_source = sqlContext.createDataFrame(sc.parallelize(data_source), schema_source)
df_source.show()
df_source.printSchema()

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()
print(high_date)
current_date = datetime.today().date()
print(current_date)

# Prepare for merge - add effective (start) and end dates to the source
df_source_new = df_source.withColumn('src_start_date', lit(current_date)) \
    .withColumn('src_end_date', lit(high_date))

# FULL merge: join on the key column and also on the high (end) date column
# so that we only join to the latest (current) target records
df_merge = df_target.join(df_source_new,
                          (df_source_new.src_id == df_target.id) &
                          (df_source_new.src_end_date == df_target.end_date),
                          how='fullouter')

# Derive a new column to indicate the action for each record
df_merge = df_merge.withColumn('action',
                               when(df_merge.attr != df_merge.src_attr, 'UPSERT')
                               .when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')
                               .when(df_merge.id.isNull(), 'INSERT')
                               .otherwise('NOACTION'))

df_merge.show()

# Generate the new data frames based on action code
column_names = ['id', 'attr', 'is_current', 'is_deleted', 'start_date', 'end_date']

# Records that need no action
df_merge_p1 = df_merge.filter(df_merge.action == 'NOACTION').select(column_names)

# Records that need to be inserted only
df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)

# Records that need to be deleted logically
df_merge_p3 = df_merge.filter(df_merge.action == 'DELETE').select(column_names) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(True))

# Records that need to be expired and then re-inserted:
# new (current) version of the changed records
df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(
    df_merge.src_id.alias('id'),
    df_merge.src_attr.alias('attr'),
    lit(True).alias('is_current'),
    lit(False).alias('is_deleted'),
    df_merge.src_start_date.alias('start_date'),
    df_merge.src_end_date.alias('end_date')
)

# expired version of the existing records (end date set to the day before the new start date)
df_merge_p4_2 = df_merge.filter(df_merge.action == 'UPSERT') \
    .withColumn('end_date', date_sub(df_merge.src_start_date, 1)) \
    .withColumn('is_current', lit(False)) \
    .withColumn('is_deleted', lit(False)) \
    .select(column_names)

# Union all records together
df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(
    df_merge_p3).unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)

df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite existing data using this new data frame.
# ...

Other considerations

In this demo, I read all the target data into memory. In the real world, you would not do this for performance reasons. A better approach is to partition your data properly. For example, if the target data is stored in Parquet format, you can partition it by end_date. In this way, you only need to read the active partition into memory to merge with the source data, and you can also overwrite just that single partition to save IO operations. You will achieve much better performance with this approach.

Refer to this document for more details:

https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files
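For illustration, here is a hedged sketch of the partitioned approach described above; the paths are hypothetical placeholders and end_date is used as the partition column:

# Write the target partitioned by end_date so that only the active partition
# (end_date = 9999-12-31) needs to be read and rewritten (sketch only)
df_merge_final.write.partitionBy('end_date') \
    .mode('overwrite').parquet('/data/scd2/target_partitioned')

# Later runs only need the active partition as the merge target;
# the filter on the partition column keeps the read small (partition pruning)
df_target_active = sqlContext.read.parquet('/data/scd2/target_partitioned') \
    .where("end_date = '9999-12-31'")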

