Raymond Raymond

Delta Lake with PySpark Walkthrough

2022-08-26

About Delta Lake

Delta Lake is an open-source storage framework for big data that supports implementing a Lakehouse architecture. It works with compute engines such as Spark, PrestoDB, Flink, Trino (Presto SQL) and Hive. Delta format files can be stored in storage systems such as GCS, Azure Data Lake Storage, AWS S3 and HDFS. It provides programming APIs for Scala, Java, Python, Rust, and Ruby.

This article provides a high-level introduction to Delta Lake with PySpark on a local Hadoop system. Examples are provided to help you understand the concepts.

Prerequisites

For this article, I am using PySpark 3.3.0 with Delta Lake 2.1.0rc1, the latest release at the time of writing.

If you don't have this environment available, you can set it up by following these steps.

Install PySpark 3.3.0

You can follow this article to configure Spark 3.3.0 - Install Spark 3.3.0 on Linux or WSL

Install delta-spark Python package

Use the following command to install the package on the system where you will run the Delta code.

pip install -i https://test.pypi.org/simple/ delta-spark==2.1.0rc1

You may hit the following error:

ERROR: Could not find a version that satisfies the requirement importlib-metadata>=1.0.0 (from delta-spark) (from versions: none)
ERROR: No matching distribution found for importlib-metadata>=1.0.0

To fix it, run the following command:

pip install importlib-metadata

If you need to run the script in a distributed environment, make sure you install this package on all nodes in the cluster. Alternatively, you can pass the required dependency via the --packages argument when submitting the Spark script:

spark-submit --packages io.delta:delta-core_2.12:2.1.0rc1 ...

*Your cluster needs outbound web access (HTTPS, port 443) to download the packages: pip downloads from the Python package index, while --packages resolves JARs from Maven repositories.

Each Delta Lake release is only compatible with certain Spark versions. Check out the details here: Releases — Delta Lake Documentation.

Advantages of Delta Lake

It's important to understand the benefits of Delta Lake before we dive into the details. In other words, why do we need Delta Lake?

There are many features available in Delta Lake, but the following are the most important to consider (at least based on my experience):

  • ACID support. Traditionally, when we write data into a file system using Spark, the write operation as a whole is not atomic. For example, if the job fails midway, the data in the target folder can be left corrupted. The usual workaround is to back up the target before overwriting it. With the Delta Lake format, we don't need to worry about that any more.
  • Merge statement. In relational databases like SQL Server, Oracle, Teradata, PostgreSQL, etc., we can use the MERGE statement to merge data from a staging table into a target table (usually an SCD type 2 table). I previously published an article about how to merge into an SCD type 2 table using Spark, but that code is not concise and its performance might not be great.
  • Streaming data ingestion. A Delta table can be used as the target for Spark Structured Streaming. This provides the opportunity to unify batch and streaming within one pipeline.
  • Time travel. We can read older versions of the data via the time travel feature, since the data is stored as versions.
Info - You may find some similarities between the Hive ACID feature and Delta Lake. Refer to Hive ACID Inserts, Updates and Deletes with ORC for more information.
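
To make the ACID point more concrete, here is a toy sketch in plain Python of a two-step write: data files are written first, and they only become visible once a log entry that references them is created. This is not Delta Lake's implementation; the directory name _toy_log, the file names and the helper functions are all made up for illustration, and the sketch ignores details such as partially written log entries.

```python
import json
import os
import tempfile


def write_data_file(table_dir, name, rows):
    """Step 1: write a data file. Readers ignore it until a commit references it."""
    with open(os.path.join(table_dir, name), "w") as f:
        json.dump(rows, f)


def commit(table_dir, version, data_files):
    """Step 2: publish the files by creating the log entry for this version.

    O_EXCL makes the creation fail if the entry already exists, so only one
    writer can claim a given version.
    """
    log_dir = os.path.join(table_dir, "_toy_log")  # hypothetical directory name
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, f"{version:020d}.json")
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    with os.fdopen(fd, "w") as f:
        json.dump({"add": data_files}, f)


def visible_files(table_dir):
    """Readers trust only the files listed in committed log entries."""
    log_dir = os.path.join(table_dir, "_toy_log")
    if not os.path.isdir(log_dir):
        return []
    files = []
    for entry in sorted(os.listdir(log_dir)):
        with open(os.path.join(log_dir, entry)) as f:
            files.extend(json.load(f)["add"])
    return files


table = tempfile.mkdtemp()

# A job that "fails" after writing data but before committing.
write_data_file(table, "part-0.json", [1, 2, 3])
after_failed_job = visible_files(table)

# A job that completes both steps.
write_data_file(table, "part-1.json", [4, 5, 6])
commit(table, 0, ["part-1.json"])
after_successful_job = visible_files(table)

# A second writer trying to claim the same version is rejected.
try:
    commit(table, 0, ["part-0.json"])
    conflict_detected = False
except FileExistsError:
    conflict_detected = True

print(after_failed_job, after_successful_job, conflict_detected)
```

The orphaned part-0.json from the failed job is still on disk, but no reader ever sees it because no log entry points to it.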

Delta catalog

Like most big data frameworks, Delta Lake also needs a data catalog. For example, Hive stores metadata in a metastore, which can be a relational database. In Delta Lake, this is implemented via the class org.apache.spark.sql.delta.catalog.DeltaCatalog (written in Scala). The source code is available on GitHub: delta/DeltaCatalog.scala at edaeb86304211513c8028d056a7d90e98ec2839c · delta-io/delta.

The Delta catalog extension properly handles the interaction between the Hive metastore and Delta tables, i.e. the metadata of a Delta table can be stored in the Hive metastore. It also delegates all operations on non-Delta data sources to the Spark catalog (which can be Hive).

If you want to enable Hive to read Delta tables, you can follow this repository: connectors/hive at master · delta-io/connectors. An uber JAR can be added to Hive to enable it.

Info - Delta Lake tables can work directly without a Hive metastore.

Now let's start to use Delta Lake with some examples.

Use Delta Lake in PySpark

Create a table

Create a PySpark script (spark-delta-lake.py) with the following content:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from delta import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Example"
    master = "local"

    # Create Spark session with Delta extension

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    # Create a DataFrame
    df = spark.range(1, 10)
    df = df.withColumn('value', lit('ABC'))
    df.show()

    # Save as delta table
    df.write.format('delta').save('/data/delta/test_table')

Run the script using the following command:

spark-submit --packages "io.delta:delta-core_2.12:2.1.0rc1" spark-delta-lake.py

The --packages argument is required if the Delta JAR is not already on your Spark class path; it lets Spark download the dependent packages from repositories. If your cluster doesn't allow internet connections, you can download the JAR manually and add it to a directory on your Spark class path. The two configurations (spark.sql.extensions and spark.sql.catalog.spark_catalog) are needed to add Delta Lake support to the Spark session. The script saves the data into /data/delta/test_table in HDFS.
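
As a small illustration of how these pieces fit together, the sketch below assembles a spark-submit command line in plain Python. It passes the same two settings with spark-submit's --conf flag instead of setting them in the script, which is an alternative to what this article does. The helper names (delta_coordinate, build_spark_submit) are made up for this example, and the Scala version 2.12 is taken from the package coordinate used above.

```python
import shlex

# The two session settings used throughout this article.
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def delta_coordinate(delta_version, scala_version="2.12"):
    """Build the Maven coordinate passed to --packages (hypothetical helper)."""
    return f"io.delta:delta-core_{scala_version}:{delta_version}"


def build_spark_submit(script, delta_version):
    """Return the spark-submit argument list for a Delta-enabled script."""
    argv = ["spark-submit", "--packages", delta_coordinate(delta_version)]
    for key, value in DELTA_CONF.items():
        argv += ["--conf", f"{key}={value}"]
    argv.append(script)
    return argv


argv = build_spark_submit("spark-delta-lake.py", "2.1.0rc1")
print(shlex.join(argv))
```

Building the argument list rather than a single string keeps each value intact if the command is later handed to subprocess.run.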

The output looks like the following:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  ABC|
|  3|  ABC|
|  4|  ABC|
|  5|  ABC|
|  6|  ABC|
|  7|  ABC|
|  8|  ABC|
|  9|  ABC|
+---+-----+

Let's have a look into the delta table storage folder in HDFS:

$ hadoop fs -ls -R /data/delta/test_table
drwxr-xr-x   - kontext supergroup          0 2022-08-26 17:27 /data/delta/test_table/_delta_log
-rw-r--r--   1 kontext supergroup       1108 2022-08-26 17:27 /data/delta/test_table/_delta_log/00000000000000000000.json
-rw-r--r--   1 kontext supergroup        801 2022-08-26 17:27 /data/delta/test_table/part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet

There are two files:

  • _delta_log/00000000000000000000.json: the delta log file. The zero-padded integer in the name is the table version created by that commit.
  • *.parquet: the data file.
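
The naming pattern of the log files in the listing above (a 20-digit, zero-padded integer followed by .json) can be sketched with two small helpers. The function names are made up for illustration.

```python
def log_file_name(version):
    """Return the log file name for a table version (20 digits, zero-padded)."""
    if version < 0:
        raise ValueError("version must be non-negative")
    return f"{version:020d}.json"


def version_from_log_file(name):
    """Recover the table version from a log file name."""
    stem, ext = name.rsplit(".", 1)
    if ext != "json" or len(stem) != 20 or not stem.isdigit():
        raise ValueError(f"not a log file name: {name}")
    return int(stem)


print(log_file_name(0))
print(log_file_name(1))
print(version_from_log_file(log_file_name(2)))
```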

The log JSON file has the following content:

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"a7f73eca-cfad-4f34-bcd0-77922d386e9c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1661498855283}}
{"add":{"path":"part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet","partitionValues":{},"size":801,"modificationTime":1661498857145,"dataChange":true,"stats":"{\"numRecords\":9,\"minValues\":{\"id\":1,\"value\":\"ABC\"},\"maxValues\":{\"id\":9,\"value\":\"ABC\"},\"nullCount\":{\"id\":0,\"value\":0}}"}}
{"commitInfo":{"timestamp":1661498859458,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"9","numOutputBytes":"801"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.1.0rc1","txnId":"c598f97b-fb3a-4dab-b45c-053a27577408"}}

This records the metadata of the table directly in the file system. From this we can tell that Delta Lake doesn't rely on Hive or the Hive metastore, as the metadata is stored in the file system via log files. We can also save a Delta Lake table directly into the local file system:

df.write.format('delta').save('file:///home/kontext/data/delta/test_table')

When querying a delta table, the log files are used to work out which data files to read. Each write creates a new log file. This mechanism is similar to Hive's ACID support with ORC delta files. We can also tell from the data layout that the Delta Lake storage format uses Parquet internally, with the aid of JSON log files.
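
Since the log is plain newline-delimited JSON, it can be inspected without Spark. The following sketch parses an abbreviated copy of the version 0 log shown above (only a few fields are kept, and the lines are rebuilt with json.dumps to avoid hand-escaping). The summarize function is a made-up helper, not part of any Delta Lake API.

```python
import json

schema = {
    "type": "struct",
    "fields": [
        {"name": "id", "type": "long", "nullable": True, "metadata": {}},
        {"name": "value", "type": "string", "nullable": True, "metadata": {}},
    ],
}

# Abbreviated version of the log file shown above: one JSON action per line.
# Note that schemaString and stats are JSON documents stored as strings.
log_lines = [
    json.dumps({"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}),
    json.dumps({"metaData": {"format": {"provider": "parquet"},
                             "schemaString": json.dumps(schema),
                             "partitionColumns": []}}),
    json.dumps({"add": {"path": "part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet",
                        "size": 801,
                        "stats": json.dumps({"numRecords": 9})}}),
    json.dumps({"commitInfo": {"operation": "WRITE"}}),
]


def summarize(lines):
    """Collect the schema, data files, row count and operation from one log file."""
    summary = {"columns": [], "files": [], "rows": 0, "operation": None}
    for line in lines:
        action = json.loads(line)
        if "metaData" in action:
            fields = json.loads(action["metaData"]["schemaString"])["fields"]
            summary["columns"] = [(f["name"], f["type"]) for f in fields]
        elif "add" in action:
            summary["files"].append(action["add"]["path"])
            summary["rows"] += json.loads(action["add"]["stats"])["numRecords"]
        elif "commitInfo" in action:
            summary["operation"] = action["commitInfo"]["operation"]
    return summary


summary = summarize(log_lines)
print(summary)
```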

Overwrite a delta table

We can directly overwrite a delta table by using the mode API of DataFrameWriter.

df.write.format('delta').mode('overwrite').save('/data/delta/test_table')

An error is thrown if we write into an existing table without specifying a mode. An overwrite creates a new version of the table (and a new delta log file).

Update a delta table

As Delta Lake tables support ACID, we can now update a subset of records using the update API.

from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Example"
    master = "local"

    # Create Spark session with Delta extension

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    # Update Delta Lake table
    test_table = DeltaTable.forPath(spark, "/data/delta/test_table")

    # Update rows with even id to append the id 
    test_table.update(
        condition=expr("id % 2 == 0"),
        set={"value": concat("value", lit('|'), "id")})
    
    df = test_table.toDF()
    df.show()

We can tell from the output that the value column is updated for rows with an even id:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|ABC|2|
|  3|  ABC|
|  4|ABC|4|
|  5|  ABC|
|  6|ABC|6|
|  7|  ABC|
|  8|ABC|8|
|  9|  ABC|
+---+-----+

Looking into HDFS, we can see that a new data file and a new delta log file were created:

$ hadoop fs -ls -R /data/delta/test_table
drwxr-xr-x   - kontext supergroup          0 2022-08-26 17:30 /data/delta/test_table/_delta_log
-rw-r--r--   1 kontext supergroup       1108 2022-08-26 17:27 /data/delta/test_table/_delta_log/00000000000000000000.json
-rw-r--r--   1 kontext supergroup       1017 2022-08-26 17:30 /data/delta/test_table/_delta_log/00000000000000000001.json
-rw-r--r--   1 kontext supergroup        825 2022-08-26 17:30 /data/delta/test_table/part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet
-rw-r--r--   1 kontext supergroup        801 2022-08-26 17:27 /data/delta/test_table/part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet

The second delta log file records the UPDATE operation, which consists of a remove action and an add action:

{"remove":{"path":"part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet","deletionTimestamp":1661499047941,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":801}}
{"add":{"path":"part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet","partitionValues":{},"size":825,"modificationTime":1661499047845,"dataChange":true,"stats":"{\"numRecords\":9,\"minValues\":{\"id\":1,\"value\":\"ABC\"},\"maxValues\":{\"id\":9,\"value\":\"ABC|8\"},\"nullCount\":{\"id\":0,\"value\":0}}"}}
{"commitInfo":{"timestamp":1661499047961,"operation":"UPDATE","operationParameters":{"predicate":"((id#499L % 2) = 0)"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"5","numAddedChangeFiles":"0","executionTimeMs":"4580","scanTimeMs":"3481","numAddedFiles":"1","numUpdatedRows":"4","rewriteTimeMs":"1096"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.1.0rc1","txnId":"16af137e-5139-4eea-82e3-8f21f99f0615"}}
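
The remove and add actions above can be read as "swap the old data file for a rewritten one". A minimal sketch of how a reader might apply them, using the two file names from the listing, follows. apply_commit is a made-up helper, and only the fields needed for the illustration are kept.

```python
def apply_commit(live_files, actions):
    """Return the set of live data files after applying one commit's actions."""
    live = set(live_files)
    for action in actions:
        if "remove" in action:
            live.discard(action["remove"]["path"])
        elif "add" in action:
            live.add(action["add"]["path"])
    return live


OLD = "part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet"
NEW = "part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet"

# Version 0: the initial write added one file.
version_0 = apply_commit(set(), [{"add": {"path": OLD}}])

# Version 1: the UPDATE commit shown above, trimmed to the relevant fields.
update_commit = [
    {"remove": {"path": OLD}},
    {"add": {"path": NEW}},
    {"commitInfo": {"operation": "UPDATE",
                    "operationMetrics": {"numCopiedRows": "5", "numUpdatedRows": "4"}}},
]
version_1 = apply_commit(version_0, update_commit)

# The whole file was rewritten: 4 updated rows plus 5 rows copied unchanged.
metrics = update_commit[-1]["commitInfo"]["operationMetrics"]
rows_in_new_file = int(metrics["numCopiedRows"]) + int(metrics["numUpdatedRows"])

print(version_1, rows_in_new_file)
```

The sum of copied and updated rows matches the numRecords value of 9 in the add action's stats, which shows that the unchanged rows were copied into the new file rather than edited in place.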

Delete records from table

The following code snippet deletes records from the delta table:

    # delete rows where id = 6
    test_table.delete(
        condition=expr("id == 6"))
    
    df = test_table.toDF()
    df.show()

As the following output shows, record 6 is deleted:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|ABC|2|
|  3|  ABC|
|  4|ABC|4|
|  5|  ABC|
|  7|  ABC|
|  8|ABC|8|
|  9|  ABC|
+---+-----+

In HDFS, a new delta log file and a new parquet file are added:

drwxr-xr-x   - kontext supergroup          0 2022-08-26 17:38 /data/delta/test_table/_delta_log
-rw-r--r--   1 kontext supergroup       1108 2022-08-26 17:27 /data/delta/test_table/_delta_log/00000000000000000000.json
-rw-r--r--   1 kontext supergroup       1017 2022-08-26 17:30 /data/delta/test_table/_delta_log/00000000000000000001.json
-rw-r--r--   1 kontext supergroup       1012 2022-08-26 17:38 /data/delta/test_table/_delta_log/00000000000000000002.json
-rw-r--r--   1 kontext supergroup        810 2022-08-26 17:38 /data/delta/test_table/part-00000-55a899fb-1789-4753-b43e-aad0a4bd645f-c000.snappy.parquet
-rw-r--r--   1 kontext supergroup        825 2022-08-26 17:30 /data/delta/test_table/part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet
-rw-r--r--   1 kontext supergroup        801 2022-08-26 17:27 /data/delta/test_table/part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet

Merge data (upserts)

As mentioned at the beginning, one of the features I like most is MERGE.

The following code snippet merges data from a staging DataFrame into the delta table:

    # Create a staging DataFrame for merge.
    df_stg = spark.range(9, 15)
    df_stg = df_stg.withColumn('value', lit('EDF'))

    # Merge into test_table
    merge = test_table.alias('tgt').merge(df_stg.alias('src'),
                                  "src.id = tgt.id") \
        .whenMatchedUpdate(set={"value": col("src.value")}) \
        .whenNotMatchedInsert(values={"id": col("src.id"), "value": col("src.value")})
    merge.execute()

The output:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|ABC|2|
|  3|  ABC|
|  4|ABC|4|
|  5|  ABC|
|  7|  ABC|
|  8|ABC|8|
|  9|  EDF|
| 10|  EDF|
| 11|  EDF|
| 12|  EDF|
| 13|  EDF|
| 14|  EDF|
+---+-----+

The record with id=9 is updated and 5 new records are inserted.

Again, a new delta log file and a new data file are created in the file system, similar to the examples above.
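
For readers new to upserts, the matched/not-matched logic of the merge above can be mimicked with plain Python dictionaries keyed by id. This only illustrates the semantics; it says nothing about how Delta Lake executes a merge, and merge_upsert is a made-up helper.

```python
def merge_upsert(target, source):
    """Upsert source rows into target, keyed by id.

    Mirrors whenMatchedUpdate (overwrite value) and whenNotMatchedInsert
    (add the row). Returns the merged rows and the update/insert counts.
    """
    merged = dict(target)
    updated = inserted = 0
    for key, value in source.items():
        if key in merged:
            updated += 1
        else:
            inserted += 1
        merged[key] = value
    return merged, updated, inserted


# Table content before the merge (after the update and delete steps above).
target = {1: "ABC", 2: "ABC|2", 3: "ABC", 4: "ABC|4",
          5: "ABC", 7: "ABC", 8: "ABC|8", 9: "ABC"}
# Staging rows, equivalent to spark.range(9, 15) with value 'EDF'.
source = {i: "EDF" for i in range(9, 15)}

merged, updated, inserted = merge_upsert(target, source)
for key in sorted(merged):
    print(key, merged[key])
print("updated:", updated, "inserted:", inserted)
```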

Read from delta table and time travel

Another cool feature provided by Delta Lake is time travel. Time travel allows us to go back to previous versions (snapshots) of the table. Thanks to the delta log files, this can be easily done using the versionAsOf option when reading from a delta table.

from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Read Example"
    master = "local"

    # Create Spark session with Delta extension

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    # Read from Delta Lake table (the latest version)
    df = spark.read.format('delta').load("/data/delta/test_table")
    df.show()

    # Read version 1
    df_v1 = spark.read.format('delta').option(
        'versionAsOf', 1).load("/data/delta/test_table")
    df_v1.show()

Output:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|ABC|2|
|  3|  ABC|
|  4|ABC|4|
|  5|  ABC|
|  7|  ABC|
|  8|ABC|8|
|  9|  EDF|
| 10|  EDF|
| 11|  EDF|
| 12|  EDF|
| 13|  EDF|
| 14|  EDF|
+---+-----+

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|ABC|2|
|  3|  ABC|
|  4|ABC|4|
|  5|  ABC|
|  6|ABC|6|
|  7|  ABC|
|  8|ABC|8|
|  9|  ABC|
+---+-----+

The second one is from version 1 (i.e. after the UPDATE operation we did above).
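
Conceptually, reading an older version means replaying the log only up to that version. The sketch below shows the idea with one commit per operation performed in this article. The file names are hypothetical placeholders, each commit is assumed to swap exactly one file, and files_as_of is a made-up helper rather than a Delta Lake API.

```python
def files_as_of(commits, version):
    """Replay commits 0..version and return the data files live at that version."""
    if not 0 <= version < len(commits):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for actions in commits[: version + 1]:
        for kind, path in actions:
            if kind == "add":
                live.add(path)
            elif kind == "remove":
                live.discard(path)
    return live


# One list of (action, file) pairs per table version; names are placeholders.
commits = [
    [("add", "write.parquet")],                                  # version 0: initial write
    [("remove", "write.parquet"), ("add", "update.parquet")],    # version 1: UPDATE
    [("remove", "update.parquet"), ("add", "delete.parquet")],   # version 2: DELETE
    [("remove", "delete.parquet"), ("add", "merge.parquet")],    # version 3: MERGE
]

print(files_as_of(commits, 1))
print(files_as_of(commits, 3))
```

This also explains why the old parquet files were still present in the HDFS listings: a removed file is no longer part of the latest version, but an older version still needs it.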

Summary

As the above examples show, we can easily create and update delta tables, and we can also go back to earlier versions. This saves a lot of effort when implementing data lake tables.

I hope you now have a basic understanding of Delta Lake. In future articles, I will discuss more aspects of Delta Lake.

Feel free to post a comment if you have any questions.

References

Home | Delta Lake
