Raymond Raymond

Spark Structured Streaming - Read from and Write into Kafka Topics

event 2020-09-06 visibility 21,703 comment 0 insights toc
more_vert
insights Stats
Spark Structured Streaming - Read from and Write into Kafka Topics

Spark structured streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, Kafka sources can be created for both streaming and batch queries. When writing into Kafka, Kafka sinks can be created as destination for both streaming and batch queries too. 

2020090662454-kafka-spark.png

*Logos are registered trademarks of Apache Software Foundation.  

Prerequisites

Spark and Kafka environment

To run the sample code in this article, Spark and Kafka environment are required. You can follow these guides to setup them if you don't have existing ones to work with.

Spark SQL Kafka library

Spark SQL for Kafka is not built into Spark binary distribution. Thus you need to ensure the following jar package is included into Spark lib search path or passed when you submit Spark applications. 

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.0

You can download the JAR file from Maven repository: spark-sql-kafka-0-10_2.12/3.0.0.

An easier way is to use --packages option when deploy application:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  spark-kafka.py

Spark will automatically download the artifact and also its dependencies automatically from local repositories or configured remote repositories. 

Use Kafka source for batch queries

To create Kafka source for batch queries, we can simply specify read format as 'kafka'. 

Subscribe to one topic

The following code snippet reads events data from topic 'kontext-kafka'; it also converts key and value columns to STRING data type from binary. Based on the key and value serializers when those topics were produced, the binary data needs to be decoded accordingly. 

from pyspark.sql import SparkSession

appName = "Kafka Examples"
master = "local"

spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

kafka_servers = "localhost:9092"

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()
df = df.withColumn('key_str', df['key'].cast('string').alias('key_str')).drop(
    'key').withColumn('value_str', df['value'].cast('string').alias('key_str')).drop('value')
df.show(5)

Sample output:

+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
|kontext-kafka|        0|     3|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 3|
|kontext-kafka|        0|     4|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 4|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
only showing top 5 rows

Subscribe to two topics with specified offsets

The following code snippets subscribe to two topics named 'kontext-kafka' and 'kontext-events'. It also specifies the following offsets:

  • Topic kontext-kafka: reading from partition 2 for events starting with offeset 98; partion 0 and 1 for all events. 
  • Topic kontext-events: reading from partition 0 for all the events. 
infoIn the offset json, -2 represents an offset can be used to refer to earliest and -1 to latest. 
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka,kontext-events") \
    .option("startingOffsets", """{"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}""") \
    .option("endingOffsets", """{"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}""") \
    .load()
df = df.withColumn('key_str', df['key'].cast('string').alias('key_str')).drop(
    'key').withColumn('value_str', df['value'].cast('string').alias('key_str')).drop('value')
df.show(5)

Sample output:

+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        2|    98|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        2|    99|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
+-------------+---------+------+--------------------+-------------+-------+--------------------+

Subscribe to a pattern

The following example subscribe to all topics that match with regular expression 'kontext.*'. It reads all events in all partitions. 

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribePattern", "kontext.*") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
df = df.withColumn('key_str', df['key'].cast('string').alias('key_str')).drop(
    'key').withColumn('value_str', df['value'].cast('string').alias('key_str')).drop('value')
df.show(5)

Option subscribePattern is used to match against topic names. 

Use Kafka source for streaming queries

To read from Kafka for streaming queries, we can use function SparkSession.readStream. Kafka server addresses and topic names are required.  Spark can subscribe to one or more topics and wildcards can be used to match with multiple topic names similarly as the batch query example provided above. 

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
warning We cannot directly show the above data frame as it uses a streaming source. The following exception will occur if you call df.show():
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

We will use this data frame to write into another different topic. 

Write Spark data frame to Kafka topic

To write a Spark data frame to Kafka topic, the following columns are required:

ColumnType
key (optional)
string or binary
value (required)
string or binary
headers (*optional)
array
topic (*optional)
string
partition (*optional)
int


Kafka supports only at least once write semantics, thus it is impossible to avoid duplicates when writing messages. One approach to resolve this issues is to add some unique key to your messages and then deduplicate when reading from Kafka.

Write using streaming query output

For streaming sourced data frame, we can directly use DataFrame.writeStream function to write into a Kafka topic.

Example code:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-3") \
    .option("checkpointLocation", "file:///F:/tmp/kontext-kafka/checkpoint") \
    .start()

spark.streams.awaitAnyTermination()
The above code snippet read stream data from topic 'kontext-kafka' and then writes into another topic named 'kontext-kafka-3'.

warning To deploy Spark application to YARN, please change checkpointLocation to a HDFS path instead of local file path. 

Write using batch query output

Similarly, we can use save batch query output to a Kafka topic using DataFrame.save function with format 'kafka'.

Example code:

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribePattern", "kontext.*") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_servers) \
  .option("topic", "kontext-kafka-5") \
  .save()

The above code snippet writes the batch query result of all topics that has a name starts with 'kontext' to a new topic named 'kontext-kafka-5'.

Summary

With Spark structured streaming APIs, it is very easy to read from Kafka topics and do some transformation/aggregations and then write the result into HDFS, database or another Kafka topic. Spark acts as both consumer and producer in the above examples.

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts