Raymond Raymond / Spark & PySpark

Streaming from Kafka to Delta Lake Table via PySpark

event 2022-08-26 visibility 2,101 comment 0 insights toc
more_vert
insights Stats

In article Delta Lake with PySpark Walkthrough, I walked through the features of Delta Lake with examples. In this article, I am going to continue the discussion with the streaming capability of delta lake format. 

Prerequisites

If you have not used Delta Lake before, please refer to Delta Lake with PySpark Walkthrough to understand the basics first.

For the coding examples in this article, a Kafka topic is used as streaming source:

  • Topic name: 'kontext-events'
  • Bootstrap server:  'localhost:9092'

The Kafka instance is created following tutorial Install and Run Kafka 3.2.0 On WSL.

Write a stream of data to a delta table

We can write a stream of data into a delta table using structured streaming. Delta's transaction log guarantees that each message will be processed exactly once. It also supports concurrent write into the same table from multiple streams or batch jobs. By default, the messages in the stream will be appended into the delta table.

Let's create a sample script to write data into a delta table. For structured streaming part, we will keep it very similar as Spark Structured Streaming - Read from and Write into Kafka Topics.

Create the PySpark script

Create a PySpark script named spark-kafka-to-delta.py with the following content:

import time 
from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Streaming Example"
    master = "local"

    # Create Spark session with Delta extension

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    kafka_topic = "kontext-events"
    kafka_servers = "localhost:9092"

    # Create a streaming DataFrame from Kafka
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_servers) \
        .option("subscribe", kafka_topic) \
        .option("startingOffsets", "earliest") \
        .load()

    # Write into delta table (/data/delta/kafka-events)
    stream = df.selectExpr("CAST(key AS STRING) as key",
                           "CAST(value AS STRING) as value") \
        .writeStream.format("delta") \
        .option("checkpointLocation", "/data/delta/checkpoint") \
        .start("/data/delta/kafka-events")

    time.sleep(120)

    # Stop the stream
    while stream.isActive:
        msg = stream.status['message']
        data_avail = stream.status['isDataAvailable']
        trigger_active = stream.status['isTriggerActive']
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            stream.stop()
        time.sleep(0.5)

    # Okay wait for the stop to happen
    print('Awaiting termination...')
    stream.awaitTermination()

Run the script

Run the script using the following command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,io.delta:delta-core_2.12:2.1.0rc1 spark-kafka-to-delta.py

The packages argument includes the dependency to spark-sql-kafka library as it structured streaming package is not included in Spark by default.

The script does the following:

  • Create a SparkSession object with Delta Lake extension.
  • Create a streaming DataFrame from Kafka bootstrap server localhost:9092 and topic kontext-events. The offset is set as earliest. 
  • Write the streaming DataFrame into delta table (/data/delta/kafka-events).
  • The Python thread wait for 2 minutes and then stop the streaming query gracefully. I did this for demo purpose while in production our streaming application can run 7/24 depends on requirements. 

When you run the script, you may encounter an error like java.lang.NoSuchMethodError: PoolConfig.setMinEvictableIdleTime. Follow the workaround in the article to resolve it.\

Read the results

Once the script is terminated, we can have a look at the ingested data in HDFS via the following command:

hadoop fs -ls -R /data/delta/kafka-events/

It prints out the following text:

$ hadoop fs -ls -R /data/delta/kafka-events/
drwxr-xr-x   - kontext supergroup          0 2022-08-27 10:53 /data/delta/kafka-events/_delta_log
-rw-r--r--   1 kontext supergroup       1307 2022-08-27 10:53 /data/delta/kafka-events/_delta_log/00000000000000000000.json
-rw-r--r--   1 kontext supergroup        785 2022-08-27 10:53 /data/delta/kafka-events/part-00000-e1e43969-8b93-46ea-9b1d-dee7ee0267f7-c000.snappy.parquet

