Kafka Topic Partitions Walkthrough via Python


A partition is the unit of parallelism in a Kafka cluster. Partitions are replicated across the brokers of the cluster for fault tolerance and throughput. This article shows you how to work with Kafka partitions using Python.

The kafka-python package is used in the following sections.

Find partitions of a topic

The function KafkaConsumer.partitions_for_topic (or KafkaProducer.partitions_for) can be used to retrieve the partitions of a topic.

Example:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

partitions = consumer.partitions_for_topic(topic)
print(partitions)

Output:

{0}

In my Kafka cluster, there is only one partition for this topic with partition number 0.

Find TopicPartition of a topic

The previous method returns a set of partition numbers. A TopicPartition can be constructed from the topic name and one of these partition numbers.

For example, the following code snippet creates a TopicPartition object using topic name and partition number:

first_topic_part = TopicPartition(topic, 0)
print(first_topic_part)

Example output:

TopicPartition(topic='kontext-kafka', partition=0)

To retrieve the partitions currently assigned to the consumer, use the assignment function.

partitions = consumer.assignment()
print(partitions)

This function returns a set of TopicPartition instances:

{TopicPartition(topic='kontext-kafka', partition=0)}
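
The two steps above can be combined: take the partition numbers returned by partitions_for_topic and turn them into TopicPartition objects. The following is a minimal sketch. The helper name all_topic_partitions is hypothetical, and the namedtuple fallback is only an assumption that lets the snippet run when kafka-python is not installed.

```python
from collections import namedtuple

try:
    from kafka.structs import TopicPartition
except ImportError:
    # Stand-in with the same two fields, used only when kafka-python is missing
    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])


def all_topic_partitions(topic, partition_ids):
    """Build one TopicPartition per partition number, sorted by partition ID."""
    return [TopicPartition(topic, p) for p in sorted(partition_ids)]


# partition_ids would normally come from consumer.partitions_for_topic(topic)
for tp in all_topic_partitions('kontext-kafka', {2, 0, 1}):
    print(tp)
```

The resulting list can be passed to functions that expect TopicPartition objects, such as the consumer's assign function.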

Create new partitions for a topic

The KafkaAdminClient class can be used to manage the partitions of a topic.

Its create_partitions function has the following signature:

create_partitions(topic_partitions, timeout_ms=None, validate_only=False)

Example:

from kafka import KafkaAdminClient
from kafka.admin import NewPartitions

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic_partitions = {}
topic_partitions[topic] = NewPartitions(total_count=3)
admin_client.create_partitions(topic_partitions)

The above code increases the number of partitions of topic 'kontext-kafka' to 3.
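
Kafka only allows the partition count of a topic to grow, so it can help to check the requested count before calling the admin client. The following is a rough sketch under that assumption. The helper names plan_partition_increase and apply_plan are hypothetical, and the current partition count is assumed to be known already (for example from partitions_for_topic).

```python
def plan_partition_increase(topic, current_count, target_count):
    """Return a {topic: total_count} plan, rejecting anything that is not an increase."""
    if target_count <= current_count:
        raise ValueError(
            f'{topic} already has {current_count} partitions; '
            f'cannot change to {target_count}')
    return {topic: target_count}


def apply_plan(admin_client, plan):
    """Send the plan to the cluster through a KafkaAdminClient."""
    # Imported here so that the planning helper works without kafka-python installed
    from kafka.admin import NewPartitions
    admin_client.create_partitions(
        {topic: NewPartitions(total_count=count) for topic, count in plan.items()})


print(plan_partition_increase('kontext-kafka', 1, 3))
```

Passing validate_only=True to create_partitions is another option for checking a request without applying it.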

Write message to a partition

Producer clients use the send function to write messages into a Kafka cluster. The signature of this function is as follows:

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

As you can see, partition can be specified when calling this function.

Example:

from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

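# Generate 100 messages and write all of them to partition 1
for i in range(100, 200):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, msg.encode('utf-8'), partition=1)
    print(f'Sending msg: {msg}')
    # Block until the send is acknowledged or the timeout expires
    result = future.get(timeout=60)

# Create a consumer subscribed to the topic, as in the first example
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
partitions = consumer.assignment()
print(partitions)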

Output:

...
Sending msg: Kontext kafka msg: 100
Sending msg: Kontext kafka msg: 101
Sending msg: Kontext kafka msg: 102
{TopicPartition(topic='kontext-kafka', partition=0), TopicPartition(topic='kontext-kafka', partition=1), TopicPartition(topic='kontext-kafka', partition=2)}
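
The example above waits for each message to be acknowledged before sending the next one. One possible alternative is to send all messages first and collect the results afterwards. The following is a sketch only: send_batch is a hypothetical helper, and producer is assumed to be a KafkaProducer (or any object with the same send and flush functions).

```python
def send_batch(producer, topic, values, partition, timeout=60):
    """Send all values to one partition, then wait for the acknowledgements together."""
    futures = [
        producer.send(topic, value=v.encode('utf-8'), partition=partition)
        for v in values
    ]
    # flush() blocks until the buffered messages have been sent
    producer.flush()
    # Each get() returns the record metadata, or raises if the send failed
    return [f.get(timeout=timeout) for f in futures]
```

With a real producer, each returned item is the record metadata of one message, which includes the partition it was written to.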

Read from a specific partition

A consumer can also read from one specific partition of a Kafka topic.

Example code:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
# Read the specified partition
consumer.assign([TopicPartition(topic, 1)])
for msg in consumer:
    print(msg.value.decode("utf-8"))

Output:

Kontext kafka msg: 100
Kontext kafka msg: 101
......
Kontext kafka msg: 198
Kontext kafka msg: 199

The assign function assigns partition 1 of the topic to the consumer, so all of the printed messages come from partition 1.
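
Note that the for loop above keeps waiting for new messages, because by default the consumer iterator does not time out. One way to stop after a fixed number of messages is itertools.islice. The following is a minimal sketch: read_values is a hypothetical helper, and it assumes that every message value is UTF-8 encoded text.

```python
from itertools import islice


def read_values(consumer, max_messages):
    """Decode at most max_messages message values from an iterable consumer."""
    return [msg.value.decode('utf-8') for msg in islice(consumer, max_messages)]
```

For example, read_values(consumer, 10) returns the first 10 decoded values from the assigned partition. Setting consumer_timeout_ms when constructing the KafkaConsumer is another way to end the iteration when no more messages arrive.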

Kafka partitioner

A Kafka partitioner decides which partition of a topic a message goes to. The Kafka Java library includes partitioner implementations such as RoundRobinPartitioner and UniformStickyPartitioner. The Python library we are using provides a default partitioner named DefaultPartitioner. It hashes the message key with murmur2, a Python implementation of the Java method org.apache.kafka.common.utils.Utils.murmur2.

We can also implement a customized partitioner. 

Customize a partitioner

Let's create a user-defined partitioner function.

import hashlib
import random


def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """

    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)

    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]

This partitioner uses a SHA-1 hash (from hashlib) instead of murmur2 to calculate the partition.

Note: Python's built-in hash function can generate a different value for the same key in different sessions, so it is not a good idea to use it directly if you want to ensure that messages with the same key are written into the same partition.
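
As a small illustration of the point about hash: for str and bytes values the built-in hash is salted per process by default (controlled by the PYTHONHASHSEED environment variable), while a hashlib digest depends only on the input bytes. The following is a minimal sketch.

```python
import hashlib

key = b'abc'
digest = hashlib.sha1(key).hexdigest()
# The SHA-1 digest of b'abc' is the same in every Python session
print(digest)  # a9993e364706816aba3e25717850c26c9cd0d89d
```

Because the digest is stable, a partition number derived from it is stable as well, as long as the number of partitions does not change.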

The following are some example message keys and their corresponding partitions for topic 'kontext-kafka'.

Key      Partition
'abc'    0
'bca'    0
'cba'    2

Produce message using custom partitioner

To produce messages using the custom partitioner, we need to ensure that each message has a key; otherwise the partition is chosen randomly. A customized partitioner can be passed through the partitioner parameter when constructing the producer client.

Example:

import random
import hashlib
from kafka import KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """

    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)

    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]

producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         partitioner=hash_partitioner)
# Generate 100 messages that all share the key 'cba'
for i in range(200, 300):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, value=msg.encode('utf-8'),
                           key='cba'.encode('utf8'))
    print(f'Sending msg: {msg}')
    result = future.get(timeout=60)

Output:

Sending msg: Kontext kafka msg: 200
Sending msg: Kontext kafka msg: 201
......
Sending msg: Kontext kafka msg: 299

All messages were written into partition 2.

Note: When creating a customized partitioner, try to distribute the keys evenly across partitions to avoid skew. With skewed data, some partitions receive far more messages than others, which hurts performance in parallel distributed computing systems.
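
One way to check for skew before going to production is to run a sample of keys through the partitioning logic and count how many land in each partition. The following is a rough sketch. The helper names and the sample keys are hypothetical, and the arithmetic mirrors the SHA-1 based hash_partitioner above for keys that are not None.

```python
import hashlib
from collections import Counter


def sha1_partition(key, num_partitions):
    """Map a bytes key to a partition index with the same SHA-1 arithmetic as hash_partitioner."""
    # The 0x7fffffff mask from hash_partitioner is omitted: it has no effect on values below 10 ** 8
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    return idx % num_partitions


def partition_counts(keys, num_partitions):
    """Count how many of the given keys land in each partition."""
    counts = Counter({p: 0 for p in range(num_partitions)})
    counts.update(sha1_partition(k, num_partitions) for k in keys)
    return counts


# Hypothetical sample keys, for illustration only
keys = [f'user-{i}'.encode('utf-8') for i in range(1000)]
counts = partition_counts(keys, 3)
for partition, count in sorted(counts.items()):
    print(partition, count)
```

If one partition receives far more keys than the others, the key design or the partitioner is worth revisiting.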
