Spark Structured Streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, Kafka sources can be created for both streaming and batch queries. When writing to Kafka, Kafka sinks can be created as the destination for both streaming and batch queries.
Prerequisites
Spark and Kafka environment
To run the sample code in this article, you need a working Spark and Kafka environment. Set both up first if you don't have existing ones to work with.
Spark SQL Kafka library
The Spark SQL Kafka connector is not built into the Spark binary distribution, so you need to make sure the following JAR package is on Spark's library search path or is passed when you submit Spark applications.
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.0
You can download the JAR file from the Maven repository: spark-sql-kafka-0-10_2.12/3.0.0.
An easier way is to use the --packages option when deploying the application:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark-kafka.py
Spark will automatically download the artifact and its dependencies from local repositories or configured remote repositories.
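When the script is started with plain python instead of spark-submit, the same Maven coordinate can usually be passed through the spark.jars.packages configuration. The following is a minimal sketch under that assumption. The helper names kafka_package and build_session are hypothetical, and the Scala and Spark versions must match your own installation.

```python
def kafka_package(scala_version="2.12", spark_version="3.0.0"):
    """Return the Maven coordinate of the Spark SQL Kafka connector."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(app_name="Kafka Examples", master="local"):
    """Create a SparkSession that resolves the Kafka connector at startup."""
    # Imported here so kafka_package() stays usable without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master(master)
        .appName(app_name)
        # Same effect as --packages; it must be set before the first session starts.
        .config("spark.jars.packages", kafka_package())
        .getOrCreate()
    )


print(kafka_package())
```

Calling build_session() in a fresh Python process then replaces the SparkSession.builder code used in the examples of this article.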
Use Kafka source for batch queries
To create a Kafka source for batch queries, we can simply specify the read format as 'kafka'.
Subscribe to one topic
The following code snippet reads event data from topic 'kontext-kafka' and converts the binary key and value columns to the STRING data type. How the binary data should be decoded depends on the key and value serializers that were used when the messages were produced.
from pyspark.sql import SparkSession
appName = "Kafka Examples"
master = "local"
spark = SparkSession.builder \
.master(master) \
.appName(appName) \
.getOrCreate()
kafka_servers = "localhost:9092"
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("subscribe", "kontext-kafka") \
.load()
# Cast the binary key and value columns to strings and drop the originals.
df = df.withColumn('key_str', df['key'].cast('string')).drop('key') \
.withColumn('value_str', df['value'].cast('string')).drop('value')
df.show(5)
Sample output:
+-------------+---------+------+--------------------+-------------+-------+--------------------+
| topic|partition|offset| timestamp|timestampType|key_str| value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka| 0| 0|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 0|
|kontext-kafka| 0| 1|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 1|
|kontext-kafka| 0| 2|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 2|
|kontext-kafka| 0| 3|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 3|
|kontext-kafka| 0| 4|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 4|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
only showing top 5 rows
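The messages in this article are plain strings, so a cast to STRING is enough. If a topic instead carries JSON-serialized values, the value can be parsed after the cast. The following is a sketch under that assumption: the helper name decode_json_values and the schema fields used in the usage note are hypothetical, and from_json is called through selectExpr with a DDL schema string.

```python
def decode_json_values(df, ddl_schema):
    """Parse the binary `value` column as JSON using a DDL schema string."""
    return df.selectExpr(
        # Keys are assumed to be UTF-8 strings.
        "CAST(key AS STRING) AS key",
        # Values are decoded to text first, then parsed into a struct column.
        f"from_json(CAST(value AS STRING), '{ddl_schema}') AS data",
    )
```

For example, decode_json_values(df, "id INT, message STRING").select("key", "data.*") would flatten the parsed fields into columns, given messages that actually contain those fields.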
Subscribe to two topics with specified offsets
The following code snippet subscribes to two topics named 'kontext-kafka' and 'kontext-events'. It also specifies the following offsets:
- Topic kontext-kafka: reading from partition 2 for events starting at offset 98; partitions 0 and 1 for all events.
- Topic kontext-events: reading from partition 0 for all the events.
In the offsets JSON, -2 can be used to refer to the earliest offset and -1 to the latest.
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("subscribe", "kontext-kafka,kontext-events") \
.option("startingOffsets", """{"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}""") \
.option("endingOffsets", """{"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}""") \
.load()
# Cast the binary key and value columns to strings and drop the originals.
df = df.withColumn('key_str', df['key'].cast('string')).drop('key') \
.withColumn('value_str', df['value'].cast('string')).drop('value')
df.show(5)
Sample output:
+-------------+---------+------+--------------------+-------------+-------+--------------------+
| topic|partition|offset| timestamp|timestampType|key_str| value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka| 2| 98|2020-09-06 14:08:...| 0| cba|Kontext kafka msg...|
|kontext-kafka| 2| 99|2020-09-06 14:08:...| 0| cba|Kontext kafka msg...|
|kontext-kafka| 0| 0|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 0|
|kontext-kafka| 0| 1|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 1|
|kontext-kafka| 0| 2|2020-09-05 17:35:...| 0| null|Kontext kafka msg: 2|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
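The offset JSON strings used in this example are easy to mistype by hand. As a sketch, they can be generated from plain dictionaries with json.dumps. The helper name offsets_json and the constants EARLIEST and LATEST are hypothetical names introduced here for illustration.

```python
import json

EARLIEST = -2  # marker for the earliest available offset
LATEST = -1    # marker for the latest available offset


def offsets_json(offsets):
    """Serialize {topic: {partition: offset}} into a compact JSON string."""
    return json.dumps(
        # JSON object keys must be strings, so partition numbers are converted.
        {topic: {str(p): o for p, o in parts.items()} for topic, parts in offsets.items()},
        separators=(",", ":"),
    )


starting = offsets_json({
    "kontext-kafka": {2: 98, 0: 0, 1: 0},
    "kontext-events": {0: EARLIEST},
})
ending = offsets_json({
    "kontext-kafka": {2: LATEST, 0: LATEST, 1: LATEST},
    "kontext-events": {0: LATEST},
})
print(starting)
print(ending)
```

The two strings can then be passed as the startingOffsets and endingOffsets options in place of the literals.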
Subscribe to a pattern
The following example subscribes to all topics that match the regular expression 'kontext.*'. It reads all events in all partitions.
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("subscribePattern", "kontext.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
# Cast the binary key and value columns to strings and drop the originals.
df = df.withColumn('key_str', df['key'].cast('string')).drop('key') \
.withColumn('value_str', df['value'].cast('string')).drop('value')
df.show(5)
Option subscribePattern is used to match against topic names.
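Before running a query, it can help to preview which topic names a pattern would select. The sketch below uses Python's re module as a stand-in and assumes the pattern must match the whole topic name. subscribePattern is a Java regular expression, so results can differ for more advanced syntax. The helper name matching_topics and the topic names 'my-kontext' and 'other' are hypothetical.

```python
import re


def matching_topics(pattern, topics):
    """Return the topic names that `pattern` matches in full."""
    compiled = re.compile(pattern)
    return [t for t in topics if compiled.fullmatch(t)]


topics = ["kontext-kafka", "kontext-events", "my-kontext", "other"]
print(matching_topics("kontext.*", topics))
```

Under the full-match assumption, 'my-kontext' is not selected because the pattern has to match from the start of the name.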
Use Kafka source for streaming queries
To read from Kafka for streaming queries, we can use the SparkSession.readStream property. Kafka server addresses and topic names are required. Spark can subscribe to one or more topics, and a pattern can be used to match multiple topic names, as in the batch query examples above.
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("subscribe", "kontext-kafka") \
.load()
# selectExpr returns a new data frame, so keep the result.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Warning: A data frame with a streaming source cannot be shown directly. Calling df.show() raises the following exception: pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
We will use this data frame to write into a different topic.
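To inspect a streaming data frame during development, one option is to start a query against the console sink, which prints each micro-batch. The following is a minimal sketch: the helper name preview_stream is hypothetical, and df is assumed to be a streaming data frame read from Kafka as in the snippet above.

```python
def preview_stream(df, truncate=False):
    """Start a console-sink query that prints each micro-batch of `df`."""
    return (
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("console")
        .outputMode("append")
        # Console sink option: whether long cell values are cut off.
        .option("truncate", str(truncate).lower())
        .start()
    )
```

A typical use would be query = preview_stream(df), followed by query.awaitTermination(30) to let it run for up to 30 seconds and query.stop() to end it.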
Write Spark data frame to Kafka topic
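To write a Spark data frame to a Kafka topic, the data frame should use the following columns. Only value is always required; the topic column becomes required when the 'topic' option is not set on the writer: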
Column | Type |
---|---|
key (optional) | string or binary |
value (required) | string or binary |
headers (*optional) | array |
topic (*optional) | string |
partition (*optional) | int |
The Kafka sink in Spark provides only at-least-once write semantics, so some messages may be written more than once, for example when a failed write is retried. One approach to resolve this issue is to add a unique key to each message and then deduplicate when reading from Kafka.
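The read-side half of that approach can be sketched as follows for a batch query. It assumes every message was written with a unique, non-null key. The helper name read_deduplicated is hypothetical.

```python
def read_deduplicated(df):
    """Cast key and value to strings and keep one row per unique key."""
    return (
        df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
        # Rows that share the same key are treated as duplicates of one message.
        .dropDuplicates(["key"])
    )
```

This is only meaningful when keys really are unique per message. Topics written without keys, such as those with null keys in the sample output earlier, would need another unique field to deduplicate on.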
Write using streaming query output
For a data frame built from a streaming source, we can use the DataFrame.writeStream property directly to write into a Kafka topic.
Example code:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("subscribe", "kontext-kafka") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("topic", "kontext-kafka-3") \
.option("checkpointLocation", "file:///F:/tmp/kontext-kafka/checkpoint") \
.start()
spark.streams.awaitAnyTermination()
The above code snippet reads stream data from topic 'kontext-kafka' and then writes it into another topic named 'kontext-kafka-3'.
Warning: To deploy the Spark application to YARN, change checkpointLocation to an HDFS path instead of a local file path.
Write using batch query output
Similarly, we can save batch query output to a Kafka topic using the DataFrame.write.save function with format 'kafka'.
Example code:
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("subscribePattern", "kontext.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_servers) \
.option("topic", "kontext-kafka-5") \
.save()
The above code snippet writes the batch query result of all topics whose names start with 'kontext' to a new topic named 'kontext-kafka-5'.
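The batch write above sends every row to a single topic through the 'topic' option. As the column table earlier indicates, a topic column in the data frame can carry the destination per row instead. The following is a sketch of that variant: the helper name with_target_topic and the '-copy' suffix are hypothetical, and the target topics are assumed to exist or to be auto-created by the broker.

```python
def with_target_topic(df, suffix="-copy"):
    """Select key, value and a per-row `topic` column derived from the source topic."""
    return df.selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        # Each row is routed to "<source topic><suffix>".
        f"concat(topic, '{suffix}') AS topic",
    )
```

The result would be written with .write.format("kafka").option("kafka.bootstrap.servers", kafka_servers).save(), leaving out the 'topic' option so that the column decides the destination.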
Summary
With the Spark Structured Streaming APIs, it is easy to read from Kafka topics, apply transformations or aggregations, and then write the result to HDFS, a database, or another Kafka topic. Spark acts as both consumer and producer in the above examples.