Delta Lake with PySpark Walkthrough
- About Delta Lake
- Prerequisites
- Install PySpark 3.3.0
- Install delta-spark Python package
- Advantages of Delta Lake
- Delta catalog
- Use Delta Lake in PySpark
- Create a table
- Overwrite a delta table
- Update a delta table
- Delete records from table
- Merge data (upserts)
- Read from delta table and time travel
- Summary
- References
About Delta Lake
Delta Lake is an open source big data storage framework that supports implementing a Lakehouse architecture. It works with computing engines like Spark, PrestoDB, Flink, Trino (Presto SQL) and Hive. Delta format files can be stored in cloud storage like GCS, Azure Data Lake Storage, AWS S3, or in HDFS. It provides programming APIs for Scala, Java, Python, Rust, and Ruby.
This article provides a high-level introduction to Delta Lake with PySpark on a local Hadoop system. Examples are provided to help you understand the concepts.
Prerequisites
For this article, I am using PySpark 3.3.0 with Delta Lake's latest release candidate 2.1.0rc1.
If you don't have this environment available, you can set it up by following these steps.
Install PySpark 3.3.0
You can follow this article to configure Spark 3.3.0 - Install Spark 3.3.0 on Linux or WSL.
Install delta-spark Python package
Use the following command to install it on the system where you will run the Delta Lake code.
pip install -i https://test.pypi.org/simple/ delta-spark==2.1.0rc1
You may hit the following error:
ERROR: Could not find a version that satisfies the requirement importlib-metadata>=1.0.0 (from delta-spark) (from versions: none) ERROR: No matching distribution found for importlib-metadata>=1.0.0
To fix it, run the following command:
pip install importlib-metadata
If you need to run the script in a distributed environment, make sure you install this package on all the nodes in the cluster. Alternatively, you can pass the required dependent package via the command line when submitting the Spark script:
spark-submit --packages io.delta:delta-core_2.12:2.1.0rc1 ...
*Your cluster needs web access (outbound 443 to the package repository) to download the package.
Delta Lake releases are only compatible with certain Spark versions. Check out the details here: Releases — Delta Lake Documentation.
Advantages of Delta Lake
Before we deep dive into the details, it's important to understand the benefits of Delta Lake. In other words, why do we need Delta Lake?
There are many features available in Delta Lake, but a few of them are very important to consider (at least based on my experience):
- ACID support. Traditionally, when we write data into a file system using Spark, the whole write operation is not atomic. For example, if the job fails in the middle, the data in the target folder will be corrupted. The workaround is usually to back up the target before overwriting. With the Delta Lake format, we don't need to worry about that any more.
- Merge statement. In relational databases like SQL Server, Oracle, Teradata, PostgreSQL, etc., we can use the MERGE statement to merge data from a staging table into a target table (usually an SCD type 2 table). In a previous post, I published an article about how to merge into an SCD type 2 table using Spark, but the code is not concise and the performance might not be great.
- Streaming data ingestion. A delta table can be used as the target for Spark Structured Streaming ingestion. This provides the opportunity to unify both batch and streaming within one pipeline (see the sketch after this list).
- Time travel. We can read older versions of data via the time travel feature, since the data is stored as versions.
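To illustrate the streaming point above, here is a minimal sketch (not part of the original walkthrough) that writes Spark's built-in rate source into a delta table used as a structured streaming sink. The paths /tmp/delta/stream_table and /tmp/delta/_checkpoints/stream_table are illustrative placeholders.

from pyspark.sql import SparkSession

# Create a Spark session with the Delta extension (same configs as used later in this article).
spark = SparkSession.builder.appName("Delta streaming sketch") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .master("local") \
    .getOrCreate()

# Use the built-in rate source as a stand-in for a real stream (e.g. Kafka).
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# A delta table can be used directly as a streaming sink;
# a checkpoint location is required for fault tolerance.
query = stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/delta/_checkpoints/stream_table") \
    .start("/tmp/delta/stream_table")

query.awaitTermination(30)  # let it run briefly for this sketch
query.stop()

The same delta table path can then be read back in batch mode, which is what makes it straightforward to unify batch and streaming pipelines.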
Delta catalog
Like most big data frameworks, Delta Lake also needs a data catalog. For example, Hive stores metadata in the metastore, which can be a relational database. In Delta Lake, this is implemented via the Java class org.apache.spark.sql.delta.catalog.DeltaCatalog. The source code is available on GitHub: delta/DeltaCatalog.scala at edaeb86304211513c8028d056a7d90e98ec2839c · delta-io/delta.
The Delta catalog extension can properly handle the interaction between the Hive metastore and Delta tables, i.e. the metadata of a Delta table can be stored in the Hive metastore. It also delegates all operations on non-Delta data sources to the Spark catalog (which can be Hive).
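For example, here is a minimal sketch (assuming a Spark session spark created with the two Delta configurations shown in the next section) of registering a delta table in the metastore with saveAsTable so that it can be queried by name; the table name test_table_catalog is an illustrative placeholder.

from pyspark.sql.functions import lit

# Save the DataFrame as a managed delta table registered in the metastore.
df = spark.range(1, 10).withColumn('value', lit('ABC'))
df.write.format('delta').saveAsTable('test_table_catalog')

# The table can now be queried by name through the catalog.
spark.sql('SELECT * FROM test_table_catalog').show()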
If you want to enable Hive to read Delta tables, you can follow this repository: connectors/hive at master · delta-io/connectors. An uber JAR can be added into Hive to enable it.
Now let's start to use Delta Lake with some examples.
Use Delta Lake in PySpark
Create a table
Create a PySpark script (spark-delta-lake.py) with the following content:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from delta import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Example"
    master = "local"

    # Create Spark session with Delta extension
    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)
    spark = builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Create a DataFrame
    df = spark.range(1, 10)
    df = df.withColumn('value', lit('ABC'))
    df.show()

    # Save as delta table
    df.write.format('delta').save('/data/delta/test_table')
Run the script using the following command:
spark-submit --packages "io.delta:delta-core_2.12:2.1.0rc1" spark-delta-lake.py
The --packages argument is required if the delta JAR is not included in your Spark class path, so that Spark can download the dependent packages from repositories. If your cluster doesn't allow internet connections, you can download the JAR manually and add it to a directory included in your Spark class path. The two configurations (spark.sql.extensions and spark.sql.catalog.spark_catalog) are important to add Delta Lake file format support to the Spark session. The script saves the data into /data/delta/test_table in HDFS. The output looks like the following:
+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2| ABC|
| 3| ABC|
| 4| ABC|
| 5| ABC|
| 6| ABC|
| 7| ABC|
| 8| ABC|
| 9| ABC|
+---+-----+
Let's have a look into the delta table storage folder in HDFS:
$ hadoop fs -ls -R /data/delta/test_table
drwxr-xr-x - kontext supergroup 0 2022-08-26 17:27 /data/delta/test_table/_delta_log
-rw-r--r-- 1 kontext supergroup 1108 2022-08-26 17:27 /data/delta/test_table/_delta_log/00000000000000000000.json
-rw-r--r-- 1 kontext supergroup 801 2022-08-26 17:27 /data/delta/test_table/part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet
There are two files:
- _delta_log/00000000000000000000.json: the delta log file. The integer part indicates the transaction number.
- *.parquet: the data file.
The log JSON file has the following content:
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} {"metaData":{"id":"a7f73eca-cfad-4f34-bcd0-77922d386e9c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1661498855283}} {"add":{"path":"part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet","partitionValues":{},"size":801,"modificationTime":1661498857145,"dataChange":true,"stats":"{\"numRecords\":9,\"minValues\":{\"id\":1,\"value\":\"ABC\"},\"maxValues\":{\"id\":9,\"value\":\"ABC\"},\"nullCount\":{\"id\":0,\"value\":0}}"}} {"commitInfo":{"timestamp":1661498859458,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"9","numOutputBytes":"801"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.1.0rc1","txnId":"c598f97b-fb3a-4dab-b45c-053a27577408"}}
This records the metadata of the table directly in the file system. Thus we can pretty much tell that Delta Lake doesn't rely on Hive or the Hive metastore, as the metadata is stored in the file system via log files. We can also save a Delta Lake table directly into the local file system:
df.write.format('delta').save('file:///home/kontext/data/delta/test_table')
When querying the delta table, the log files will be used. Each write ends up creating new log files. This mechanism is similar to Hive's ACID support with ORC delta files. We can also tell from the data layout that the Delta Lake storage format uses the Parquet format internally, with the aid of JSON log files.
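Since each write creates a new commit in the delta log, we can also inspect the table's version history via the DeltaTable API. A minimal sketch, reusing the table path from above:

from delta.tables import DeltaTable

# Each row of the history corresponds to one commit (one delta log file).
test_table = DeltaTable.forPath(spark, '/data/delta/test_table')
test_table.history().select('version', 'timestamp', 'operation').show(truncate=False)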
Overwrite a delta table
We can directly overwrite a delta table by using the mode API of DataFrameWriter.
df.write.format('delta').mode('overwrite').save('/data/delta/test_table')
An error will be thrown if we write directly into an existing table without mode specified. Overwrite will end up with a new version created (and with a new delta log file).
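Other save modes such as append behave similarly: each write simply produces a new table version. A minimal sketch, using a separate illustrative path so it doesn't interfere with the walkthrough table:

# Append rows as a new version instead of replacing the existing data.
df.write.format('delta').mode('append').save('/data/delta/another_test_table')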
Update a delta table
As Delta Lake tables support ACID, we can now update partial records using the update API.
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Example"
    master = "local"

    # Create Spark session with Delta extension
    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)
    spark = builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Update Delta Lake table
    test_table = DeltaTable.forPath(spark, "/data/delta/test_table")
    # Update rows with even id to append the id
    test_table.update(
        condition=expr("id % 2 == 0"),
        set={"value": concat("value", lit('|'), "id")})
    df = test_table.toDF()
    df.show()
We can tell from the output that the even rows are updated for column value:
+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2|ABC|2|
| 3| ABC|
| 4|ABC|4|
| 5| ABC|
| 6|ABC|6|
| 7| ABC|
| 8|ABC|8|
| 9| ABC|
+---+-----+
Looking into HDFS, we can see that a new data file and a new delta log file were created:
$ hadoop fs -ls -R /data/delta/test_table
drwxr-xr-x - kontext supergroup 0 2022-08-26 17:30 /data/delta/test_table/_delta_log
-rw-r--r-- 1 kontext supergroup 1108 2022-08-26 17:27 /data/delta/test_table/_delta_log/00000000000000000000.json
-rw-r--r-- 1 kontext supergroup 1017 2022-08-26 17:30 /data/delta/test_table/_delta_log/00000000000000000001.json
-rw-r--r-- 1 kontext supergroup 825 2022-08-26 17:30 /data/delta/test_table/part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet
-rw-r--r-- 1 kontext supergroup 801 2022-08-26 17:27 /data/delta/test_table/part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet
The second delta log file records the UPDATE operation, which includes remove and add actions:
{"remove":{"path":"part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet","deletionTimestamp":1661499047941,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":801}} {"add":{"path":"part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet","partitionValues":{},"size":825,"modificationTime":1661499047845,"dataChange":true,"stats":"{\"numRecords\":9,\"minValues\":{\"id\":1,\"value\":\"ABC\"},\"maxValues\":{\"id\":9,\"value\":\"ABC|8\"},\"nullCount\":{\"id\":0,\"value\":0}}"}} {"commitInfo":{"timestamp":1661499047961,"operation":"UPDATE","operationParameters":{"predicate":"((id#499L % 2) = 0)"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"5","numAddedChangeFiles":"0","executionTimeMs":"4580","scanTimeMs":"3481","numAddedFiles":"1","numUpdatedRows":"4","rewriteTimeMs":"1096"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.1.0rc1","txnId":"16af137e-5139-4eea-82e3-8f21f99f0615"}}
Delete records from table
The following code snippet deletes records from the delta table:
# delete rows where id = 6
test_table.delete(
    condition=expr("id == 6"))
df = test_table.toDF()
df.show()
As the following output shows, record 6 is deleted:
+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2|ABC|2|
| 3| ABC|
| 4|ABC|4|
| 5| ABC|
| 7| ABC|
| 8|ABC|8|
| 9| ABC|
+---+-----+
In HDFS, a new delta log file and a new Parquet file are added:
drwxr-xr-x - kontext supergroup 0 2022-08-26 17:38 /data/delta/test_table/_delta_log
-rw-r--r-- 1 kontext supergroup 1108 2022-08-26 17:27 /data/delta/test_table/_delta_log/00000000000000000000.json
-rw-r--r-- 1 kontext supergroup 1017 2022-08-26 17:30 /data/delta/test_table/_delta_log/00000000000000000001.json
-rw-r--r-- 1 kontext supergroup 1012 2022-08-26 17:38 /data/delta/test_table/_delta_log/00000000000000000002.json
-rw-r--r-- 1 kontext supergroup 810 2022-08-26 17:38 /data/delta/test_table/part-00000-55a899fb-1789-4753-b43e-aad0a4bd645f-c000.snappy.parquet
-rw-r--r-- 1 kontext supergroup 825 2022-08-26 17:30 /data/delta/test_table/part-00000-ba8ded50-4069-40c3-912e-a3f7eb30b8de-c000.snappy.parquet
-rw-r--r-- 1 kontext supergroup 801 2022-08-26 17:27 /data/delta/test_table/part-00000-c0fe73b9-d321-4216-b437-1a61d7e33f46-c000.snappy.parquet
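Note that the old Parquet files are only logically removed (via remove actions in the log); they stay on disk to support time travel until they are vacuumed. A minimal sketch using the DeltaTable vacuum API, which by default only deletes files older than the 7-day retention period:

# Physically delete data files that are no longer referenced by the table
# and are older than the default retention period (7 days).
test_table.vacuum()

# An explicit retention period (in hours) can also be passed:
# test_table.vacuum(retentionHours=168)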
Merge data (upserts)
As mentioned at the beginning, one of the features I like most is MERGE.
The following code snippet merges data from one staging DataFrame to the delta table:
# Create a staging DataFrame for merge.
df_stg = spark.range(9, 15)
df_stg = df_stg.withColumn('value', lit('EDF'))

# Merge into test_table
merge = test_table.alias('tgt').merge(df_stg.alias('src'), "src.id = tgt.id") \
    .whenMatchedUpdate(set={"value": col("src.value")}) \
    .whenNotMatchedInsert(values={"id": col("src.id"), "value": col("src.value")})
merge.execute()
The output:
+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2|ABC|2|
| 3| ABC|
| 4|ABC|4|
| 5| ABC|
| 7| ABC|
| 8|ABC|8|
| 9| EDF|
| 10| EDF|
| 11| EDF|
| 12| EDF|
| 13| EDF|
| 14| EDF|
+---+-----+
The record with id=9 is updated and 5 new records are inserted.
Again, a new delta log file and data file are created in the file system, similar to the examples above.
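The same merge can also be expressed in Spark SQL, which some readers may find more familiar. A minimal sketch, assuming the staging DataFrame is first registered as a temporary view (stg is an illustrative view name); path-based delta tables can be referenced with the delta.`<path>` syntax:

# Register the staging DataFrame as a temporary view for use in SQL.
df_stg.createOrReplaceTempView('stg')

spark.sql("""
MERGE INTO delta.`/data/delta/test_table` AS tgt
USING stg AS src
ON src.id = tgt.id
WHEN MATCHED THEN UPDATE SET tgt.value = src.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (src.id, src.value)
""")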
Read from delta table and time travel
Another cool feature provided by Delta Lake is time travel. Time travel allows us to go back to previous versions (snapshots) of the table. Thanks to the delta log files, this can easily be done using the versionAsOf option when reading from the delta table.
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Read Example"
    master = "local"

    # Create Spark session with Delta extension
    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)
    spark = builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Read from Delta Lake table (the latest version)
    df = spark.read.format('delta').load("/data/delta/test_table")
    df.show()

    # Read version 1
    df_v1 = spark.read.format('delta').option(
        'versionAsOf', 1).load("/data/delta/test_table")
    df_v1.show()
Output:
+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2|ABC|2|
| 3| ABC|
| 4|ABC|4|
| 5| ABC|
| 7| ABC|
| 8|ABC|8|
| 9| EDF|
| 10| EDF|
| 11| EDF|
| 12| EDF|
| 13| EDF|
| 14| EDF|
+---+-----+

+---+-----+
| id|value|
+---+-----+
| 1| ABC|
| 2|ABC|2|
| 3| ABC|
| 4|ABC|4|
| 5| ABC|
| 6|ABC|6|
| 7| ABC|
| 8|ABC|8|
| 9| ABC|
+---+-----+
The second one is from version 1 (i.e. after the UPDATE operation we did above).
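We can also time travel by timestamp using the timestampAsOf option instead of versionAsOf. A minimal sketch; the timestamp below is an illustrative placeholder and must fall within the range of the table's commit timestamps:

# Read the table as of a given point in time.
df_ts = spark.read.format('delta') \
    .option('timestampAsOf', '2022-08-26 17:30:00') \
    .load('/data/delta/test_table')
df_ts.show()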
Summary
As the above examples show, we can easily create and update delta tables, and we can also go back to previous versions. This saves a lot of effort when implementing Data Lake tables.
I hope you now have a basic understanding of Delta Lake. In future articles, I will discuss more aspects of Delta Lake.
Feel free to post a comment if you have any questions.