Spark Structured Streaming - Read from and Write into Kafka Topics

Spark Structured Streaming provides rich APIs to read from and write to Kafka topics. When reading from Kafka, Kafka sources can be created for both streaming and batch queries. When writing to Kafka, Kafka sinks can likewise be created as the destination for both streaming and batch queries.

Prerequisites

Spark and Kafka environment

To run the sample code in this article, you need a Spark and Kafka environment. You can follow these guides to set them up if you don't have existing ones to work with.

Spark SQL Kafka library

The Spark SQL Kafka integration is not included in the Spark binary distribution. You therefore need to make sure the following JAR package is either on the Spark library search path or passed in when you submit Spark applications.

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.0

You can download the JAR file from the Maven repository: spark-sql-kafka-0-10_2.12/3.0.0.

An easier way is to use the --packages option when deploying the application:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0  spark-kafka.py

Spark will automatically download the artifact and its dependencies from the local repository or the configured remote repositories.
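
The --packages value is simply the three coordinates above joined by colons (groupId:artifactId:version). In the artifactId, the _2.12 suffix is the Scala binary version, and the version should match the Spark version you run. The small helper below is a hypothetical convenience (not part of Spark) that assembles the coordinate in one place, so the Scala and Spark versions are easy to keep in sync.

```python
from typing import List


def kafka_package(spark_version: str = "3.0.0", scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the Spark SQL Kafka connector (hypothetical helper)."""
    group_id = "org.apache.spark"
    # The artifactId encodes the Kafka client API level (0-10) and the Scala binary version.
    artifact_id = f"spark-sql-kafka-0-10_{scala_version}"
    return f"{group_id}:{artifact_id}:{spark_version}"


def spark_submit_command(script: str, spark_version: str = "3.0.0",
                         scala_version: str = "2.12") -> List[str]:
    """Assemble a spark-submit argument list that pulls the connector via --packages."""
    return ["spark-submit", "--packages", kafka_package(spark_version, scala_version), script]


print(" ".join(spark_submit_command("spark-kafka.py")))
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark-kafka.py
```

The argument list form can be passed directly to subprocess.run if you launch jobs from Python.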

Use Kafka source for batch queries

To create a Kafka source for batch queries, simply specify 'kafka' as the read format.

Subscribe to one topic

The following code snippet reads event data from the topic 'kontext-kafka' and converts the key and value columns from binary to STRING. In general, the binary data must be decoded according to the key and value serializers that were used when the messages were produced.

from pyspark.sql import SparkSession

appName = "Kafka Examples"
master = "local"

spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

kafka_servers = "localhost:9092"

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()
# Kafka delivers key and value as binary; cast them to strings (UTF-8) and drop the raw columns
df = df.withColumn('key_str', df['key'].cast('string')) \
    .withColumn('value_str', df['value'].cast('string')) \
    .drop('key', 'value')
df.show(5)

Sample output:

+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
|kontext-kafka|        0|     3|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 3|
|kontext-kafka|        0|     4|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 4|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
only showing top 5 rows
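
Casting binary to STRING interprets the bytes as UTF-8, which is only correct when the messages were produced with a string serializer such as Kafka's StringSerializer (UTF-8 by default). The plain-Python sketch below, which does not use Spark, illustrates why decoding has to mirror the producer side. It assumes, purely for illustration, a key written with Kafka's IntegerSerializer (a 4-byte big-endian signed integer); the sample topic above actually has null keys.

```python
import struct
from typing import Optional


def decode_string(raw: Optional[bytes]) -> Optional[str]:
    """Mirror of Kafka's StringSerializer (UTF-8); None stays None, like a null key."""
    return None if raw is None else raw.decode("utf-8")


def decode_int(raw: Optional[bytes]) -> Optional[int]:
    """Mirror of Kafka's IntegerSerializer, which writes a 4-byte big-endian signed int."""
    return None if raw is None else struct.unpack(">i", raw)[0]


# Bytes as they might arrive in the binary key/value columns.
value_bytes = "Kontext kafka msg: 0".encode("utf-8")
key_bytes = struct.pack(">i", 42)

print(decode_string(value_bytes))  # Kontext kafka msg: 0
print(decode_int(key_bytes))       # 42
# Decoding the integer key as text yields '\x00\x00\x00*' instead of '42'.
print(repr(decode_string(key_bytes)))
```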

Subscribe to two topics with specified offsets

The following code snippet subscribes to two topics, 'kontext-kafka' and 'kontext-events', and specifies the following offsets:

  • Topic kontext-kafka: read partition 2 starting from offset 98, and all events in partitions 0 and 1.
  • Topic kontext-events: read all events in partition 0.

In the offset JSON, -2 refers to the earliest offset and -1 to the latest offset.

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka,kontext-events") \
    .option("startingOffsets", """{"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}""") \
    .option("endingOffsets", """{"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}""") \
    .load()
df = df.withColumn('key_str', df['key'].cast('string')) \
    .withColumn('value_str', df['value'].cast('string')) \
    .drop('key', 'value')
df.show(5)

Sample output:

+-------------+---------+------+--------------------+-------------+-------+--------------------+
|        topic|partition|offset|           timestamp|timestampType|key_str|           value_str|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
|kontext-kafka|        2|    98|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        2|    99|2020-09-06 14:08:...|            0|    cba|Kontext kafka msg...|
|kontext-kafka|        0|     0|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 0|
|kontext-kafka|        0|     1|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 1|
|kontext-kafka|        0|     2|2020-09-05 17:35:...|            0|   null|Kontext kafka msg: 2|
+-------------+---------+------+--------------------+-------------+-------+--------------------+
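
Hand-writing the startingOffsets and endingOffsets JSON is error-prone. The sketch below builds both strings from plain dictionaries with Python's json module; the helper name offsets_json is hypothetical. It also encodes two rules from the Spark Kafka integration guide: for batch queries, -1 (latest) is not allowed in startingOffsets, and -2 (earliest) is not allowed in endingOffsets.

```python
import json
from typing import Dict

EARLIEST = -2  # special offset meaning "earliest"
LATEST = -1    # special offset meaning "latest"

Offsets = Dict[str, Dict[int, int]]


def offsets_json(offsets: Offsets, ending: bool = False) -> str:
    """Serialize {topic: {partition: offset}} into a compact offsets JSON string.

    Partition numbers become JSON object keys, so they are converted to strings.
    """
    for topic, partitions in offsets.items():
        for partition, offset in partitions.items():
            if not ending and offset == LATEST:
                raise ValueError(f"{topic}/{partition}: -1 not allowed in batch startingOffsets")
            if ending and offset == EARLIEST:
                raise ValueError(f"{topic}/{partition}: -2 not allowed in endingOffsets")
    as_str_keys = {t: {str(p): o for p, o in parts.items()} for t, parts in offsets.items()}
    return json.dumps(as_str_keys, separators=(",", ":"))


starting = offsets_json({"kontext-kafka": {2: 98, 0: 0, 1: 0}, "kontext-events": {0: EARLIEST}})
ending = offsets_json(
    {"kontext-kafka": {2: LATEST, 0: LATEST, 1: LATEST}, "kontext-events": {0: LATEST}},
    ending=True,
)
print(starting)  # {"kontext-kafka":{"2":98,"0":0,"1":0},"kontext-events":{"0":-2}}
print(ending)    # {"kontext-kafka":{"2":-1,"0":-1,"1":-1},"kontext-events":{"0":-1}}
```

The resulting strings are identical to the literals in the example above and can be passed to .option("startingOffsets", starting) and .option("endingOffsets", ending).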

Subscribe to a pattern

The following example subscribes to all topics whose names match the regular expression 'kontext.*' and reads all events in all partitions.

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribePattern", "kontext.*") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
df = df.withColumn('key_str', df['key'].cast('string')) \
    .withColumn('value_str', df['value'].cast('string')) \
    .drop('key', 'value')
df.show(5)

The subscribePattern option takes a Java regular expression that is matched against topic names.
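
Kafka's pattern subscription matches the regular expression against the whole topic name rather than a substring. The sketch below approximates that with Python's re.fullmatch (Python and Java regex syntax agree for a simple pattern like this one). Apart from the kontext-* names used in this article, the topic list is hypothetical. It also shows a side effect worth knowing: the output topics used later, such as 'kontext-kafka-3' and 'kontext-kafka-5', match 'kontext.*' too.

```python
import re
from typing import Iterable, List


def matching_topics(pattern: str, topics: Iterable[str]) -> List[str]:
    """Return topics whose entire name matches the pattern (full-match semantics)."""
    compiled = re.compile(pattern)
    return [t for t in topics if compiled.fullmatch(t)]


# Hypothetical cluster contents; only the kontext-* names appear in this article.
topics = ["kontext-kafka", "kontext-events", "kontext-kafka-3", "kontext-kafka-5", "my-kontext", "test"]

print(matching_topics("kontext.*", topics))
# ['kontext-kafka', 'kontext-events', 'kontext-kafka-3', 'kontext-kafka-5']
print(matching_topics("kontext-kafka", topics))
# ['kontext-kafka']  (full match, so 'kontext-kafka-3' is excluded)
```

Note that 'my-kontext' is not matched, even though it contains 'kontext', because the match is anchored at both ends.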

Use Kafka source for streaming queries

To read from Kafka in streaming queries, use SparkSession.readStream. The Kafka server addresses and topic names are required. Spark can subscribe to one or more topics, and wildcards can be used to match multiple topic names, just as in the batch query examples above.

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

A data frame with a streaming source cannot be displayed directly. Calling df.show() on it raises the following exception:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

We will use this data frame to write into a different topic.

Write Spark data frame to Kafka topic

To write a Spark data frame to a Kafka topic, the data frame can contain the following columns:

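Column                Type
key (optional)        string or binary
value (required)      string or binary
headers (optional)    array
topic (*optional)     string
partition (optional)  int

* The topic column is required if the "topic" configuration option is not specified. When the "topic" option is set, it overrides any topic column in the data.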
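
The rules in this table can be expressed as a quick pre-flight check. The function below is a hypothetical sketch, not a Spark API. It takes a mapping of column name to Spark SQL type name (for example, dict(df.dtypes) in PySpark) and reports problems before a write is attempted. The type names it checks ('string', 'binary', 'int', and names starting with 'array') assume the simple type strings that DataFrame.dtypes returns.

```python
from typing import Dict, List

STRING_OR_BINARY = {"string", "binary"}


def check_kafka_sink_columns(dtypes: Dict[str, str], topic_option_set: bool) -> List[str]:
    """Return a list of problems with a schema destined for the Kafka sink (hypothetical)."""
    problems = []
    if "value" not in dtypes:
        problems.append("missing required column 'value'")
    for col in ("key", "value"):
        if col in dtypes and dtypes[col] not in STRING_OR_BINARY:
            problems.append(f"column '{col}' must be string or binary, got {dtypes[col]}")
    if "headers" in dtypes and not dtypes["headers"].startswith("array"):
        problems.append(f"column 'headers' must be an array, got {dtypes['headers']}")
    if "partition" in dtypes and dtypes["partition"] != "int":
        problems.append(f"column 'partition' must be int, got {dtypes['partition']}")
    # The topic column only matters when the "topic" option is not given.
    if not topic_option_set:
        if "topic" not in dtypes:
            problems.append("no 'topic' column and no 'topic' option")
        elif dtypes["topic"] != "string":
            problems.append(f"column 'topic' must be string, got {dtypes['topic']}")
    return problems


# Schema produced by selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(check_kafka_sink_columns({"key": "string", "value": "string"}, topic_option_set=True))
# []
print(check_kafka_sink_columns({"key": "string", "value": "string"}, topic_option_set=False))
# ["no 'topic' column and no 'topic' option"]
```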


The Kafka sink in Spark only provides at-least-once write semantics, so some records may be written more than once (for example, when a failed task is retried). One way to handle these duplicates is to add a unique key to each message and then deduplicate when reading the data back from Kafka.
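
A minimal sketch of the deduplication idea in plain Python, with no Spark involved: the producer side wraps each payload with a unique ID (a hypothetical 'msg_id' field in a JSON value), and the reader keeps only the first occurrence of each ID. In a Spark job the same effect is typically achieved with DataFrame.dropDuplicates on the ID column, combined with withWatermark in streaming queries so that the deduplication state does not grow without bound.

```python
import json
import uuid
from typing import Iterable, List


def make_message(payload: str) -> bytes:
    """Producer side: wrap the payload with a unique ID before sending it to Kafka."""
    return json.dumps({"msg_id": str(uuid.uuid4()), "payload": payload}).encode("utf-8")


def deduplicate(raw_values: Iterable[bytes]) -> List[dict]:
    """Consumer side: keep the first record seen for each msg_id, preserving order."""
    seen = set()
    unique = []
    for raw in raw_values:
        record = json.loads(raw.decode("utf-8"))
        if record["msg_id"] not in seen:
            seen.add(record["msg_id"])
            unique.append(record)
    return unique


# Simulate an at-least-once write in which a retried task re-sent the first message.
first = make_message("Kontext kafka msg: 0")
second = make_message("Kontext kafka msg: 1")
delivered = [first, second, first]

result = deduplicate(delivered)
print([r["payload"] for r in result])  # ['Kontext kafka msg: 0', 'Kontext kafka msg: 1']
```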

Write using streaming query output

For a data frame with a streaming source, we can use DataFrame.writeStream to write directly into a Kafka topic.

Example code:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribe", "kontext-kafka") \
    .option("startingOffsets", "earliest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-3") \
    .option("checkpointLocation", "file:///F:/tmp/kontext-kafka/checkpoint") \
    .start()

spark.streams.awaitAnyTermination()

The above code snippet reads stream data from the topic 'kontext-kafka' and writes it into another topic named 'kontext-kafka-3'.

When deploying the Spark application to YARN, change checkpointLocation to an HDFS path instead of a local file path.

Write using batch query output

Similarly, we can save batch query output to a Kafka topic by calling DataFrame.write with format 'kafka' followed by save().

Example code:

df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("subscribePattern", "kontext.*") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_servers) \
    .option("topic", "kontext-kafka-5") \
    .save()

The above code snippet writes the batch query result from all topics whose names start with 'kontext' to a new topic named 'kontext-kafka-5'.

Summary

With the Spark Structured Streaming APIs, it is easy to read from Kafka topics, apply transformations or aggregations, and then write the results to HDFS, a database, or another Kafka topic. In the examples above, Spark acts as both a Kafka consumer and a Kafka producer.

Last modified by Raymond 18 days ago.