
Retrieve Kafka Messages (Records) via Timestamp

Published 2022-08-22

In most scenarios, Kafka consumers read records from partitions via offsets - integers that indicate the position of the next record for the consumer to read. Retrieving records by timestamp is generally not recommended; however, it can sometimes be useful - for example, to find all events that occurred after a certain time. This article shows you how to retrieve Kafka records by timestamp using the Python library kafka-python.

About timestamp in Kafka records

From Kafka 0.10.0 onwards, Kafka messages/records automatically include a timestamp. Based on the Kafka configuration, the timestamp can be either event-time or ingestion-time:

  • event-time: the time when the event occurred in the source system. 
  • ingestion-time: the time when the record/message is appended to a partition by a Kafka broker. 

Which type is used is controlled by the broker configuration log.message.timestamp.type (or the topic-level message.timestamp.type), with values CreateTime (event-time) or LogAppendTime (ingestion-time). In Kafka Streams, the timestamp used for processing is extracted from each record via the TimestampExtractor interface.

Because timestamps were only introduced in version 0.10.0, this article applies to Kafka 0.10.0 or above only.
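As an illustration, the timestamp type can be configured at the broker level in server.properties (CreateTime is the default):

```properties
# Broker-level default timestamp type for all topics:
# CreateTime = event-time, LogAppendTime = ingestion-time
log.message.timestamp.type=CreateTime
```

It can also be overridden per topic via the message.timestamp.type topic configuration.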

Find offsets via timestamp

To retrieve records from a certain timestamp, the slowest approach is to iterate through all the messages in a partition and compare timestamps to find the earliest matching offset. Fortunately, KafkaConsumer provides a built-in function that does this lookup for us:

offsets_for_times(timestamps)

It looks up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

The timestamps parameter is a dictionary of type {TopicPartition: int}. The integer is similar to a UNIX timestamp, though the unit is milliseconds instead of seconds: we need to specify the milliseconds since the beginning of the epoch (midnight of 1 Jan 1970, UTC).
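To make the semantics concrete, here is a minimal, broker-free sketch (plain Python with made-up data, not the kafka-python implementation) of what "earliest offset whose timestamp is greater than or equal to the given timestamp" means:

```python
# Hypothetical partition log: (offset, timestamp in epoch milliseconds)
log = [
    (0, 1661170000000),
    (1, 1661170005000),
    (2, 1661170007000),
    (3, 1661170009490),
    (4, 1661170012000),
]

def offset_for_time(log, target_ms):
    """Return the earliest offset whose timestamp >= target_ms, or None."""
    for offset, ts in log:
        if ts >= target_ms:
            return offset
    return None  # no record at or after the target timestamp

print(offset_for_time(log, 1661170008000))  # -> 3
print(offset_for_time(log, 1661170099000))  # -> None
```

Note that the lookup returns None when every record in the partition is older than the target timestamp - the real API behaves the same way, which is why the result should be checked before seeking.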

Convert timestamp to milliseconds in Python

In Python, there are different approaches to convert a timestamp to milliseconds. Refer to the following code snippet for examples:

Convert Timestamp to Milliseconds since Epoch in Python
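As a quick sketch, two common standard-library approaches (the example date below is arbitrary):

```python
from datetime import datetime, timezone
import time

# Approach 1: from a datetime object (timezone-aware, to avoid ambiguity)
dt = datetime(2022, 8, 22, 21, 43, 15, tzinfo=timezone.utc)
ms_from_datetime = int(dt.timestamp() * 1000)
print(ms_from_datetime)  # -> 1661204595000

# Approach 2: current time directly from time.time() (seconds since epoch)
ms_now = int(time.time() * 1000)
print(ms_now)
```

Casting to int matters here: offsets_for_times expects whole milliseconds, and datetime.timestamp() returns a float.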

One thing to notice

Before we start to use the offsets_for_times API, one thing to remember is that this function is a blocking call: it blocks until the offsets are fetched from the brokers or until the request times out (controlled by the request_timeout_ms consumer configuration, which raises an error on expiry). It also raises an error if the broker does not support the lookup (Kafka versions before 0.10.0).

Example

Now let's implement a Python consumer script to look up the offsets via the offsets_for_times function and then retrieve records from that offset.

To test the timestamp, I've manually produced some messages into the Kafka topic named kontext-events in my local Kafka cluster. This cluster was created following the tutorial Install and Run Kafka 3.2.0 On WSL.

[Screenshot: messages produced into the kontext-events topic]

The following is the code snippet for reference.

from datetime import datetime, timedelta
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Topic name
topic = 'kontext-events'
# Bootstrap servers - you can input multiple ones, comma separated,
# for example: bs1:9092,bs2:9092
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)

# We only retrieve from partition 0 for this example
# as there is only one partition created for the topic.
# To find out all partitions, partitions_for_topic can be used.
topic_partition_0 = TopicPartition(topic, 0)
# Assign the partition manually so that seek() can be called below;
# with a group subscription, seeking an unassigned partition fails.
consumer.assign([topic_partition_0])

# Timestamp to look for events:
# look for events created from 60 minutes ago.
ts = datetime.now() - timedelta(minutes=60)
# Convert to epoch milliseconds (offsets_for_times expects an integer)
ts_milliseconds = int(ts.timestamp() * 1000)

print(f'Looking for offsets of : {ts} ({ts_milliseconds})')

timestamps = {topic_partition_0: ts_milliseconds}
# Blocking call; returns {TopicPartition: OffsetAndTimestamp or None}
offsets = consumer.offsets_for_times(timestamps)

print(offsets)

offset_p0 = offsets[topic_partition_0]

print(offset_p0)

# None means no record has a timestamp >= the requested one.
if offset_p0 is None:
    raise SystemExit('No records found at or after the given timestamp.')

consumer.seek(partition=topic_partition_0, offset=offset_p0.offset)

while True:
    print('polling...')
    records = consumer.poll(timeout_ms=1000)
    for topic_data, consumer_records in records.items():
        for consumer_record in consumer_records:
            print(consumer_record.value.decode('utf-8'))

The code is simple:

  • It first creates a consumer for partition 0 of the specified topic.
  • It then finds the offset in partition 0 whose timestamp is equal to or greater than the specified timestamp (60 minutes before the code runs).
  • Finally, it seeks to the returned offset and polls records from there. 

The output looks like the following:

Looking for offsets of : 2022-08-22 21:43:15.202923 (1661168595202)
{TopicPartition(topic='kontext-events', partition=0): OffsetAndTimestamp(offset=3, timestamp=1661170009490)}
OffsetAndTimestamp(offset=3, timestamp=1661170009490)
polling...
Kontext message 1
Kontext message 2
Kontext message 3
Kontext message 4
Kontext message 5
polling...
polling...
polling...

Summary

As you can see, it is very easy to find events from a certain point in time. In fact, Kafka Streams also relies on timestamps to do joins; we will discuss that in another article. If you have any questions, feel free to post a comment.
