Kafka Topic Partitions Walkthrough via Python
A partition is the unit of parallelism in a Kafka cluster. Partitions are distributed and replicated across the brokers in the cluster for throughput and fault tolerance. This article shows how to work with Kafka partitions using Python.
The kafka-python package is used in the following sections.
Find partitions of a topic
Function KafkaConsumer.partitions_for_topic (or KafkaProducer.partitions_for) can be used to retrieve partitions of a topic.
Example:
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    topic = 'kontext-kafka'
    bootstrap_servers = 'localhost:9092'
    consumer = KafkaConsumer(
        topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
    partitions = consumer.partitions_for_topic(topic)
    print(partitions)
Output:
{0}
In my Kafka cluster, there is only one partition for this topic with partition number 0.
Find TopicPartition of a topic
The previous method returns a set of partition numbers. A TopicPartition can be constructed from the topic name and one of these numbers.
For example, the following code snippet creates a TopicPartition object using topic name and partition number:
    first_topic_part = TopicPartition(topic, 0)
    print(first_topic_part)
Example output:
TopicPartition(topic='kontext-kafka', partition=0)
To retrieve the partitions currently assigned to the consumer, use the assignment function:
    partitions = consumer.assignment()
    print(partitions)
This function returns a set of TopicPartition instances:
{TopicPartition(topic='kontext-kafka', partition=0)}
Create new partitions for a topic
The KafkaAdminClient class can be used to add partitions to a topic through its create_partitions function.
The following is the function signature:
create_partitions(topic_partitions, timeout_ms=None, validate_only=False)
Example:
    from kafka import KafkaAdminClient
    from kafka.admin import NewPartitions

    topic = 'kontext-kafka'
    bootstrap_servers = 'localhost:9092'
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    topic_partitions = {}
    topic_partitions[topic] = NewPartitions(total_count=3)
    admin_client.create_partitions(topic_partitions)
The above code increases the number of partitions of topic 'kontext-kafka' to 3.
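Kafka only allows the partition count of a topic to grow, so it can help to check the target count before calling create_partitions. The following is a minimal sketch, assuming a connected KafkaAdminClient and KafkaConsumer are passed in. The helper names check_partition_increase and grow_topic are hypothetical and not part of kafka-python.

```python
def check_partition_increase(current_count, target_count):
    """Return how many partitions would be added; reject a shrink or no-op."""
    if target_count <= current_count:
        raise ValueError(
            f'target {target_count} must exceed current count {current_count}')
    return target_count - current_count


def grow_topic(admin_client, consumer, topic, target_count, validate_only=False):
    """Raise a topic's partition count to target_count (hypothetical helper)."""
    # Imported here so the pure check above works without kafka-python installed.
    from kafka.admin import NewPartitions

    current = consumer.partitions_for_topic(topic) or set()
    added = check_partition_increase(len(current), target_count)
    # validate_only=True asks the broker to check the request without applying it.
    admin_client.create_partitions(
        {topic: NewPartitions(total_count=target_count)},
        validate_only=validate_only)
    return added
```

Calling grow_topic(admin_client, consumer, 'kontext-kafka', 3, validate_only=True) first gives a dry run before the change is applied for real.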
Write message to a partition
Producer clients use the send function to write messages to a Kafka cluster. The signature of this function looks like the following:
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
As you can see, partition can be specified when calling this function.
Example:
    from kafka import KafkaConsumer, KafkaProducer

    topic = 'kontext-kafka'
    bootstrap_servers = 'localhost:9092'
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    # Consumer used below to inspect the partition assignment
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)

    # Generate 100 messages
    for i in range(100, 200):
        msg = f'Kontext kafka msg: {i}'
        future = producer.send(topic, msg.encode('utf-8'), partition=1)
        print(f'Sending msg: {msg}')
        # Block until the broker acknowledges the write
        result = future.get(timeout=60)

    # Poll once so the consumer has a chance to receive its assignment
    consumer.poll(timeout_ms=1000)
    partitions = consumer.assignment()
    print(partitions)
Output:
    ...
    Sending msg: Kontext kafka msg: 100
    Sending msg: Kontext kafka msg: 101
    Sending msg: Kontext kafka msg: 102
    {TopicPartition(topic='kontext-kafka', partition=0), TopicPartition(topic='kontext-kafka', partition=1), TopicPartition(topic='kontext-kafka', partition=2)}
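The object returned by future.get() carries the partition and offset of the written record, which can be used to confirm where each message landed. The following is a minimal sketch, assuming producer is a KafkaProducer (or any object with the same send function). The helper name send_to_partition is hypothetical.

```python
def send_to_partition(producer, topic, partition, payloads, timeout=60):
    """Send each payload to one partition and return (partition, offset) pairs."""
    placements = []
    for payload in payloads:
        future = producer.send(topic, value=payload, partition=partition)
        # Block until the broker acknowledges the write.
        metadata = future.get(timeout=timeout)
        placements.append((metadata.partition, metadata.offset))
    return placements
```

With the producer from the example above, send_to_partition(producer, 'kontext-kafka', 1, [b'hello']) should report partition 1 for every message.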
Read from a specific partition
A consumer can also read from a specific topic partition by assigning it explicitly.
Example code:
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    topic = 'kontext-kafka'
    bootstrap_servers = 'localhost:9092'
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
    # Read the specified partition
    consumer.assign([TopicPartition(topic, 1)])
    for msg in consumer:
        print(msg.value.decode("utf-8"))
Output:
    Kontext kafka msg: 100
    Kontext kafka msg: 101
    ......
    Kontext kafka msg: 198
    Kontext kafka msg: 199
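Iterating over the consumer keeps waiting for new messages. When only the records already stored in a partition are needed, a bounded read with poll may be more convenient. The following is a sketch, assuming consumer is a KafkaConsumer created without a topic subscription and tp is a TopicPartition. The helper name read_available is hypothetical.

```python
def read_available(consumer, tp, timeout_ms=1000, max_polls=10):
    """Read a partition from the start until a poll returns nothing."""
    consumer.assign([tp])
    consumer.seek_to_beginning(tp)
    values = []
    for _ in range(max_polls):
        # poll returns a dict mapping TopicPartition to a list of records.
        batch = consumer.poll(timeout_ms=timeout_ms)
        records = batch.get(tp, [])
        if not records:
            break
        values.extend(record.value for record in records)
    return values
```

Treating an empty poll as the end of the partition is a simplification: with a short timeout, a poll can come back empty before the first fetch completes, so a longer timeout_ms may be needed in practice.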
Kafka partitioner
A Kafka partitioner decides which partition of a topic a message goes to. The Kafka Java library ships partitioners such as RoundRobinPartitioner and UniformStickyPartitioner. The Python library used here provides a default partitioner, DefaultPartitioner, which hashes the message key with murmur2, a Python port of the Java method org.apache.kafka.common.utils.Utils.murmur2.
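For a keyed message, the default partitioner's choice comes down to hash, mask, and modulo. The following pure-Python sketch is written for illustration only, on the assumption that it mirrors the Java murmur2 routine. In real code the library's own DefaultPartitioner should be used. The function names are local to this example.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash of a message key (illustrative port)."""
    m, r, mask = 0x5bd1e995, 24, 0xffffffff
    length = len(data)
    h = (0x9747b28c ^ length) & mask
    tail_start = length & ~3
    # Mix the input four bytes at a time (little-endian words).
    for i in range(0, tail_start, 4):
        k = int.from_bytes(data[i:i + 4], 'little')
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = ((h * m) & mask) ^ k
    # Fold in the remaining one to three bytes.
    extra = length % 4
    if extra >= 3:
        h ^= data[tail_start + 2] << 16
    if extra >= 2:
        h ^= data[tail_start + 1] << 8
    if extra >= 1:
        h ^= data[tail_start]
        h = (h * m) & mask
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, all_partitions):
    """Map a key to a partition: non-negative hash modulo the partition count."""
    idx = (murmur2(key) & 0x7fffffff) % len(all_partitions)
    return all_partitions[idx]
```

Because the result depends on len(all_partitions), adding partitions to a topic can move existing keys to a different partition.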
We can also implement a customized partitioner.
Customize a partitioner
Let's create a user-defined partitioner function.
    import hashlib
    import random


    def hash_partitioner(key, all_partitions, available):
        """
        Custom Kafka partitioner to get the partition corresponding to key
        :param key: partitioning key
        :param all_partitions: list of all partitions sorted by partition ID
        :param available: list of available partitions in no particular order
        :return: one of the values from all_partitions or available
        """
        if key is None:
            # No key: pick a random partition, preferring available ones
            if available:
                return random.choice(available)
            return random.choice(all_partitions)
        # Hash the key with SHA-1 and reduce it to an index into all_partitions
        idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
        idx &= 0x7fffffff
        idx %= len(all_partitions)
        return all_partitions[idx]
This partitioner uses a SHA-1 hash of the key instead of murmur2 to calculate the partition.
The following are some examples of message keys and the corresponding partitions for topic 'kontext-kafka'.
Key | Partition |
'abc' | 0 |
'bca' | 0 |
'cba' | 2 |
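The mapping in the table can be checked without a broker, since the partitioner is a plain function. The following is a sketch, assuming the topic has three partitions with IDs 0 to 2. The hashing steps are copied from hash_partitioner above and wrapped in a hypothetical helper named sha1_partition.

```python
import hashlib


def sha1_partition(key: bytes, all_partitions):
    """Same key-to-partition arithmetic as hash_partitioner, for keyed messages."""
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]


all_partitions = [0, 1, 2]  # assumed: three partitions, as created earlier
for key in (b'abc', b'bca', b'cba'):
    print(key.decode('utf-8'), sha1_partition(key, all_partitions))
```

Keys are passed as bytes because the producer calls the partitioner with the serialized key.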
Produce message using custom partitioner
To produce messages using the custom partitioner, each message needs a key; otherwise the partition is chosen at random. The custom partitioner is passed through the partitioner parameter when constructing the producer client.
Example:
    import hashlib
    import random

    from kafka import KafkaProducer

    topic = 'kontext-kafka'
    bootstrap_servers = 'localhost:9092'


    def hash_partitioner(key, all_partitions, available):
        """
        Custom Kafka partitioner to get the partition corresponding to key
        :param key: partitioning key
        :param all_partitions: list of all partitions sorted by partition ID
        :param available: list of available partitions in no particular order
        :return: one of the values from all_partitions or available
        """
        if key is None:
            # No key: pick a random partition, preferring available ones
            if available:
                return random.choice(available)
            return random.choice(all_partitions)
        # Hash the key with SHA-1 and reduce it to an index into all_partitions
        idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
        idx &= 0x7fffffff
        idx %= len(all_partitions)
        return all_partitions[idx]


    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             partitioner=hash_partitioner)

    # Generate 100 messages, all with the same key
    for i in range(200, 300):
        msg = f'Kontext kafka msg: {i}'
        future = producer.send(topic, value=msg.encode('utf-8'),
                               key='cba'.encode('utf8'))
        print(f'Sending msg: {msg}')
        # Block until the broker acknowledges the write
        result = future.get(timeout=60)
Output:
    Sending msg: Kontext kafka msg: 200
    Sending msg: Kontext kafka msg: 201
    ......
    Sending msg: Kontext kafka msg: 299
All messages were written into partition 2.