Kafka Consumer - Fetch Records from Beginning

This code snippet reads data from the beginning of a topic by using the seek_to_beginning() API. The call resets every partition currently assigned to the consumer so that records are fetched from the earliest available offset.

Example output:

polling...
Kontext message 1
Kontext message 2
Kontext message 3
Kontext message 4
Kontext message 5
New kontext events~!!
New events!
polling...
polling...

Code snippet

from kafka import KafkaConsumer


def main():
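    """
    Read every record in a Kafka topic, starting from the earliest
    available offset, then keep polling for new records.
    """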
    topic = 'kontext-events'
    bootstrap_servers = 'localhost:9092'
    consumer = KafkaConsumer(
        topic,
        client_id='local-test',
        bootstrap_servers=bootstrap_servers,
        # auto_offset_reset='earliest'
    )

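    # Look up the topic's partitions first; this makes the consumer fetch
    # the topic metadata before seeking.
    consumer.partitions_for_topic(topic)

    # Seek every assigned partition to its earliest available offset
    consumer.seek_to_beginning()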

    while True:
        print('polling...')
        records = consumer.poll(timeout_ms=1000)
        # poll() returns a dict mapping each partition to its list of records
        for consumer_records in records.values():
            for consumer_record in consumer_records:
                # Record values arrive as bytes; decode them before printing
                print(consumer_record.value.decode('utf-8'))


if __name__ == '__main__':
    main()
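
The snippet above relies on partitions already being assigned when seek_to_beginning() is called, and its loop never exits. As a rough sketch of one alternative, the variant below assigns the partitions explicitly with assign() and stops after a few consecutive empty polls. The helper names rewind_topic and drain, and the parameters make_partition and max_idle_polls, are made up for this illustration and are not part of kafka-python. make_partition is assumed to be kafka.TopicPartition, passed in so the helpers can also be tried with a stub consumer.

```python
def rewind_topic(consumer, topic, make_partition):
    """Assign all partitions of `topic` and seek each one to its earliest offset.

    `make_partition` is assumed to be kafka.TopicPartition (hypothetical
    parameter, introduced only for this sketch).
    """
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        raise ValueError(f'no partitions found for topic {topic!r}')
    partitions = [make_partition(topic, p) for p in sorted(partition_ids)]
    # Manual assignment, so the seek below has concrete partitions to act on
    consumer.assign(partitions)
    consumer.seek_to_beginning(*partitions)
    return partitions


def drain(consumer, max_idle_polls=3, timeout_ms=1000):
    """Yield decoded record values until `max_idle_polls` polls in a row are empty."""
    idle = 0
    while idle < max_idle_polls:
        records = consumer.poll(timeout_ms=timeout_ms)
        if not records:
            idle += 1
            continue
        idle = 0
        for consumer_records in records.values():
            for record in consumer_records:
                yield record.value.decode('utf-8')


def main():
    # Imported here so the helpers above do not require kafka-python themselves
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor: partitions are assigned manually
    consumer = KafkaConsumer(
        client_id='local-test',
        bootstrap_servers='localhost:9092',
    )
    rewind_topic(consumer, 'kontext-events', TopicPartition)
    for message in drain(consumer):
        print(message)
    consumer.close()
```

Calling main() with a broker listening on localhost:9092 should print the existing messages of kontext-events and return once three polls in a row come back empty. A quiet topic is not necessarily a finished one, so the idle-poll cutoff is only a heuristic for ad hoc scripts.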