Spark Structured Streaming - Read from and Write into Kafka Topics


Spark Structured Streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, you can create Kafka sources for both streaming and batch queries. When writing to Kafka, you can likewise create Kafka sinks as the destination for both streaming and batch queries.


Prerequisites

Spark and Kafka environment

To run the sample code in this article, you need a Spark and Kafka environment. If you don't have one to work with, you can follow these guides to set them up.

Spark SQL Kafka library

The Spark SQL Kafka integration is not built into the Spark binary distribution. You therefore need to make sure the following package is included in Spark's library search path or is passed when you submit Spark applications.

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.0

You can download the JAR file from the Maven repository: spark-sql-kafka-0-10_2.12/3.0.0.

An easier way is to use the --packages option when submitting the application:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  spark-kafka.py

Spark will automatically download the artifact and its dependencies from the local repository or from the configured remote repositories.
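The coordinate passed to --packages has the form groupId:artifactId:version. The _2.12 suffix of the artifact ID is the Scala binary version of your Spark build, and the version should match your Spark version. The plain-Python sketch below assembles the coordinate and a spark-submit argument list; the helpers kafka_package and spark_submit_args are hypothetical, not part of Spark. The same coordinate can also be supplied through the spark.jars.packages configuration, as long as it is set before the Spark session (and its JVM) is created.

```python
import shlex


def kafka_package(scala_version: str = "2.12", spark_version: str = "3.0.0") -> str:
    """Build the Maven coordinate groupId:artifactId:version (hypothetical helper)."""
    group_id = "org.apache.spark"
    # The artifact ID embeds the Scala binary version of the Spark build.
    artifact_id = f"spark-sql-kafka-0-10_{scala_version}"
    # The connector version should match the Spark version you run.
    return f"{group_id}:{artifact_id}:{spark_version}"


def spark_submit_args(app: str, packages: list) -> list:
    """Return a spark-submit argument list; --packages takes a comma-separated list."""
    return ["spark-submit", "--packages", ",".join(packages), app]


# Prints: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark-kafka.py
print(shlex.join(spark_submit_args("spark-kafka.py", [kafka_package()])))
```

For a Spark build on a different Scala version, only the first argument changes, for example kafka_package("2.13", "3.2.0").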

Use Kafka source for batch queries

To create a Kafka source for batch queries, simply specify 'kafka' as the read format.

Subscribe to one topic

The following code snippet reads events from topic 'kontext-kafka' and converts the key and value columns from binary to STRING. The binary data must be decoded according to the key and value serializers that were used when the messages were produced.

from pyspark.sql import SparkSession

appName = "Kafka Examples"
master = "local"

spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

kafka_servers = "localhost:9092"

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()
# Cast the binary key and value columns to strings (assumes UTF-8 text payloads)
df = df.withColumn('key_str', df['key'].cast('string')) \
    .withColumn('value_str', df['value'].cast('string')) \
    .drop('key', 'value')
df.show(5)

Sample output:

+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
|kontext-kafka|        0|     3|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 3|
|kontext-kafka|        0|     4|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 4|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
only showing top 5 rows
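Casting to STRING is only correct when the producer wrote UTF-8 text, for example with Kafka's StringSerializer, which uses UTF-8 by default. The plain-Python sketch below shows how the same raw bytes would be decoded for a few common Kafka serializers: IntegerSerializer and LongSerializer write big-endian 4-byte and 8-byte signed values. The decode_kafka_bytes helper and its serializer labels are hypothetical and are not part of Spark's API.

```python
from typing import Optional, Union


def decode_kafka_bytes(raw: Optional[bytes], serializer: str) -> Optional[Union[str, int]]:
    """Decode a raw Kafka key or value according to the producer's serializer."""
    if raw is None:
        # Messages produced without a key have a null key (shown as null above).
        return None
    if serializer == "string":
        # StringSerializer encodes text as UTF-8 by default.
        return raw.decode("utf-8")
    if serializer == "integer":
        # IntegerSerializer writes a 4-byte big-endian signed int.
        if len(raw) != 4:
            raise ValueError("integer payload must be 4 bytes")
        return int.from_bytes(raw, byteorder="big", signed=True)
    if serializer == "long":
        # LongSerializer writes an 8-byte big-endian signed long.
        if len(raw) != 8:
            raise ValueError("long payload must be 8 bytes")
        return int.from_bytes(raw, byteorder="big", signed=True)
    raise ValueError(f"unsupported serializer: {serializer}")


print(decode_kafka_bytes(b"Kontext kafka msg: 0", "string"))  # Kontext kafka msg: 0
print(decode_kafka_bytes(b"\x00\x00\x00\x2a", "integer"))  # 42
print(decode_kafka_bytes(None, "string"))  # None
```

An integer key cast directly to STRING would not produce the number, which is why the decoding step has to follow the producer's serializer.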

Subscribe to two topics with specified offsets

The following code snippet subscribes to two topics named 'kontext-kafka' and 'kontext-events'. It also specifies the following offsets:

  • Topic kontext-kafka: read events from partition 2 starting at offset 98, and all events from partitions 0 and 1.
  • Topic kontext-events: read all events from partition 0.
Info: In the offset JSON, -2 refers to the earliest offset and -1 to the latest offset.
df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka,kontext-events") \
    .option("startingOffsets", """{"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}""") \
    .option("endingOffsets", """{"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}""") \
    .load()
# Cast the binary key and value columns to strings (assumes UTF-8 text payloads)
df = df.withColumn('key_str', df['key'].cast('string')) \
    .withColumn('value_str', df['value'].cast('string')) \
    .drop('key', 'value')
df.show(5)

Sample output:

+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        2|    98|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        2|    99|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
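Writing the offset JSON by hand is error-prone. A minimal sketch using only Python's json module that builds the startingOffsets and endingOffsets strings used above from dictionaries; the offsets_json helper is hypothetical. The check on ending offsets reflects the Spark Kafka integration guide, which does not allow -2 (earliest) in endingOffsets.

```python
import json

EARLIEST = -2  # special offset meaning "earliest" in Spark's offset JSON
LATEST = -1    # special offset meaning "latest"


def offsets_json(offsets: dict, ending: bool = False) -> str:
    """Serialize {topic: {partition: offset}} into Spark's offset JSON format."""
    result = {}
    for topic, partitions in offsets.items():
        result[topic] = {}
        for partition, offset in partitions.items():
            if ending and offset == EARLIEST:
                raise ValueError("-2 (earliest) is not allowed in endingOffsets")
            # JSON object keys must be strings, so partition numbers are converted.
            result[topic][str(partition)] = offset
    # Compact separators produce the same text as the hand-written option values.
    return json.dumps(result, separators=(",", ":"))


starting = offsets_json({"kontext-kafka": {2: 98, 0: 0, 1: 0},
                         "kontext-events": {0: EARLIEST}})
ending = offsets_json({"kontext-kafka": {2: LATEST, 0: LATEST, 1: LATEST},
                       "kontext-events": {0: LATEST}}, ending=True)
print(starting)  # {"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}
print(ending)    # {"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}
```

The resulting strings can be passed as .option("startingOffsets", starting) and .option("endingOffsets", ending).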

Subscribe to a pattern

The following example subscribes to all topics whose names match the regular expression 'kontext.*'. It reads all events in all partitions.

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribePattern", "kontext.*") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
# Cast the binary key and value columns to strings (assumes UTF-8 text payloads)
df = df.withColumn('key_str', df['key'].cast('string')) \
    .withColumn('value_str', df['value'].cast('string')) \
    .drop('key', 'value')
df.show(5)

Option subscribePattern is used to match against topic names. 
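As far as the Kafka consumer is concerned, a subscription pattern must match the whole topic name (Java's Matcher.matches()), so 'kontext.*' selects 'kontext-kafka' but not 'my-kontext'. The sketch below previews which topics a pattern would select, using Python's re.fullmatch as an approximation; Java and Python regular expressions differ in some details, and the topic names are made up for illustration.

```python
import re


def matching_topics(pattern: str, topics: list) -> list:
    """Return the topics whose full name matches the pattern."""
    regex = re.compile(pattern)
    # fullmatch approximates Java's Matcher.matches(): the entire name must match.
    return [t for t in topics if regex.fullmatch(t)]


# Hypothetical topic names for illustration.
topics = ["kontext-kafka", "kontext-events", "kontext-kafka-3", "my-kontext", "__consumer_offsets"]
print(matching_topics("kontext.*", topics))
# ['kontext-kafka', 'kontext-events', 'kontext-kafka-3']
```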

Use Kafka source for streaming queries

To read from Kafka in streaming queries, use SparkSession.readStream. Kafka server addresses and topic names are required. Spark can subscribe to one or more topics, and a pattern can be used to match multiple topic names, in the same way as in the batch query examples above.

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()

# selectExpr returns a new data frame, so assign the result
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Warning: You cannot call show() directly on the above data frame because it uses a streaming source. Calling df.show() raises the following exception:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

We will use this data frame to write into a different topic.

Write Spark data frame to Kafka topic

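To write a Spark data frame to a Kafka topic, the data frame must contain the following columns:

Column                Type
key (optional)        string or binary
value (required)      string or binary
headers (optional)    array
topic (*optional)     string
partition (optional)  int

* The topic column is required if the 'topic' option is not specified on the writer. If partition is not specified (or is null), the partition is determined by the Kafka producer.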

Apache Kafka only supports at-least-once write semantics, so some messages may be duplicated when writing to Kafka (for example, when a failed write is retried). One way to handle this is to add a unique key to each message and then deduplicate when reading from Kafka.
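The deduplication idea can be sketched in plain Python: each message carries a unique ID (a hypothetical msg_id field here), and the reader keeps only the first message seen for each ID. In Spark, DataFrame.dropDuplicates on the ID column plays the same role; for streaming queries it is usually combined with withWatermark so that Spark does not keep deduplication state forever.

```python
from typing import Dict, Iterable, List


def deduplicate(messages: Iterable[Dict], id_field: str = "msg_id") -> List[Dict]:
    """Keep the first occurrence of each unique message ID, preserving order."""
    seen = set()
    unique = []
    for msg in messages:
        msg_id = msg[id_field]
        if msg_id in seen:
            # A retried write delivered this message more than once; skip it.
            continue
        seen.add(msg_id)
        unique.append(msg)
    return unique


# Hypothetical messages: "b" was written twice because of a retry.
messages = [
    {"msg_id": "a", "value": "Kontext kafka msg: 0"},
    {"msg_id": "b", "value": "Kontext kafka msg: 1"},
    {"msg_id": "b", "value": "Kontext kafka msg: 1"},
    {"msg_id": "c", "value": "Kontext kafka msg: 2"},
]
print([m["msg_id"] for m in deduplicate(messages)])  # ['a', 'b', 'c']
```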

Write using streaming query output

For a data frame created from a streaming source, we can use DataFrame.writeStream directly to write into a Kafka topic.

Example code:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-3") \
    .option("checkpointLocation", "file:///F:/tmp/kontext-kafka/checkpoint") \
    .start()

spark.streams.awaitAnyTermination()
The above code snippet reads streaming data from topic 'kontext-kafka' and writes it into another topic named 'kontext-kafka-3'.

Warning: When deploying the Spark application to YARN, change checkpointLocation to an HDFS path instead of a local file path.

Write using batch query output

Similarly, we can save the output of a batch query to a Kafka topic by using DataFrame.write with format 'kafka' and calling save().

Example code:

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribePattern", "kontext.*") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-5") \
    .save()

The above code snippet writes the batch query result from all topics whose names start with 'kontext' to a new topic named 'kontext-kafka-5'.

Summary

With Spark Structured Streaming APIs, it is easy to read from Kafka topics, apply transformations or aggregations, and then write the results to HDFS, a database, or another Kafka topic. In the above examples, Spark acts as both a consumer and a producer.

Last modified by Raymond 3 months ago. This page is subject to Site terms.