Retrieve Kafka Messages (Records) via Timestamp
In most scenarios, Kafka consumers read records in partitions via offset, an integer that indicates the position of the next record the consumer will read. Retrieving records via timestamp is generally not recommended. However, it is sometimes useful to retrieve records based on a certain timestamp, for example to find all the events that occurred after a certain time. This article shows you how to retrieve Kafka records via timestamp using the Python library kafka-python.
About timestamp in Kafka records
From Kafka 0.10.0, Kafka messages/records automatically include timestamps. Based on the Kafka configuration, the timestamp can be either event-time or ingestion-time:
- event-time: the time when the event occurred in the source system.
- ingestion-time: the time when the records/messages get added into a partition via a Kafka broker.
In Kafka Streams, the timestamp of a record is obtained through the TimestampExtractor interface.
Because record timestamps were only introduced in that release, this article applies to Kafka 0.10.0 or above only.
Find offsets via timestamp
To retrieve records from a certain timestamp, the slowest approach is to iterate through all the messages in the partition and find the earliest offset whose timestamp matches. KafkaConsumer provides a built-in function that does this lookup for us:
offsets_for_times(timestamps)
It looks up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
The timestamps parameter is a dictionary of type {TopicPartition: int}. The integer is similar to a UNIX timestamp, though the unit is milliseconds instead of seconds: we need to specify the milliseconds since the beginning of the epoch (midnight of 1st Jan 1970 UTC).
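To make the "earliest offset whose timestamp is greater than or equal to the given timestamp" rule concrete, here is a minimal pure-Python sketch that mimics it over an in-memory list. The `records` list and the `earliest_offset_at_or_after` helper are hypothetical names used only for illustration; the real lookup is performed by the broker, not by scanning records on the client.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-in for one partition: (offset, timestamp_ms) pairs.
records: List[Tuple[int, int]] = [
    (0, 1_000),
    (1, 2_000),
    (2, 2_000),
    (3, 3_500),
    (4, 5_000),
]


def earliest_offset_at_or_after(
    partition_records: List[Tuple[int, int]], target_ms: int
) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target_ms, or None."""
    for offset, timestamp_ms in partition_records:
        if timestamp_ms >= target_ms:
            return offset
    return None


print(earliest_offset_at_or_after(records, 2_000))  # 1
print(earliest_offset_at_or_after(records, 2_001))  # 3
print(earliest_offset_at_or_after(records, 9_999))  # None
```

The last call shows the case where no record is new enough. It is worth checking the result for a missing offset before seeking, since a lookup may not find a match for every partition.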
Convert timestamp to milliseconds in Python
In Python, there are different approaches to convert a timestamp to milliseconds. Refer to the following article for examples:
Convert Timestamp to Milliseconds since Epoch in Python
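As a quick illustration, one possible approach uses only the standard library. The helper name `to_epoch_millis` is my own and is just a sketch. Note that `datetime.timestamp()` treats naive datetimes as local time, so the example uses a timezone-aware value to keep the result unambiguous.

```python
from datetime import datetime, timezone


def to_epoch_millis(dt: datetime) -> int:
    """Convert a datetime to whole milliseconds since the UNIX epoch.

    Naive datetimes are interpreted as local time by datetime.timestamp().
    """
    return int(dt.timestamp() * 1000)


# Timezone-aware example: 22 Aug 2022 12:00:00 UTC
dt_utc = datetime(2022, 8, 22, 12, 0, 0, tzinfo=timezone.utc)
print(to_epoch_millis(dt_utc))  # 1661169600000
```

Returning an int matches the {TopicPartition: int} type expected by offsets_for_times.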
One thing to notice
Before we start to use the offsets_for_times API, one thing to remember is that this function is a blocking call.
Example
Now let's implement a consumer Python script that looks up the offsets via the offsets_for_times function and then retrieves records using the returned offset.
To test the timestamp, I've manually produced some messages into the Kafka topic named kontext-events in my local Kafka cluster. This cluster was created following the tutorial Install and Run Kafka 3.2.0 On WSL.
The following is the code snippet for reference.
```python
from datetime import datetime, timedelta

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# Topic name
topic = 'kontext-events'
# Bootstrap servers - you can input multiple ones, comma separated,
# for example: bs1:9092, bs2:9092
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
    topic, bootstrap_servers=bootstrap_servers)

# Timestamp to look for events
# Look for events created from 60 minutes ago
ts = datetime.now() - timedelta(minutes=60)
# Convert to epoch milliseconds
ts_milliseconds = ts.timestamp() * 1000.0
print(f'Looking for offsets of : {ts} ({ts_milliseconds})')

# We only retrieve from partition 0 for this example
# as there is only one partition created for the topic.
# To find out all partitions, partitions_for_topic can be used.
topic_partition_0 = TopicPartition(topic, 0)
# The lookup expects integer milliseconds, so drop the fractional part
timestamps = {topic_partition_0: int(ts_milliseconds)}
offsets = consumer.offsets_for_times(timestamps)
print(offsets)

offset_p0 = offsets[topic_partition_0]
print(offset_p0)
if offset_p0 is None:
    # No record at or after the requested timestamp in this partition
    raise SystemExit('No records found from the given timestamp.')

# Move the consumer position to the offset that was found
consumer.seek(partition=topic_partition_0, offset=offset_p0.offset)

while True:
    print('polling...')
    records = consumer.poll(timeout_ms=1000)
    for topic_data, consumer_records in records.items():
        for consumer_record in consumer_records:
            print(str(consumer_record.value.decode('utf-8')))
```
The code is simple:
- It first creates a consumer that subscribes to the specified topic.
- It then finds the earliest offset in partition 0 whose timestamp is equal to or greater than the specified timestamp (60 minutes before the code runs).
- It then uses the returned offset to seek to that position in the consumer and polls for records from there.
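The example above only looks at partition 0. As a hedged sketch of how the same lookup could be extended to every partition of a topic, the hypothetical helper below (`offsets_for_all_partitions` is my own name, not part of kafka-python) combines partitions_for_topic and offsets_for_times. The try/except fallback for TopicPartition is an assumption added only so the snippet can run without kafka-python installed.

```python
from collections import namedtuple

try:
    from kafka.structs import TopicPartition
except ImportError:
    # Fallback so the sketch runs without kafka-python installed;
    # it mirrors the (topic, partition) shape of the library's TopicPartition.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def offsets_for_all_partitions(consumer, topic, ts_milliseconds):
    """Look up the starting offset for every partition of `topic`.

    Returns {TopicPartition: offset} and skips partitions for which
    the lookup returned None (no record at or after the timestamp).
    """
    partition_ids = consumer.partitions_for_topic(topic) or set()
    timestamps = {
        TopicPartition(topic, p): int(ts_milliseconds)
        for p in sorted(partition_ids)
    }
    if not timestamps:
        return {}
    found = consumer.offsets_for_times(timestamps)
    return {
        tp: offset_and_ts.offset
        for tp, offset_and_ts in found.items()
        if offset_and_ts is not None
    }
```

Each returned pair could then be passed to consumer.seek(tp, offset), provided those partitions are currently assigned to the consumer.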
The output looks like the following:
```
Looking for offsets of : 2022-08-22 21:43:15.202923 (1661168595202.923)
{TopicPartition(topic='kontext-events', partition=0): OffsetAndTimestamp(offset=3, timestamp=1661170009490)}
OffsetAndTimestamp(offset=3, timestamp=1661170009490)
polling...
Kontext message 1
Kontext message 2
Kontext message 3
Kontext message 4
Kontext message 5
polling...
polling...
polling...
```
Summary
As you can see, it is very easy to find events from a certain point in time. In fact, Kafka Streams also relies on timestamps to do joins. We will discuss that in another article. If you have any questions, feel free to post a comment.