Spark Structured Streaming - Read from and Write into Kafka Topics
Spark Structured Streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, Kafka sources can be created for both streaming and batch queries. When writing to Kafka, Kafka sinks can be created as destinations for both streaming and batch queries too.
Prerequisites
Spark and Kafka environment
To run the sample code in this article, Spark and Kafka environments are required. You can follow these guides to set them up if you don't have existing environments to work with.
Spark SQL Kafka library
The Spark SQL Kafka connector is not built into the Spark binary distribution, so you need to ensure the following JAR package is included in the Spark library search path or passed in when you submit Spark applications.
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.0
You can download the JAR file from the Maven repository: spark-sql-kafka-0-10_2.12/3.0.0.
An easier way is to use the --packages option when deploying the application:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark-kafka.py
Spark will automatically download the artifact and its dependencies from local repositories or configured remote repositories.
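Alternatively, the connector can be requested programmatically through the spark.jars.packages configuration when building the SparkSession. This is a minimal sketch, assuming the artifact version matches your Spark build:

from pyspark.sql import SparkSession

# A sketch: pull the Kafka connector at session start via spark.jars.packages.
# Adjust the artifact version to match your Spark version.
spark = SparkSession.builder \
    .master("local") \
    .appName("Kafka Examples") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()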
Use Kafka source for batch queries
To create a Kafka source for batch queries, we can simply specify the read format as 'kafka'.
Subscribe to one topic
The following code snippet reads events from topic 'kontext-kafka'; it also converts the key and value columns from binary to STRING data type. The binary data needs to be decoded according to the key and value serializers that were used when the topic was produced.
from pyspark.sql import SparkSession

appName = "Kafka Examples"
master = "local"

spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

kafka_servers = "localhost:9092"

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()

# Decode the binary key and value columns as strings.
df = df.withColumn('key_str', df['key'].cast('string')).drop('key') \
    .withColumn('value_str', df['value'].cast('string')).drop('value')
df.show(5)
Sample output:
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
|kontext-kafka|        0|     3|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 3|
|kontext-kafka|        0|     4|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 4|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
only showing top 5 rows
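If the values were produced with a JSON serializer, the decoded string can be parsed further with from_json. This is only a sketch; the schema and field names below are hypothetical and should be replaced with the real structure of your messages:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema - replace with the actual structure of your messages.
schema = StructType([
    StructField("event_id", IntegerType()),
    StructField("event_name", StringType())
])

parsed_df = df.withColumn('value_json', from_json(col('value_str'), schema)) \
    .select('topic', 'partition', 'offset', 'value_json.*')
parsed_df.show(5)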
Subscribe to two topics with specified offsets
The following code snippet subscribes to two topics named 'kontext-kafka' and 'kontext-events'. It also specifies the following offsets:
- Topic kontext-kafka: read from partition 2 starting at offset 98; read all events from partitions 0 and 1.
- Topic kontext-events: read all events from partition 0.
In the JSON offset specification, -2 denotes the earliest offset and -1 the latest.
df = spark \ .read \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_servers) \ .option("subscribe", "kontext-kafka,kontext-events") \ .option("startingOffsets", """{"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}""") \ .option("endingOffsets", """{"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}""") \ .load() df = df.withColumn('key_str', df['key'].cast('string').alias('key_str')).drop( 'key').withColumn('value_str', df['value'].cast('string').alias('key_str')).drop('value') df.show(5)
Sample output:
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        2|    98|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        2|    99|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
Subscribe to a pattern
The following example subscribes to all topics that match the regular expression 'kontext.*'. It reads all events in all partitions.
df = spark \ .read \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_servers) \ .option("subscribePattern", "kontext.*") \ .option("startingOffsets", "earliest") \ .option("endingOffsets", "latest") \ .load() df = df.withColumn('key_str', df['key'].cast('string').alias('key_str')).drop( 'key').withColumn('value_str', df['value'].cast('string').alias('key_str')).drop('value') df.show(5)
Option subscribePattern is used to match against topic names.
Use Kafka source for streaming queries
To read from Kafka for streaming queries, we can use SparkSession.readStream. Kafka server addresses and topic names are required. Spark can subscribe to one or more topics, and wildcards can be used to match multiple topic names, similar to the batch query examples above.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()

# Decode the binary key and value columns as strings.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Note that actions such as show() cannot be applied to a streaming data frame directly; attempting one raises the following error:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
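To inspect a streaming data frame, a streaming sink must be started instead. A minimal sketch using Spark's built-in console sink:

# A sketch: print streaming micro-batches to the console for debugging.
query = df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
query.awaitTermination()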
We will use this data frame to write into another topic.
Write Spark data frame to Kafka topic
To write a Spark data frame to a Kafka topic, the data frame needs to contain the following columns:
Column               | Type
key (optional)       | string or binary
value (required)     | string or binary
headers (*optional)  | array
topic (*optional)    | string
partition (*optional)| int
Kafka supports only at-least-once write semantics, thus it is impossible to completely avoid duplicates when writing messages. One approach to resolve this issue is to add a unique key to your messages and then deduplicate when reading from Kafka.
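For example, a minimal sketch of reading back and deduplicating, assuming each message value is JSON-encoded and carries a hypothetical unique event_id field:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema with a unique event_id used only for deduplication.
schema = StructType([
    StructField("event_id", StringType()),
    StructField("payload", StringType())
])

deduped_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("v")) \
    .select("v.*") \
    .dropDuplicates(["event_id"])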
Write using streaming query output
For a streaming data frame, we can directly use DataFrame.writeStream to write into a Kafka topic.
Example code:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-3") \
    .option("checkpointLocation", "file:///F:/tmp/kontext-kafka/checkpoint") \
    .start()

spark.streams.awaitAnyTermination()
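As a usage note, start() returns a StreamingQuery handle that can be kept for monitoring or stopping the query. A minimal sketch of the same write with the handle retained:

# A sketch: keep the StreamingQuery handle to monitor and control the stream.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-3") \
    .option("checkpointLocation", "file:///F:/tmp/kontext-kafka/checkpoint") \
    .start()

print(query.status)        # current status of the streaming query
print(query.lastProgress)  # metrics of the most recent micro-batch
# query.stop()             # stop the query gracefully when done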
Write using batch query output
Similarly, we can save batch query output to a Kafka topic using the DataFrame.write API with format 'kafka'.
Example code:
df = spark \ .read \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_servers) \ .option("subscribePattern", "kontext.*") \ .option("startingOffsets", "earliest") \ .option("endingOffsets", "latest") \ .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ .write \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_servers) \ .option("topic", "kontext-kafka-5") \ .save()
The above code snippet writes the batch query result of all topics whose names start with 'kontext' to a new topic named 'kontext-kafka-5'.
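Instead of the 'topic' option, the destination can also be supplied per row through a topic column in the data frame (see the table above). A minimal sketch, routing each record to a topic name derived from its source topic; the '-copy' suffix is hypothetical:

from pyspark.sql.functions import concat, lit, col

# A sketch: derive a per-row destination topic instead of using the 'topic' option.
df.select(
        col("key").cast("string").alias("key"),
        col("value").cast("string").alias("value"),
        concat(col("topic"), lit("-copy")).alias("topic")) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .save()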
Summary
With Spark Structured Streaming APIs, it is very easy to read from Kafka topics, apply transformations/aggregations, and then write the results into HDFS, a database, or another Kafka topic. Spark acts as both consumer and producer in the above examples.