python spark pyspark

Implement SCD Type 2 Full Merge via Spark Data Frames

2,931 views 0 comments about 8 months ago Raymond Tang

Overview

For SQL developers that are familiar with SCD and merge statements, you may wonder how to implement the same in big data platforms, considering database or storages in Hadoop are not designed/optimised for record level updates and inserts.

In this post, I’m going to demonstrate how to implement the same logic as a SQL Merge statement by using Spark. All these operations will be done in memory after reading your source and target data.  The code is implemented using Python by you can implement similar logic in Scala or R too.

Objective

Source data:

image

Target data (existing data, key is column id):

image

The purpose is to merge the source data into the target data set following a FULL Merge pattern.

image

Step by step

Imports the required packages and create Spark context

Follow the code below to import the required packages and also create a Spark context and a SQLContext object.

from pyspark.sql.functions import udf, lit, when, date_sub

from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType

import json

from pyspark import SparkContext, SparkConf, SQLContext

from pyspark.sql import Row

from datetime import datetime

appName = "Spark SCD Merge Example"

master = "local"

conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def quiet_logs(sc):

logger = sc._jvm.org.apache.log4j

logger.LogManager.getLogger("org"). setLevel(logger.Level.ERROR)

logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

# hide info logs

quiet_logs(sc)

In this example, we are running Spark in local mode and you can change the master to yarn or any others. I also hide the info logs by setting the log level to ERROR.

Create the target data frame

Use the following code to create a Spark data frame.  In reality, you will need to read these data from HDFS, Hive or any other storages.

# Target data set

