Kafka Topic Partitions Walkthrough via Python

A partition is the unit of parallelism in a Kafka cluster. Partitions are replicated across the cluster (a cluster of brokers) for fault tolerance and throughput. This article shows you how to work with Kafka partitions using Python.

The kafka-python package will be used in the following sections.

Find partitions of a topic

The KafkaConsumer.partitions_for_topic function (or KafkaProducer.partitions_for) can be used to retrieve the partitions of a topic.

Example:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

partitions = consumer.partitions_for_topic(topic)
print(partitions)

Output:

{0}

In my Kafka cluster, this topic has only one partition, with partition number 0.
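The same partition set can also be retrieved from a producer client via KafkaProducer.partitions_for mentioned above. A minimal sketch, assuming the same topic and bootstrap server:

from kafka import KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# partitions_for returns the same set of partition numbers, e.g. {0}
print(producer.partitions_for(topic))
producer.close()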

Find TopicPartition of a topic

The previous method returns a set of partition numbers; a TopicPartition object can be constructed from a topic name and a partition number.

For example, the following code snippet creates a TopicPartition object using topic name and partition number:

first_topic_part = TopicPartition(topic, 0)
print(first_topic_part)

Example output:

TopicPartition(topic='kontext-kafka', partition=0)

To retrieve the partitions currently assigned to the consumer, the assignment function can be used.

partitions = consumer.assignment()
print(partitions)

This function returns a set of TopicPartition instances:

{TopicPartition(topic='kontext-kafka', partition=0)}

Create new partitions for a topic

The KafkaAdminClient class can be used to manage partitions for a topic.

The following is the function signature:

create_partitions(topic_partitions, timeout_ms=None, validate_only=False)

Example:

from kafka import KafkaAdminClient
from kafka.admin import NewPartitions

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic_partitions = {}
topic_partitions[topic] = NewPartitions(total_count=3)
admin_client.create_partitions(topic_partitions)

The above code increases the number of partitions of topic 'kontext-kafka' to 3.
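To confirm the change, the partition set can be fetched again with partitions_for_topic as shown earlier. A quick check (note that Kafka only allows increasing the partition count, never decreasing it):

from kafka import KafkaConsumer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
# After create_partitions succeeds, this should print {0, 1, 2}
print(consumer.partitions_for_topic(topic))
consumer.close()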

Write message to a partition

Producer clients can use the send function to write messages into the Kafka cluster. The signature of this function looks like the following:

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

As you can see, partition can be specified when calling this function.

Example:

from kafka import KafkaConsumer, KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Generate 100 messages (numbered 100 to 199) and write them all to partition 1
for i in range(100, 200):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, msg.encode('utf-8'), partition=1)
    print(f'Sending msg: {msg}')
    result = future.get(timeout=60)

# Create a consumer subscribed to the topic to check its assigned partitions
consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
partitions = consumer.assignment()
print(partitions)

Output:

...
Sending msg: Kontext kafka msg: 100
Sending msg: Kontext kafka msg: 101
Sending msg: Kontext kafka msg: 102
{TopicPartition(topic='kontext-kafka', partition=0), TopicPartition(topic='kontext-kafka', partition=1), TopicPartition(topic='kontext-kafka', partition=2)}

Read from a specific partition

We can also read from a specific Kafka topic partition with a consumer.

Example code:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
# Read the specified partition
consumer.assign([TopicPartition(topic, 1)])
for msg in consumer:
    print(msg.value.decode("utf-8"))

Output:

Kontext kafka msg: 100
Kontext kafka msg: 101
......
Kontext kafka msg: 198
Kontext kafka msg: 199

The assign function assigns partition 1 of the topic to the consumer, so the printed messages all come from partition 1.
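Each record returned by the consumer iterator is a ConsumerRecord, which also exposes its partition and offset. This is handy for double-checking where messages come from; a small variation of the loop above:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
consumer.assign([TopicPartition(topic, 1)])
# Print the partition and offset of each record along with its value
for msg in consumer:
    print(f'partition={msg.partition}, offset={msg.offset}, '
          f'value={msg.value.decode("utf-8")}')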

Kafka partitioner

A Kafka partitioner decides which partition of a topic each message goes to. In the Kafka Java library, there are two partitioners implemented, named RoundRobinPartitioner and UniformStickyPartitioner. The Python library we are using provides a default partitioner, DefaultPartitioner, which hashes the message key using murmur2, a Python implementation of the Java method org.apache.kafka.common.utils.Utils.murmur2.
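As a quick sanity check, the default partitioner can be invoked directly. This is only a sketch: it assumes a kafka-python version that keeps DefaultPartitioner in kafka.partitioner.default, and [0, 1, 2] is just a sample partition list.

# Assumed import path; recent kafka-python releases keep the default partitioner here
from kafka.partitioner.default import DefaultPartitioner

all_partitions = [0, 1, 2]   # sample partition list for a 3-partition topic
partitioner = DefaultPartitioner()

# Same key -> same partition: the key bytes are hashed with murmur2
print(partitioner(b'some-key', all_partitions, all_partitions))
print(partitioner(b'some-key', all_partitions, all_partitions))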

We can also implement a custom partitioner.

Customize a partitioner

Let's create a custom partition function.

import hashlib
import random
def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to a key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """

    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)

    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]

This partitioner uses SHA-1 hashing instead of murmur2 to calculate the partition.

Note: Python's built-in hash function generates different values in different sessions, so it is not a good idea to use it directly if you want to ensure that messages with the same key are written into the same partition.

The following are some examples of message keys and their corresponding partitions for topic 'kontext-kafka'.

Key      Partition
'abc'    0
'bca'    0
'cba'    2
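These values can be reproduced by calling hash_partitioner directly, assuming the topic's three partitions are [0, 1, 2]:

# Reproduce the table above with the hash_partitioner function defined earlier
all_partitions = [0, 1, 2]
for key in (b'abc', b'bca', b'cba'):
    print(key, hash_partitioner(key, all_partitions, all_partitions))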

Produce messages using the custom partitioner

To produce messages using the custom partitioner, we need to ensure that each message has a key; otherwise the partition will be chosen randomly. The custom partitioner is passed through the partitioner parameter when constructing the producer client.

Example:

import random
import hashlib
from kafka import KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to a key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """

    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)

    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]

producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         partitioner=hash_partitioner)
# Generate 100 messages (numbered 200 to 299), all with the same key
for i in range(200, 300):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, value=msg.encode('utf-8'),
                           key='cba'.encode('utf8'))
    print(f'Sending msg: {msg}')
    result = future.get(timeout=60)

Output:

Sending msg: Kontext kafka msg: 200
Sending msg: Kontext kafka msg: 201
......
Sending msg: Kontext kafka msg: 299

All messages were written into partition 2.
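This can be verified by assigning partition 2 to a consumer, as in the earlier section. A sketch:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
consumer.assign([TopicPartition(topic, 2)])
# Messages keyed with 'cba' should all show up in this partition
for msg in consumer:
    print(msg.value.decode('utf-8'))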

Note: When creating a custom partitioner, try to distribute keys evenly across partitions to avoid skew. With skewed data, performance suffers in parallel distributed computing systems.
