Apache Kafka Python Producer and Consumer Clients Introduction


Apache Kafka is written in Scala, so the most natural way to call Kafka APIs, for example the Consumer and Producer APIs, is from Scala or Java. For Python developers, there are open source packages available that function similarly to the official Java clients.

This article shows you how to use the kafka-python package to consume events from Kafka topics and also to produce events.

Kafka brokers

The code snippets in the following examples require Kafka brokers. If you don't have a Kafka system to work with, you can configure one on Windows 10 by following this guide:

Install and Run Kafka 2.6.0 On Windows 10

Python 2 or 3 is required to run the sample code. On my system, the Python version is 3.8.2 (64-bit).

python --version
Python 3.8.2

Install package

Run the following command to install the latest package:

pip install kafka-python

KafkaProducer

The KafkaProducer class is used to publish messages to Kafka asynchronously.

The following example script, producer.py, sends 100 messages to a topic named kontext-kafka. At the end, it also prints the producer's metrics.

from kafka import KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Generate 100 messages
for i in range(100):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, msg.encode('utf-8'))
    print(f'Sending msg: {msg}')
    # Block until the send is acknowledged (or 60 seconds pass)
    result = future.get(timeout=60)

metrics = producer.metrics()
print(metrics)

The output looks like the following:

Sending msg: Kontext kafka msg: 0
Sending msg: Kontext kafka msg: 1
Sending msg: Kontext kafka msg: 2
......
Sending msg: Kontext kafka msg: 97
Sending msg: Kontext kafka msg: 98
Sending msg: Kontext kafka msg: 99
{'kafka-metrics-count': {'count': 56.0}, 'producer-metrics': {'connection-close-rate': 0.03261752621864605, 'connection-creation-rate': 0.06465134053710159, 'select-rate': 10.315461901921683, 'io-wait-time-ns-avg': 2256964.73751428, 'io-wait-ratio': 0.023281633763809227, 'io-time-ns-avg': 159617.4899886989, 'io-ratio': 0.001646528136858789, 'connection-count': 1.0, 'network-io-rate': 6.853042096932768, 'outgoing-byte-rate': 542.3277700954767, 'request-rate': 3.426521048466384, 'request-size-avg': 158.27358490566039, 'request-size-max': 165.0, 'incoming-byte-rate': 301.319486684433, 'response-rate': 3.4380910213724327, 'request-latency-avg': 3.048419952392578, 'request-latency-max': 107.99527168273926, 'bufferpool-wait-ratio': 0.0, 'batch-size-avg': 88.9, 'batch-size-max': 89.0, 'compression-rate-avg': 1.0, 'record-queue-time-avg': 0.0003571176528930664, 'record-queue-time-max': 0.0011472702026367188, 'produce-throttle-time-avg': 0.0, 'produce-throttle-time-max': 0.0, 'record-send-rate': 3.2970790407245127, 'records-per-request-avg': 1.0, 'byte-rate': 293.11032672040915, 'record-retry-rate': 0.0, 'record-error-rate': 0.0, 'record-size-max': 28.0, 'record-size-avg': 27.9, 'requests-in-flight': 0.0, 'metadata-age': 0.33087109375}, 'producer-node-metrics.node-bootstrap-0': {'outgoing-byte-rate': 3.8467547619575444, 'request-rate': 0.09697701080565238, 'request-size-avg': 39.666666666666664, 'request-size-max': 41.0, 'incoming-byte-rate': 109.37021626479098, 'response-rate': 0.09730446286903112, 'request-latency-avg': 36.14187240600586, 'request-latency-max': 104.10404205322266}, 'producer-node-metrics.node-0': {'outgoing-byte-rate': 543.3427517502058, 'request-rate': 3.3596052005205426, 'request-size-avg': 161.72815533980582, 'request-size-max': 165.0, 'incoming-byte-rate': 193.70656448901786, 'response-rate': 3.3713714333168032, 'request-latency-avg': 2.0845329877242302, 'request-latency-max': 107.99527168273926}, 'producer-topic-metrics.kontext-kafka': 
{'record-send-rate': 3.2970790407245127, 'byte-rate': 293.11032672040915, 'compression-rate': 1.0, 'record-retry-rate': 0.0, 'record-error-rate': 0.0}}
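Beyond raw bytes, KafkaProducer also accepts a value_serializer so payloads are encoded automatically on send. The sketch below sends JSON values; the topic name kontext-kafka-json and the helper produce_json are illustrative assumptions, not part of the original script.

```python
import json


def serialize_json(value):
    # Encode a Python object as UTF-8 JSON bytes for Kafka.
    return json.dumps(value).encode('utf-8')


def produce_json(topic='kontext-kafka-json', bootstrap_servers='localhost:9092'):
    # Requires a running broker; kafka-python is imported here so the
    # serializer above can be exercised without one.
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=serialize_json)
    for i in range(10):
        producer.send(topic, {'id': i, 'msg': f'Kontext kafka msg: {i}'})
    producer.flush()  # block until all buffered messages are sent
```

Calling produce_json() assumes the same local broker as the examples above.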

KafkaConsumer

The KafkaConsumer class is used to retrieve events from Kafka topics.

The following sample script, consumer.py, reads from the topic created by the above producer.

from kafka import KafkaConsumer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
for msg in consumer:
    print(msg)

The option auto_offset_reset='earliest' is added so that events are retrieved from the very beginning of the topic.

The output looks like the following:

ConsumerRecord(topic='kontext-kafka', partition=0, offset=0, timestamp=1599291349190, timestamp_type=0, key=None, value=b'Kontext kafka msg: 0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
ConsumerRecord(topic='kontext-kafka', partition=0, offset=1, timestamp=1599291349194, timestamp_type=0, key=None, value=b'Kontext kafka msg: 1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
......
ConsumerRecord(topic='kontext-kafka', partition=0, offset=98, timestamp=1599291349511, timestamp_type=0, key=None, value=b'Kontext kafka msg: 98', headers=[], checksum=None, serialized_key_size=-1, 
serialized_value_size=21, serialized_header_size=-1)
ConsumerRecord(topic='kontext-kafka', partition=0, offset=99, timestamp=1599291349519, timestamp_type=0, key=None, value=b'Kontext kafka msg: 99', headers=[], checksum=None, serialized_key_size=-1, 
serialized_value_size=21, serialized_header_size=-1)

There are 100 consumer records as expected.
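Note that iterating a KafkaConsumer blocks indefinitely waiting for new events. If you want the loop to stop once the topic is drained, the consumer_timeout_ms option ends iteration after a quiet period. A minimal sketch; the collect_values and read_all helpers are my own names, not part of kafka-python:

```python
def collect_values(consumer, limit=None):
    # Drain decoded message values from a consumer (or any iterable of
    # records with a bytes `value` attribute) into a list.
    values = []
    for msg in consumer:
        values.append(msg.value.decode('utf-8'))
        if limit is not None and len(values) >= limit:
            break
    return values


def read_all(topic='kontext-kafka', bootstrap_servers='localhost:9092'):
    # Requires a running broker; kafka-python is imported here so
    # collect_values above can be used without one.
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        consumer_timeout_ms=5000,  # stop iterating after 5s with no messages
    )
    return collect_values(consumer)
```

With the 100 messages produced above, read_all() should return a list of 100 strings.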

To print out the values directly, we just need to use the value property:

from kafka import KafkaConsumer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value.decode("utf-8"))
Output:
Kontext kafka msg: 0
Kontext kafka msg: 1
......
Kontext kafka msg: 97
Kontext kafka msg: 98
Kontext kafka msg: 99
Note: the send function of KafkaProducer takes bytes as input, which is why the message was encoded to UTF-8 bytes before sending; when consuming the messages, we just need to decode them back to strings. You can choose other encoders or decoders.
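Alternatively, the decoding can be delegated to the client itself via value_deserializer, the mirror of the producer's value_serializer. A minimal sketch (the helper names here are illustrative):

```python
def deserialize_utf8(raw):
    # Decode raw Kafka message bytes back to a string; None passes through.
    return raw.decode('utf-8') if raw is not None else None


def print_messages(topic='kontext-kafka', bootstrap_servers='localhost:9092'):
    # Requires a running broker; kafka-python is imported here so
    # deserialize_utf8 above can be tested without one.
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        value_deserializer=deserialize_utf8,
    )
    for msg in consumer:
        print(msg.value)  # already a str; no manual decode needed
```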

Read messages from a specified offset

We can use the KafkaConsumer.seek function to seek to a specific offset and start reading from there.

Example:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

partitions = consumer.partitions_for_topic(topic)
for p in partitions:
    topic_partition = TopicPartition(topic, p)
    # Seek offset 95
    consumer.seek(partition=topic_partition, offset=95)
    for msg in consumer:
        print(msg.value.decode("utf-8"))
Output:
Kontext kafka msg: 95
Kontext kafka msg: 96
Kontext kafka msg: 97
Kontext kafka msg: 98
Kontext kafka msg: 99

Messages before offset 95 are not read. 
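Instead of seeking manually, offsets can also be managed by joining a consumer group: pass group_id and Kafka remembers the group's committed position, so a restarted consumer resumes where it left off rather than starting from the earliest offset. A sketch, assuming an illustrative group name kontext-group:

```python
def describe_record(offset, raw_value):
    # Render a consumed record as "offset: text" for logging.
    return f'{offset}: {raw_value.decode("utf-8")}'


def consume_in_group(topic='kontext-kafka', bootstrap_servers='localhost:9092'):
    # Requires a running broker; kafka-python is imported here so
    # describe_record above can be tested without one.
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id='kontext-group',      # illustrative group name
        auto_offset_reset='earliest',  # used only when the group has no committed offset
        enable_auto_commit=True,       # periodically commit consumed offsets (default)
    )
    for msg in consumer:
        print(describe_record(msg.offset, msg.value))
```

Running consume_in_group() twice against the same broker should print nothing new on the second run, because the group's offsets were already committed past the 100 test messages.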

Summary

The KafkaProducer and KafkaConsumer APIs make it easy to produce and consume messages in Kafka topics.

Refer to the API documentation on GitHub for more details:

Kafka APIs Documentation

Last modified by Raymond 3 months ago.