Kafka Topic Partitions Walkthrough via Python

By Raymond, 2020-09-06 (Streaming Analytics & Kafka)

A partition is the unit of parallelism in a Kafka cluster. Partitions are replicated across the brokers of the cluster for fault tolerance and throughput. This article shows how to work with Kafka partitions using Python.

[Figure: Kafka partitions (2020090602244-kafka-partitions.png)]

The kafka-python package is used in the following sections.

Find partitions of a topic

The function KafkaConsumer.partitions_for_topic (or KafkaProducer.partitions_for) retrieves the partition numbers of a topic.

Example:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')

partitions = consumer.partitions_for_topic(topic)
print(partitions)

Output:

{0}

In my Kafka cluster, there is only one partition for this topic with partition number 0.
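
As a small defensive sketch: to my understanding, partitions_for_topic may return None rather than an empty set when the topic's metadata is not available, so check this against your kafka-python version. The helper below (the name sorted_partition_ids is hypothetical, not part of the library) normalizes either result into a sorted list:

```python
def sorted_partition_ids(partitions):
    """Return partition numbers as a sorted list; treat None as 'no partitions'."""
    if not partitions:
        return []
    return sorted(partitions)


# These inputs stand in for results of consumer.partitions_for_topic(topic)
print(sorted_partition_ids({2, 0, 1}))  # [0, 1, 2]
print(sorted_partition_ids(None))       # []
```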

Find TopicPartition of a topic

The previous method returns a set of partition numbers. A TopicPartition can be constructed from the topic name and one of these partition numbers.

For example, the following code snippet creates a TopicPartition object using topic name and partition number:

first_topic_part = TopicPartition(topic, 0)
print(first_topic_part)

Example output:

TopicPartition(topic='kontext-kafka', partition=0)
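
The same construction can be applied to every partition number at once. The following is a minimal sketch: the helper name to_topic_partitions is hypothetical, and the namedtuple fallback is only a stand-in with the same two fields so that the snippet also runs where kafka-python is not installed.

```python
from collections import namedtuple

try:
    from kafka.structs import TopicPartition
except ImportError:
    # Stand-in for illustration only: same field names as the library's type
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def to_topic_partitions(topic, partition_ids):
    """Build one TopicPartition per partition number, ordered by partition."""
    return [TopicPartition(topic, p) for p in sorted(partition_ids)]


# The set literal stands in for the result of partitions_for_topic
topic_parts = to_topic_partitions("kontext-kafka", {2, 0, 1})
for tp in topic_parts:
    print(tp.topic, tp.partition)
```

A list built this way can be passed to consumer.assign, which is used later in this article.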

To retrieve the partitions currently assigned to the consumer, use the assignment function.

partitions = consumer.assignment()
print(partitions)

This function returns a set of TopicPartition instances:

{TopicPartition(topic='kontext-kafka', partition=0)}

Create new partitions for a topic

The KafkaAdminClient class can be used to manage the partitions of a topic.

Its create_partitions function has the following signature:

create_partitions(topic_partitions, timeout_ms=None, validate_only=False)

Example:

from kafka import KafkaAdminClient
from kafka.admin import NewPartitions

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic_partitions = {}
topic_partitions[topic] = NewPartitions(total_count=3)
admin_client.create_partitions(topic_partitions)

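The above code increases the total number of partitions of topic 'kontext-kafka' to 3. Kafka only allows the partition count of a topic to be increased, not decreased.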
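
Because total_count is the new total rather than the number of partitions to add, it can help to validate the request before sending it. The sketch below assumes you already know the current partition count (for example from partitions_for_topic); the helper names validate_new_total and grow_topic are hypothetical.

```python
def validate_new_total(current_count, desired_total):
    """Return how many partitions would be added; raise if it is not an increase."""
    if desired_total <= current_count:
        raise ValueError(
            f"desired_total ({desired_total}) must exceed "
            f"the current count ({current_count})"
        )
    return desired_total - current_count


def grow_topic(admin_client, topic, current_count, desired_total):
    """Validate the request, then ask the cluster to add partitions."""
    # Imported here so the validation helper works without kafka-python installed
    from kafka.admin import NewPartitions

    validate_new_total(current_count, desired_total)
    admin_client.create_partitions(
        {topic: NewPartitions(total_count=desired_total)})


print(validate_new_total(1, 3))  # 2 partitions would be added
```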

Write message to a partition

Producer clients use the send function to write messages into a Kafka cluster. The signature of this function is as follows:

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

As you can see, partition can be specified when calling this function.

Example:

from kafka import KafkaConsumer, KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)

# Generate 100 messages (numbered 100 to 199) and write them to partition 1
for i in range(100, 200):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, msg.encode('utf-8'), partition=1)
    print(f'Sending msg: {msg}')
    # Block until the broker acknowledges the write
    result = future.get(timeout=60)

# Poll once to give the consumer a chance to receive its assignment
consumer.poll(timeout_ms=1000)
partitions = consumer.assignment()
print(partitions)

Output:

...
Sending msg: Kontext kafka msg: 100
Sending msg: Kontext kafka msg: 101
Sending msg: Kontext kafka msg: 102
{TopicPartition(topic='kontext-kafka', partition=0), TopicPartition(topic='kontext-kafka', partition=1), TopicPartition(topic='kontext-kafka', partition=2)}
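
The value returned by future.get describes where the message was written, which is a convenient way to confirm the target partition. The following sketch assumes the returned metadata exposes topic, partition and offset attributes; the helper format_metadata and the FakeMetadata stand-in are hypothetical and exist only so the snippet runs without a broker.

```python
from collections import namedtuple


def format_metadata(metadata):
    """Render a send result as topic[partition]@offset."""
    return f"{metadata.topic}[{metadata.partition}]@{metadata.offset}"


# Stand-in for the object returned by future.get(timeout=60)
FakeMetadata = namedtuple("FakeMetadata", ["topic", "partition", "offset"])
summary = format_metadata(FakeMetadata("kontext-kafka", 1, 42))
print(summary)  # kontext-kafka[1]@42
```

In the producer loop above, the equivalent call would be print(format_metadata(result)).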

Read from a specific partition

A consumer can also read from a specific partition of a Kafka topic.

Example code:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
# Read the specified partition
consumer.assign([TopicPartition(topic, 1)])
for msg in consumer:
    print(msg.value.decode("utf-8"))

Output:

Kontext kafka msg: 100
Kontext kafka msg: 101
......
Kontext kafka msg: 198
Kontext kafka msg: 199

The assign function assigns partition 1 of the topic to the consumer, so all printed messages come from partition 1.
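
Note that iterating over a consumer keeps waiting for new messages unless a consumer_timeout_ms value is set when constructing it. As a rough sketch, a bounded read can be written with itertools.islice; the helper read_values is hypothetical, and the FakeRecord iterator stands in for a real consumer so the snippet runs without a broker.

```python
from collections import namedtuple
from itertools import islice


def read_values(consumer, limit):
    """Decode at most `limit` message values from an iterator of records."""
    return [record.value.decode("utf-8") for record in islice(consumer, limit)]


# Stand-in for a KafkaConsumer that has been assigned partition 1
FakeRecord = namedtuple("FakeRecord", ["value"])
fake_consumer = iter(
    FakeRecord(f"Kontext kafka msg: {i}".encode("utf-8"))
    for i in range(100, 200)
)

first_three = read_values(fake_consumer, 3)
print(first_three)
```

With a real consumer, islice still blocks until the requested number of messages has arrived.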

Kafka partitioner

A Kafka partitioner decides which partition of a topic a message goes to. In the Kafka Java library, two partitioners named RoundRobinPartitioner and UniformStickyPartitioner are implemented. The Python library used here provides a default partitioner, DefaultPartitioner. It hashes the message key with murmur2, a Python port of the Java method org.apache.kafka.common.utils.Utils.murmur2.

We can also implement a customized partitioner. 

Customize a partitioner

Let's create a user-defined partitioning function.

import hashlib
import random  # the module itself is needed for random.choice below

def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """

    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)

    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]

This partitioner uses a SHA-1 hash (hashlib.sha1) instead of murmur2 to calculate the partition.

Note: Python's built-in hash function generates different values for the same string or bytes key in different sessions, so it is not a good idea to use it directly if you want messages with the same key to be written to the same partition.

The following are some example message keys and their corresponding partitions for topic 'kontext-kafka'.

Key      Partition
'abc'    0
'bca'    0
'cba'    2
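
A mapping like this can be checked offline, without a broker, by applying the same SHA-1 logic to a list of partition numbers. The sketch below assumes the topic has partitions 0, 1 and 2; the function name sha1_partition is hypothetical, and the values printed depend on the partition list passed in.

```python
import hashlib


def sha1_partition(key, all_partitions):
    """Map a bytes key to a partition using the SHA-1 based logic from this article."""
    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx %= len(all_partitions)
    return all_partitions[idx]


all_partitions = [0, 1, 2]  # assumed partition list for the topic
for key in (b"abc", b"bca", b"cba"):
    print(key, sha1_partition(key, all_partitions))

# A digest-based mapping is stable: the same key always gives the same partition
stable = sha1_partition(b"cba", all_partitions) == sha1_partition(b"cba", all_partitions)
print(stable)  # True
```

Keep in mind that the result also depends on len(all_partitions), so adding partitions to a topic can move existing keys to a different partition.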

Produce message using custom partitioner

To produce messages using the custom partitioner, make sure each message has a key; otherwise the partition is chosen randomly. The custom partitioner is passed through the partitioner parameter when constructing the producer client.

Example:

import random
import hashlib
from kafka import KafkaProducer

topic = 'kontext-kafka'
bootstrap_servers = 'localhost:9092'

def hash_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to key
    :param key: partitioning key
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """

    if key is None:
        if available:
            return random.choice(available)
        return random.choice(all_partitions)

    idx = int(hashlib.sha1(key).hexdigest(), 16) % (10 ** 8)
    idx &= 0x7fffffff
    idx %= len(all_partitions)
    return all_partitions[idx]

producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         partitioner=hash_partitioner)
# Generate 100 messages (numbered 200 to 299), all with the key 'cba'
for i in range(200, 300):
    msg = f'Kontext kafka msg: {i}'
    future = producer.send(topic, value=msg.encode('utf-8'),
                           key='cba'.encode('utf8'))
    print(f'Sending msg: {msg}')
    result = future.get(timeout=60)

Output:

Sending msg: Kontext kafka msg: 200
Sending msg: Kontext kafka msg: 201
......
Sending msg: Kontext kafka msg: 299

All messages were written into partition 2.

Note: When creating a custom partitioner, try to distribute the keys evenly across partitions to avoid skew. With skewed data, parallel distributed systems perform poorly because a few partitions carry most of the load.
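
One way to check for skew before deploying a partitioner is to run a sample of keys through it and count how many land in each partition. The following is a minimal sketch under stated assumptions: the key format user-N is made up for illustration, the helper names are hypothetical, and a plain SHA-1 modulo is used as the partitioning rule.

```python
import hashlib
from collections import Counter


def partition_index(key, num_partitions):
    """Map a bytes key to a partition index with a SHA-1 digest."""
    return int(hashlib.sha1(key).hexdigest(), 16) % num_partitions


def partition_counts(keys, num_partitions):
    """Count how many of the given keys fall into each partition."""
    counts = Counter(partition_index(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical sample of 3000 distinct keys spread over 3 partitions
keys = [f"user-{i}".encode("utf-8") for i in range(3000)]
counts = partition_counts(keys, 3)

# Ratio of the busiest partition to the average; 1.0 means perfectly even
skew = max(counts) / (sum(counts) / len(counts))
print(counts, round(skew, 3))
```

A ratio well above 1.0 suggests that some keys or hash buckets dominate and the partitioning rule should be revisited.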