data_target = [

Row(1, "Hello!", False, False, datetime.strptime(

'2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),

Row(1, "Hello World!", True, False, datetime.strptime(

'2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),

Row(2, "Hello Spark!", True, False, datetime.strptime(

'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),

Row(3, "Hello Old World!", True, False, datetime.strptime(

'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))

]

schema_target = StructType([

StructField("id", IntegerType(), True),

StructField("attr", StringType(), True),

StructField("is_current", BooleanType(), True),

StructField("is_deleted", BooleanType(), True),

StructField("start_date", DateType(), True),

StructField("end_date", DateType(), True)

])

df_target = sqlContext.createDataFrame(

sc.parallelize(data_target),

schema_target

)

df_target.show()

df_target.printSchema()

Create source data frame

Use similar approach to create a data frame that includes the source data.

# Source data set

data_source = [

Row(1, "Hello World!"),

Row(2, "Hello PySpark!"),

Row(4, "Hello Scala!")

]

schema_source = StructType([

StructField("src_id", IntegerType(), True),

StructField("src_attr", StringType(), True)

])

df_source = sqlContext.createDataFrame(

sc.parallelize(data_source),

schema_source

)

df_source.show()

df_source.printSchema()

Now, we can do a full join with these two data frames.

Implement full join between source and target data frames

As shown in the following code snippets, fullouter join type is used and the join keys are on column id and end_date. A new column action is also added to work what actions needs to be implemented for each record.

There are four possibilities for the actions:

  • UPSERT: attributes have changed in the source and the existing records need to be expired and new records need to be inserted.
  • DELETE: business keys no longer exist in source table and the records in target table need to be deleted logically.
  • INSERT: new business keys exist in source that need to be inserted into the target table directly.
  • NOACTION: no changes to the attributes or the records in target table are not current.

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()

print(high_date)

current_date = datetime.today().date()

print(current_date)

# Prepare for merge - Added effective and end date

df_source_new = df_source.withColumn('src_start_date', lit(

current_date)).withColumn('src_end_date', lit(high_date))

# FULL Merge, join on key column and also high date column to make only join to the latest records

df_merge = df_target.join(df_source_new, (df_source_new.src_id == df_target.id) &

(df_source_new.src_end_date == df_target.end_date), how='fullouter')

# Derive new column to indicate the action

df_merge = df_merge.withColumn('action',

when(df_merge.attr != df_merge.src_attr, 'UPSERT')

.when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')

.when(df_merge.id.isNull(), 'INSERT')

.otherwise('NOACTION')

)

df_merge.show()

Implement the SCD type 2 actions

Now we can implement all the actions by generating different data frames:

# Generate the new data frames based on action code

column_names = ['id', 'attr', 'is_current',

'is_deleted', 'start_date', 'end_date']

# For records that needs no action

df_merge_p1 = df_merge.filter(

df_merge.action == 'NOACTION').select(column_names)

# For records that needs insert only

df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(df_merge.src_id.alias('id'),

df_merge.src_attr.alias(

'attr'),

lit(True).alias(

'is_current'),

lit(False).alias(

'is_deleted'),

df_merge.src_start_date.alias(

'start_date'),

df_merge.src_end_date.alias(

'end_date')

)

# For records that needs to be deleted

df_merge_p3 = df_merge.filter(

df_merge.action == 'DELETE').select(column_names).withColumn('is_current', lit(False)).withColumn('is_deleted', lit(True))

# For records that needs to be expired and then inserted

df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(df_merge.src_id.alias('id'),

df_merge.src_attr.alias(

'attr'),

lit(True).alias(

'is_current'),

lit(False).alias(

'is_deleted'),

df_merge.src_start_date.alias(

'start_date'),

df_merge.src_end_date.alias(

'end_date')

)

df_merge_p4_2 = df_merge.filter(

df_merge.action == 'UPSERT').withColumn(

'end_date', date_sub(df_merge.src_start_date, 1)).withColumn(

'is_current', lit(False)).withColumn(

'is_deleted', lit(False)).select(column_names)

Union the data frames

Finally we can union the data frames to one.

# Union all records together

df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(

df_merge_p3).unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)

df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite existing data using this new data frame.

# ...

Complete code example

from pyspark.sql.functions import udf, lit, when, date_sub

from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType

import json

from pyspark import SparkContext, SparkConf, SQLContext

from pyspark.sql import Row

from datetime import datetime

appName = "Spark SCD Merge Example"

master = "local"

conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def quiet_logs(sc):

logger = sc._jvm.org.apache.log4j

logger.LogManager.getLogger("org"). setLevel(logger.Level.ERROR)

logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)

# hide info logs

quiet_logs(sc)

# Target data set

data_target = [

Row(1, "Hello!", False, False, datetime.strptime(

'2018-01-01', '%Y-%m-%d'), datetime.strptime('2018-12-31', '%Y-%m-%d')),

Row(1, "Hello World!", True, False, datetime.strptime(

'2019-01-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),

Row(2, "Hello Spark!", True, False, datetime.strptime(

'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d')),

Row(3, "Hello Old World!", True, False, datetime.strptime(

'2019-02-01', '%Y-%m-%d'), datetime.strptime('9999-12-31', '%Y-%m-%d'))

]

schema_target = StructType([

StructField("id", IntegerType(), True),

StructField("attr", StringType(), True),

StructField("is_current", BooleanType(), True),

StructField("is_deleted", BooleanType(), True),

StructField("start_date", DateType(), True),

StructField("end_date", DateType(), True)

])

df_target = sqlContext.createDataFrame(

sc.parallelize(data_target),

schema_target

)

df_target.show()

df_target.printSchema()

# Source data set

data_source = [

Row(1, "Hello World!"),

Row(2, "Hello PySpark!"),

Row(4, "Hello Scala!")

]

schema_source = StructType([

StructField("src_id", IntegerType(), True),

StructField("src_attr", StringType(), True)

])

df_source = sqlContext.createDataFrame(

sc.parallelize(data_source),

schema_source

)

df_source.show()

df_source.printSchema()

high_date = datetime.strptime('9999-12-31', '%Y-%m-%d').date()

print(high_date)

current_date = datetime.today().date()

print(current_date)

# Prepare for merge - Added effective and end date

df_source_new = df_source.withColumn('src_start_date', lit(

current_date)).withColumn('src_end_date', lit(high_date))

# FULL Merge, join on key column and also high date column to make only join to the latest records

df_merge = df_target.join(df_source_new, (df_source_new.src_id == df_target.id) &

(df_source_new.src_end_date == df_target.end_date), how='fullouter')

# Derive new column to indicate the action

df_merge = df_merge.withColumn('action',

when(df_merge.attr != df_merge.src_attr, 'UPSERT')

.when(df_merge.src_id.isNull() & df_merge.is_current, 'DELETE')

.when(df_merge.id.isNull(), 'INSERT')

.otherwise('NOACTION')

)

df_merge.show()

# Generate the new data frames based on action code

column_names = ['id', 'attr', 'is_current',

'is_deleted', 'start_date', 'end_date']

# For records that needs no action

df_merge_p1 = df_merge.filter(

df_merge.action == 'NOACTION').select(column_names)

# For records that needs insert only

df_merge_p2 = df_merge.filter(df_merge.action == 'INSERT').select(df_merge.src_id.alias('id'),

df_merge.src_attr.alias(

'attr'),

lit(True).alias(

'is_current'),

lit(False).alias(

'is_deleted'),

df_merge.src_start_date.alias(

'start_date'),

df_merge.src_end_date.alias(

'end_date')

)

# For records that needs to be deleted

df_merge_p3 = df_merge.filter(

df_merge.action == 'DELETE').select(column_names).withColumn('is_current', lit(False)).withColumn('is_deleted', lit(True))

# For records that needs to be expired and then inserted

df_merge_p4_1 = df_merge.filter(df_merge.action == 'UPSERT').select(df_merge.src_id.alias('id'),

df_merge.src_attr.alias(

'attr'),

lit(True).alias(

'is_current'),

lit(False).alias(

'is_deleted'),

df_merge.src_start_date.alias(

'start_date'),

df_merge.src_end_date.alias(

'end_date')

)

df_merge_p4_2 = df_merge.filter(

df_merge.action == 'UPSERT').withColumn(

'end_date', date_sub(df_merge.src_start_date, 1)).withColumn(

'is_current', lit(False)).withColumn(

'is_deleted', lit(False)).select(column_names)

# Union all records together

df_merge_final = df_merge_p1.unionAll(df_merge_p2).unionAll(

df_merge_p3).unionAll(df_merge_p4_1).unionAll(df_merge_p4_2)

df_merge_final.orderBy(['id', 'start_date']).show()

# At last, you can overwrite existing data using this new data frame.

# ...

Other considerations

In this demo, I am reading all the target data into memory. In real world, you would not do this for performance consideration. Thus a better approach is to partition your data properly. For example, if the target data is stored in parquet format, you can partition the data by end_data. In this way, you only need to read the active partition into memory  to merge with source data. And also you can only overwrite a single partition in parquet too to save IO operations.  You will archive much better performance through this approach.

Refer to this document for more details:

https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files

Add comment

Comments (0)
No comments yet.