As expected, a delta transaction log file and data file (parquet) is created. The delta transactional log JSON file has the following content:

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"29874ceb-21a8-4e02-b5bf-7f994df6d641","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1661561591480}}
{"txn":{"appId":"6f94e31e-b4db-4ffb-ab26-7fe530651daa","version":0,"lastUpdated":1661561597419}}
{"add":{"path":"part-00000-e1e43969-8b93-46ea-9b1d-dee7ee0267f7-c000.snappy.parquet","partitionValues":{},"size":785,"modificationTime":1661561597124,"dataChange":true,"stats":"{\"numRecords\":7,\"minValues\":{\"value\":\"Kontext message 1\"},\"maxValues\":{\"value\":\"New kontext events~!!\"},\"nullCount\":{\"key\":7,\"value\":0}}"}}
{"commitInfo":{"timestamp":1661561597556,"operation":"STREAMING UPDATE","operationParameters":{"outputMode":"Append","queryId":"6f94e31e-b4db-4ffb-ab26-7fe530651daa","epochId":"0"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numRemovedFiles":"0","numOutputRows":"7","numOutputBytes":"785","numAddedFiles":"1"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.1.0rc1","txnId":"39c3c540-2464-4e14-bfbc-a1b7c4a9e8bd"}}

We can read from the delta table using Spark Shell:

df = spark.read.format('delta').load("/data/delta/kafka-events")
df.show()

The output looks like the following:

+----+--------------------+
| key|               value|
+----+--------------------+
|null|   Kontext message 1|
|null|   Kontext message 2|
|null|   Kontext message 3|
|null|   Kontext message 4|
|null|   Kontext message 5|
|null|New kontext event...|
|null|         New events!|
+----+--------------------+

The above messages are populated as part of my previous tutorials without key specified (not a good practice).

Rerun the streaming application

Let's populate more messages into the Kafka topic using Kafka built-in shell:

$KAFKA_HOME/bin/kafka-console-producer.sh --topic kontext-events --bootstrap-server localhost:9092

2022082712933-image.png

Now run the application again:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,io.delta:delta-core_2.12:2.1.0rc1 spark-kafka-to-delta.py

By looking into the file system, we can find out that new delta transaction log file and parquet file are created:

$ hadoop fs -ls -R /data/delta/kafka-events/
drwxr-xr-x   - kontext supergroup          0 2022-08-27 11:30 /data/delta/kafka-events/_delta_log
-rw-r--r--   1 kontext supergroup       1307 2022-08-27 10:53 /data/delta/kafka-events/_delta_log/00000000000000000000.json
-rw-r--r--   1 kontext supergroup        885 2022-08-27 11:30 /data/delta/kafka-events/_delta_log/00000000000000000001.json
-rw-r--r--   1 kontext supergroup        760 2022-08-27 11:30 /data/delta/kafka-events/part-00000-bf433610-ffe9-4318-be5b-1d58f5ab0c46-c000.snappy.parquet
-rw-r--r--   1 kontext supergroup        785 2022-08-27 10:53 /data/delta/kafka-events/part-00000-e1e43969-8b93-46ea-9b1d-dee7ee0267f7-c000.snappy.parquet

The table content now looks like the following output with new rows added:

+----+--------------------+
| key|               value|
+----+--------------------+
|null|   Kontext message 1|
|null|   Kontext message 2|
|null|   Kontext message 3|
|null|   Kontext message 4|
|null|   Kontext message 5|
|null|New kontext event...|
|null|         New events!|
|null|       New messages!|
|null|   Hello Delta Lake!|
|null|Hello Spark Struc...|
+----+--------------------+

Read a stream of changes from delta table

Another streaming related cool feature for Delta Lake is that we can read a stream of changes form delta table. For example, when streaming ingestion process write into delta table, you can also do transformations and publish it as another stream and write into Kafka or other sinks.

The following example reads the delta table we created above as a stream and output the content to Console directly:

from pyspark.sql import SparkSession
from delta.tables import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Read Table as Stream Example"
    master = "local"

    # Create Spark session with Delta extension

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()

    # Read from Delta Lake table as stream
    stream = spark.readStream.format("delta") \
        .load("/data/delta/kafka-events") \
        .writeStream.format("console") \
        .outputMode('append') \
        .start() \
        .awaitTermination()

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+
| key|               value|
+----+--------------------+
|null|   Kontext message 1|
|null|   Kontext message 2|
|null|   Kontext message 3|
|null|   Kontext message 4|
|null|   Kontext message 5|
|null|New kontext event...|
|null|         New events!|
|null|       New messages!|
|null|   Hello Delta Lake!|
|null|Hello Spark Struc...|
+----+--------------------+

 org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
  "id" : "ac5a7bd0-2e6a-4801-bb6f-694668dea60f",
  "runId" : "d8b90c4d-927a-4e10-8365-1a34e6cb07a8",
  "name" : null,
  "timestamp" : "2022-08-27T01:51:10.484Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 5,
    "triggerExecution" : 5
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[hdfs://localhost:9000/data/delta/kafka-events]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "29874ceb-21a8-4e02-b5bf-7f994df6d641",
      "reservoirVersion" : 1,
      "index" : 1,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "29874ceb-21a8-4e02-b5bf-7f994df6d641",
      "reservoirVersion" : 1,
      "index" : 1,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@16ff9f0d",
    "numOutputRows" : 0
  }
}

We can also learn from the output that Spark streaming is in fact macro batches. We can press Ctrl + C to cancel this streaming application.

Summary

I hope you now have a good understanding about the basics in Delta Lake through these two articles about Delta Lake I published recently. There are more features that are worthwhile to explore and I will publish more articles in future. Please stay tuned! If you do have any questions, feel free to make a comment and I will try my best to answer. 

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts