Apache Kafka Python Producer and Consumer Clients Introduction
Apache Kafka is written in Scala and Java, so the most natural way to call Kafka APIs, such as the Consumer and Producer APIs, is from Scala or Java. For Python developers, open-source packages are available that work similarly to the official Java clients.
This article shows how to use the kafka-python package to produce events to Kafka topics and to consume events from them.
Kafka brokers
The code snippets in the following examples require Kafka brokers. If you don't have a Kafka system to work with, you can set one up on Windows 10 by following this guide:
Install and Run Kafka 2.6.0 On Windows 10
Python 3 is required to run the sample code (the scripts use f-strings, which need Python 3.6 or later). On my system, the Python version is 3.8.2 (64-bit).
python --version
Python 3.8.2
Install package
Run the following command to install the latest package:
pip install kafka-python
KafkaProducer
The KafkaProducer class publishes messages to Kafka asynchronously: send() returns a future instead of waiting for the broker to respond.
The following example script, producer.py, sends 100 messages to a topic named kontext-kafka. At the end, it also prints the producer's metrics.
from kafka import KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Generate 100 messages
for i in range(100):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, msg.encode('utf-8'))
    print(f'Sending msg: {msg}')
    # Wait for the broker's acknowledgement; raises if the send failed or timed out
    future.get(timeout=60)

# Print the producer's internal metrics, then release its resources
metrics = producer.metrics()
print(metrics)
producer.close()
The output looks like the following:
Sending msg: Kontext kafka msg: 0
Sending msg: Kontext kafka msg: 1
Sending msg: Kontext kafka msg: 2
......
Sending msg: Kontext kafka msg: 97
Sending msg: Kontext kafka msg: 98
Sending msg: Kontext kafka msg: 99
{'kafka-metrics-count': {'count': 56.0}, 'producer-metrics': {'connection-close-rate': 0.03261752621864605, 'connection-creation-rate': 0.06465134053710159, 'select-rate': 10.315461901921683, 'io-wait-time-ns-avg': 2256964.73751428, 'io-wait-ratio': 0.023281633763809227, 'io-time-ns-avg': 159617.4899886989, 'io-ratio': 0.001646528136858789, 'connection-count': 1.0, 'network-io-rate': 6.853042096932768, 'outgoing-byte-rate': 542.3277700954767, 'request-rate': 3.426521048466384, 'request-size-avg': 158.27358490566039, 'request-size-max': 165.0, 'incoming-byte-rate': 301.319486684433, 'response-rate': 3.4380910213724327, 'request-latency-avg': 3.048419952392578, 'request-latency-max': 107.99527168273926, 'bufferpool-wait-ratio': 0.0, 'batch-size-avg': 88.9, 'batch-size-max': 89.0, 'compression-rate-avg': 1.0, 'record-queue-time-avg': 0.0003571176528930664, 'record-queue-time-max': 0.0011472702026367188, 'produce-throttle-time-avg': 0.0, 'produce-throttle-time-max': 0.0, 'record-send-rate': 3.2970790407245127, 'records-per-request-avg': 1.0, 'byte-rate': 293.11032672040915, 'record-retry-rate': 0.0, 'record-error-rate': 0.0, 'record-size-max': 28.0, 'record-size-avg': 27.9, 'requests-in-flight': 0.0, 'metadata-age': 0.33087109375}, 'producer-node-metrics.node-bootstrap-0': {'outgoing-byte-rate': 3.8467547619575444, 'request-rate': 0.09697701080565238, 'request-size-avg': 39.666666666666664, 'request-size-max': 41.0, 'incoming-byte-rate': 109.37021626479098, 'response-rate': 0.09730446286903112, 'request-latency-avg': 36.14187240600586, 'request-latency-max': 104.10404205322266}, 'producer-node-metrics.node-0': {'outgoing-byte-rate': 543.3427517502058, 'request-rate': 3.3596052005205426, 'request-size-avg': 161.72815533980582, 'request-size-max': 165.0, 'incoming-byte-rate': 193.70656448901786, 'response-rate': 3.3713714333168032, 'request-latency-avg': 2.0845329877242302, 'request-latency-max': 107.99527168273926}, 'producer-topic-metrics.kontext-kafka': {'record-send-rate': 3.2970790407245127, 'byte-rate': 293.11032672040915, 'compression-rate': 1.0, 'record-retry-rate': 0.0, 'record-error-rate': 0.0}}
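Calling future.get() inside the loop makes producer.py wait for an acknowledgement after every message, so the sends are effectively synchronous. Below is a minimal sketch of a non-blocking variant, assuming the same local broker and kontext-kafka topic. It passes a value_serializer so strings are encoded automatically, registers success and error callbacks on the future returned by send(), and calls flush() once at the end. The helper names (serialize_value, on_send_success, on_send_error, send_messages, main) are illustrative and are not part of kafka-python.

```python
def serialize_value(value: str) -> bytes:
    """Encode a text message as UTF-8 bytes (used as the value_serializer)."""
    return value.encode('utf-8')


def on_send_success(record_metadata):
    # kafka-python passes a RecordMetadata namedtuple to success callbacks
    print(f'Delivered to {record_metadata.topic}[{record_metadata.partition}] '
          f'at offset {record_metadata.offset}')


def on_send_error(exc):
    # Error callbacks receive the exception that caused the failure
    print(f'Delivery failed: {exc!r}')


def send_messages(producer, topic, count):
    """Queue `count` messages without waiting for each acknowledgement."""
    for i in range(count):
        future = producer.send(topic, f'Kontext kafka msg: {i}')
        future.add_callback(on_send_success)
        future.add_errback(on_send_error)
    # Block once until every buffered record has been sent (or has failed)
    producer.flush()


def main():
    # Assumes kafka-python is installed and a broker listens on localhost:9092
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=serialize_value)
    send_messages(producer, 'kontext-kafka', 100)
    producer.close()
```

With the broker running, call main(). Because send_messages only relies on send() and flush(), it also accepts a stand-in object if you want to test the loop without a Kafka cluster.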
KafkaConsumer
The KafkaConsumer class reads events from Kafka topics.
The following sample script, consumer.py, reads from the topic populated by the producer above.
from kafka import KafkaConsumer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

# Iterating over the consumer yields ConsumerRecord objects as they arrive
for msg in consumer:
    print(msg)
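The option auto_offset_reset='earliest' makes the consumer start from the earliest available offset when it has no committed offset. Because this script sets no group_id, no offsets are committed, so every run reads the topic from the very beginning.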
The output looks like the following:
ConsumerRecord(topic='kontext-kafka', partition=0, offset=0, timestamp=1599291349190, timestamp_type=0, key=None, value=b'Kontext kafka msg: 0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
ConsumerRecord(topic='kontext-kafka', partition=0, offset=1, timestamp=1599291349194, timestamp_type=0, key=None, value=b'Kontext kafka msg: 1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=20, serialized_header_size=-1)
......
ConsumerRecord(topic='kontext-kafka', partition=0, offset=98, timestamp=1599291349511, timestamp_type=0, key=None, value=b'Kontext kafka msg: 98', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=21, serialized_header_size=-1)
ConsumerRecord(topic='kontext-kafka', partition=0, offset=99, timestamp=1599291349519, timestamp_type=0, key=None, value=b'Kontext kafka msg: 99', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=21, serialized_header_size=-1)
There are 100 consumer records as expected.
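The consumer.py loop does not end by itself: by default, iterating over a KafkaConsumer blocks indefinitely while waiting for new events. A sketch, assuming the same broker and topic, that passes consumer_timeout_ms so iteration stops after 10 seconds without a new message, and then summarizes what was read per partition. The 10-second value and the helper names summarize_records and main are hypothetical choices for illustration.

```python
from collections import defaultdict


def summarize_records(records):
    """Return {partition: (record count, last offset seen)} for an iterable of records."""
    counts = defaultdict(int)
    last_offset = {}
    for record in records:
        counts[record.partition] += 1
        last_offset[record.partition] = record.offset
    return {p: (counts[p], last_offset[p]) for p in counts}


def main():
    # Assumes kafka-python is installed and a broker listens on localhost:9092
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'kontext-kafka',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        # End the iterator after 10 seconds with no new message
        consumer_timeout_ms=10000)
    print(summarize_records(consumer))
    consumer.close()
```

summarize_records only reads the partition and offset attributes of each record, so it works on the ConsumerRecord objects shown above as well as on any similar test objects.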
To print only the message contents, use the record's value attribute, which holds the raw bytes, and decode it:
from kafka import KafkaConsumer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

for msg in consumer:
    print(msg.value.decode("utf-8"))
Kontext kafka msg: 0
Kontext kafka msg: 1
......
Kontext kafka msg: 97
Kontext kafka msg: 98
Kontext kafka msg: 99
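Instead of decoding msg.value inside the loop, kafka-python can decode every record for you through the value_deserializer parameter, which receives the raw value bytes. A minimal sketch, assuming UTF-8 text messages like the ones produced above; decode_value and main are illustrative names. The None check is there because a record can carry no value (a tombstone), in which case the deserializer is handed None instead of bytes.

```python
def decode_value(raw):
    """Turn raw record bytes into text; pass through records that have no value."""
    return raw.decode('utf-8') if raw is not None else None


def main():
    # Assumes kafka-python is installed and a broker listens on localhost:9092
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'kontext-kafka',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        value_deserializer=decode_value)
    for msg in consumer:
        # msg.value is already a str at this point
        print(msg.value)
```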
Read messages from a specified offset
Use the KafkaConsumer.seek method to move to a specific offset and start reading from there.
Example:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

partitions = consumer.partitions_for_topic(topic)
for p in partitions:
    topic_partition = TopicPartition(topic, p)
    # Seek offset 95
    consumer.seek(partition=topic_partition, offset=95)

for msg in consumer:
    print(msg.value.decode("utf-8"))
Kontext kafka msg: 95
Kontext kafka msg: 96
Kontext kafka msg: 97
Kontext kafka msg: 98
Kontext kafka msg: 99
Messages before offset 95 are not read.
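seek() is meant for partitions currently assigned to the consumer. A sketch of an alternative, assuming the same broker and topic, that assigns the partitions explicitly with assign() (no group subscription) and uses beginning_offsets() and end_offsets() to compute where to seek, so that only the last n messages of each partition are read. tail_start_offset, main and the default n=5 are hypothetical choices. Assuming the topic holds only the 100 messages produced above, n=5 yields offset 95, the same starting point the example hard-codes.

```python
def tail_start_offset(beginning, end, n):
    """First offset to read so that at most the last `n` records are returned.

    `end` is the offset the next produced record will get (last offset + 1).
    """
    return max(beginning, end - n)


def main(n=5):
    # Assumes kafka-python is installed and a broker listens on localhost:9092
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    topic = 'kontext-kafka'
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=10000)
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic) or ()]
    # Manual assignment: the partitions are assigned immediately, so seek() works
    consumer.assign(partitions)
    beginning = consumer.beginning_offsets(partitions)
    end = consumer.end_offsets(partitions)
    for tp in partitions:
        consumer.seek(tp, tail_start_offset(beginning[tp], end[tp], n))
    for msg in consumer:
        print(msg.partition, msg.offset, msg.value.decode('utf-8'))
    consumer.close()
```

Clamping with max() keeps the seek position valid when a partition holds fewer than n records or when older records have already been deleted by retention.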
Summary
KafkaProducer and KafkaConsumer APIs allow us to produce and consume messages in Kafka topics very easily.
Refer to the API documentation on GitHub for more details: