Kafka Consumer - Fetch Records from Beginning


Kontext Kontext 0 2308 2.22 index 8/22/2022

Code description

This code snippet reads a topic from its beginning offset using the `seek_to_beginning()` API. The call sets every partition assigned to the consumer to fetch records from the earliest available offset.

Example output:

polling...
Kontext message 1
Kontext message 2
Kontext message 3
Kontext message 4
Kontext message 5
New kontext events~!!
New events!
polling...
polling...
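
Each message line in the output above is a record value decoded from bytes to text. As a minimal sketch of that step, the helper below flattens a `poll()` result (a dict that maps each partition to a list of records) into a list of strings. The names `decode_values`, `FakePartition`, and `FakeRecord` are hypothetical and not part of kafka-python; the two namedtuples only stand in for the library's partition and record objects so the helper can run without a broker.

```python
from collections import namedtuple

# Hypothetical stand-ins for kafka-python's TopicPartition and ConsumerRecord,
# used only so the helper can be exercised without a broker.
FakePartition = namedtuple('FakePartition', ['topic', 'partition'])
FakeRecord = namedtuple('FakeRecord', ['offset', 'value'])


def decode_values(records, encoding='utf-8'):
    """Flatten a poll()-style result into a list of decoded message strings."""
    messages = []
    for partition_records in records.values():
        for record in partition_records:
            # Skip records that carry no payload (value is None).
            if record.value is not None:
                messages.append(record.value.decode(encoding))
    return messages


sample = {
    FakePartition('kontext-events', 0): [
        FakeRecord(0, b'Kontext message 1'),
        FakeRecord(1, b'Kontext message 2'),
    ],
    FakePartition('kontext-events', 1): [FakeRecord(0, None)],
}
print(decode_values(sample))
```

Against a real consumer, the same helper could be called as `decode_values(consumer.poll(timeout_ms=1000))`, assuming no `value_deserializer` is configured so that values arrive as raw bytes.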

Code snippet

    from kafka import KafkaConsumer


    def main():
        """
        Read all records of a topic, starting from the earliest available offset.
        """
        topic = 'kontext-events'
        bootstrap_servers = 'localhost:9092'
        consumer = KafkaConsumer(
            topic,
            client_id='local-test',
            bootstrap_servers=bootstrap_servers,
            # auto_offset_reset='earliest'
        )

        # Look up the topic's partitions so metadata is loaded before seeking
        consumer.partitions_for_topic(topic)

        # Seek every assigned partition to its earliest available offset
        consumer.seek_to_beginning()

        while True:
            print('polling...')
            # poll() returns a dict that maps each partition to a list of records
            records = consumer.poll(timeout_ms=1000)
            for consumer_records in records.values():
                for consumer_record in consumer_records:
                    # Record values are raw bytes; decode them for printing
                    print(consumer_record.value.decode('utf-8'))


    if __name__ == '__main__':
        main()
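
`seek_to_beginning()` acts on partitions that are already assigned to the consumer. As a hedged variation on the snippet above (not the original author's approach), the sketch below assigns the partitions explicitly with `assign()` and then seeks each one, so the assignment step is visible in the code. The helper name `assign_and_rewind` and its `make_partition` parameter are hypothetical; `make_partition` is expected to be a callable such as `kafka.TopicPartition`. The broker address and topic name are the same assumptions as in the original snippet.

```python
def assign_and_rewind(consumer, topic, make_partition):
    """Assign every partition of `topic` to `consumer`, then seek to the start.

    `make_partition` is a callable such as kafka.TopicPartition (hypothetical
    parameter, introduced so the helper can be exercised without a broker).
    """
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        raise ValueError(f'no partitions found for topic {topic!r}')
    partitions = [make_partition(topic, p) for p in sorted(partition_ids)]
    # Manual assignment: the consumer reads exactly these partitions.
    consumer.assign(partitions)
    # Move the fetch position of each partition to its earliest offset.
    consumer.seek_to_beginning(*partitions)
    return partitions


def main():
    # Imported here so the helper above stays usable without kafka-python.
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor, because manual assignment via
    # assign() is not combined with topic subscription on the same consumer.
    consumer = KafkaConsumer(
        client_id='local-test',
        bootstrap_servers='localhost:9092',
    )
    assign_and_rewind(consumer, 'kontext-events', TopicPartition)
    for record in consumer:
        print(record.value.decode('utf-8'))
```

Calling `main()` requires a broker reachable at `localhost:9092`. The design choice here is to trade automatic partition assignment for an explicit list of partitions, which makes it clear which partitions are rewound.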
    
kafka python
