A partition is the unit of parallelism in a Kafka cluster. Partitions are replicated across the brokers in the cluster for fault tolerance and throughput. This article shows how to work with Kafka partitions using Python.
The kafka-python package is used in the following sections.
Find partitions of a topic
Function KafkaConsumer.partitions_for_topic (or KafkaProducer.partitions_for) can be used to retrieve partitions of a topic.
Example:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
partitions = consumer.partitions_for_topic(topic)
print(partitions)
Output:
{0}
In my Kafka cluster, there is only one partition for this topic with partition number 0.
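partitions_for_topic can return None when the client has no metadata for the topic (for example, when the topic does not exist), so it may be worth guarding against that before iterating over the result. The helper below is a minimal sketch: sorted_partitions is a hypothetical name, and it only assumes that the object passed in exposes partitions_for_topic the way KafkaConsumer does.

```python
def sorted_partitions(consumer, topic):
    """Return the topic's partition ids as a sorted list (empty if unknown)."""
    partition_ids = consumer.partitions_for_topic(topic)
    if partition_ids is None:
        # No metadata for this topic, e.g. the topic does not exist
        return []
    return sorted(partition_ids)
```

With the single-partition topic above, sorted_partitions(consumer, 'kontext-kafka') would return [0].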
Find TopicPartition of a topic
The previous method returns a set of partition numbers. A TopicPartition can be constructed from the topic name and a partition number.
For example, the following code snippet creates a TopicPartition object using topic name and partition number:
first_topic_part = TopicPartition(topic, 0)
print(first_topic_part)
Example output:
TopicPartition(topic='kontext-kafka', partition=0)
To retrieve the partitions currently assigned to the consumer, use the assignment function.
partitions = consumer.assignment()
print(partitions)
This function returns a set of TopicPartition instances:
{TopicPartition(topic='kontext-kafka', partition=0)}
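TopicPartition is a named tuple with the fields topic and partition, so it can be built in bulk from the partition numbers and unpacked like a regular tuple. The sketch below illustrates this; to_topic_partitions is a hypothetical helper, and the fallback definition is only an assumption that lets the snippet run where kafka-python is not installed.

```python
try:
    from kafka.structs import TopicPartition
except ImportError:
    # Stand-in with the same field names, used only if kafka-python is missing
    from collections import namedtuple
    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])


def to_topic_partitions(topic, partition_ids):
    """Build one TopicPartition per partition id, ordered by id."""
    return [TopicPartition(topic, p) for p in sorted(partition_ids)]


parts = to_topic_partitions('kontext-kafka', {0, 2, 1})
first = parts[0]
print(first.topic, first.partition)  # fields are accessible by name
topic_name, partition_id = first     # and by tuple unpacking
```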
Create new partitions for a topic
The KafkaAdminClient class can be used to manage partitions for a topic.
The following is the function signature:
create_partitions(topic_partitions, timeout_ms=None, validate_only=False)
Example:
from kafka import KafkaAdminClient
from kafka.admin import NewPartitions
topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic_partitions = {}
topic_partitions[topic] = NewPartitions(total_count=3)
admin_client.create_partitions(topic_partitions)
The above code increases the number of partitions of topic 'kontext-kafka' to 3.
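Note that total_count is the new total number of partitions, not the number to add, and Kafka only allows the partition count of a topic to grow. A small check like the following can catch a bad request before it reaches the broker. This is a sketch only; partitions_added is a hypothetical helper and not part of kafka-python.

```python
def partitions_added(current_partition_ids, total_count):
    """Return how many partitions a request for total_count would add."""
    current = len(current_partition_ids)
    if total_count <= current:
        # Kafka rejects requests that do not increase the partition count
        raise ValueError(
            f'total_count must be greater than the current count ({current})')
    return total_count - current


# A topic with only partition 0, grown to 3 partitions in total
print(partitions_added({0}, 3))  # 2
```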
Write message to a partition
Producer clients can use the send function to write messages into a Kafka cluster. The signature of this function is as follows:
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
As you can see, partition can be specified when calling this function.
Example:
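from kafka import KafkaConsumer, KafkaProducer
topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# Generate 100 messages and write all of them to partition 1
for i in range(100, 200):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, msg.encode('utf-8'), partition=1)
    print(f'Sending msg: {msg}')
    # Block until the broker acknowledges the message
    result = future.get(timeout=60)
# Show the partitions assigned to a consumer subscribed to the topic
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
# Poll once so that partitions are assigned before reading the assignment
consumer.poll(timeout_ms=1000)
partitions = consumer.assignment()
print(partitions)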
Output:
...
Sending msg: Kontext kafka msg: 100
Sending msg: Kontext kafka msg: 101
Sending msg: Kontext kafka msg: 102
{TopicPartition(topic='kontext-kafka', partition=0), TopicPartition(topic='kontext-kafka', partition=1), TopicPartition(topic='kontext-kafka', partition=2)}
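The object returned by future.get describes where the message was stored, including its partition and offset, which makes it possible to confirm that the requested partition was actually used. The helper below is a minimal sketch; send_to_partition is a hypothetical name, and it assumes a producer object that behaves like KafkaProducer.

```python
def send_to_partition(producer, topic, value, partition, timeout=60):
    """Send one message to a fixed partition and report where it landed."""
    future = producer.send(topic, value, partition=partition)
    # Blocks until the broker acknowledges the write (or raises on failure)
    metadata = future.get(timeout=timeout)
    return metadata.partition, metadata.offset
```

Calling send_to_partition(producer, topic, b'hello', 1) would return a tuple of the partition number and the offset assigned to the message.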
Read from a specific partition
A consumer can also read from a specific partition of a Kafka topic.
Example code:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
# Read the specified partition
consumer.assign([TopicPartition(topic, 1)])
for msg in consumer:
    print(msg.value.decode("utf-8"))
Output:
Kontext kafka msg: 100
Kontext kafka msg: 101
......
Kontext kafka msg: 198
Kontext kafka msg: 199
Function assign is used to assign partition 1 of the topic to the consumer; thus the printed out messages are all from partition 1.
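Iterating over a KafkaConsumer keeps waiting for new messages unless consumer_timeout_ms is set, so a script like the one above does not exit on its own. One way to stop after a fixed number of messages is to cap the iteration, as in this sketch. read_values is a hypothetical helper that works with any iterable of message objects exposing a bytes value attribute.

```python
from itertools import islice


def read_values(consumer, limit):
    """Decode and return at most `limit` message values from the consumer."""
    return [msg.value.decode('utf-8') for msg in islice(consumer, limit)]
```

For example, read_values(consumer, 10) would return the first ten decoded messages from the assigned partition.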
Kafka partitioner
A Kafka partitioner decides which partition of a topic each message goes to. The Kafka Java library includes partitioners such as RoundRobinPartitioner and UniformStickyPartitioner. The Python library used here provides a default partitioner named DefaultPartitioner, which hashes the message key with murmur2, a Python implementation of the Java method org.apache.kafka.common.utils.Utils.murmur2.
We can also implement a customized partitioner.
Customize a partitioner
Let's create a user function to partition.
import hashlib
import random


def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    if key is None:
        # No key: pick a random partition, preferring available ones
        if available:
            return random.choice(available)
        return random.choice(all_partitions)
    # Hash the key bytes with SHA-1 and map the digest to a partition index
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]
This partitioner uses a SHA-1 hash (hashlib.sha1) instead of murmur2 to calculate the partition.
Note: Python's built-in hash function generates different values for the same string or bytes key in different sessions, so it is not a good idea to use it directly if you want messages with the same key to be written to the same partition.
The following are some example message keys and their corresponding partitions for topic 'kontext-kafka'.
Key | Partition |
---|---|
'abc' | 0 |
'bca' | 0 |
'cba' | 2 |
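Because the partitioner is a plain function, a mapping like this can be checked without a broker by calling it directly with a list of partition numbers. The sketch below assumes a topic with three partitions, [0, 1, 2], and restates the partitioner so that the snippet is self-contained.

```python
import hashlib
import random


def hash_partitioner(key, all_partitions, available):
    """SHA-1 based partitioner, same logic as the function defined above."""
    if key is None:
        return random.choice(available or all_partitions)
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]


all_partitions = [0, 1, 2]  # assumed partition layout of the topic
for key in (b'abc', b'bca', b'cba'):
    # The same key always maps to the same partition, across sessions too
    print(key, hash_partitioner(key, all_partitions, all_partitions))
```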
Produce message using custom partitioner
To produce messages using the custom partitioner, ensure that each message has a key; otherwise the partition is chosen at random. The custom partitioner is passed through the partitioner parameter when constructing the producer client.
Example:
import hashlib
import random
from kafka import KafkaProducer
topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'


def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]


producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         partitioner=hash_partitioner)
# Generate 100 messages, all with the same key
for i in range(200, 300):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, value=msg.encode('utf-8'),
                           key='cba'.encode('utf8'))
    print(f'Sending msg: {msg}')
    # Block until the broker acknowledges the message
    result = future.get(timeout=60)
Output:
Sending msg: Kontext kafka msg: 200
Sending msg: Kontext kafka msg: 201
......
Sending msg: Kontext kafka msg: 299
All messages were written into partition 2.
Note: When creating a custom partitioner, try to distribute the keys evenly to avoid skew. With skewed data, a few partitions receive most of the messages, which hurts performance in parallel distributed computing systems.
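One way to check for skew before using a partitioner in production is to run a sample of representative keys through the hashing logic and count how many land on each partition. The following is a rough sketch under assumptions: the keys user-0 to user-999 are made up for illustration, and sha1_partition and partition_counts are hypothetical helpers that mirror the SHA-1 logic of the custom partitioner above.

```python
import hashlib
from collections import Counter


def sha1_partition(key, num_partitions):
    """Map a bytes key to a partition index using the SHA-1 logic above."""
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    return (idx & 0x7fffffff) % num_partitions


def partition_counts(keys, num_partitions):
    """Count how many of the given keys fall into each partition."""
    return Counter(sha1_partition(k, num_partitions) for k in keys)


# Hypothetical sample of 1000 keys spread over 3 partitions
keys = [f'user-{i}'.encode('utf-8') for i in range(1000)]
counts = partition_counts(keys, 3)
for partition in range(3):
    print(partition, counts[partition])
```

Roughly equal counts suggest the keys are well spread; a partition with far more keys than the others points to skew in either the keys or the hashing scheme.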