Code description
This code snippet reads data from the beginning of a topic by using the seek_to_beginning()
API. This sets each partition assigned to the consumer to fetch records from the earliest available offset.
Example output:
polling...
Kontext message 1
Kontext message 2
Kontext message 3
Kontext message 4
Kontext message 5
New kontext events~!!
New events!
polling...
polling...
Code snippet
from kafka import KafkaConsumer
def main():
    """
    Kafka example: print every message of a topic from its beginning.

    Connects to the broker at ``localhost:9092``, rewinds the consumer to the
    earliest available offset of ``kontext-events`` and then polls forever,
    printing each record value decoded as UTF-8.

    The polling loop never exits on its own; it ends only when an exception
    (for example ``KeyboardInterrupt``) propagates. The consumer is closed
    on the way out so its network resources are released.
    """
    topic = 'kontext-events'
    bootstrap_servers = 'localhost:9092'
    consumer = KafkaConsumer(
        topic,
        client_id='local-test',
        bootstrap_servers=bootstrap_servers,
        # auto_offset_reset='earliest'
    )
    try:
        # The return value is intentionally unused.
        # NOTE(review): presumably this call is here to force a metadata
        # fetch so that partitions are assigned before seeking -- confirm
        # against the kafka-python version in use.
        consumer.partitions_for_topic(topic)
        # seek to beginning
        consumer.seek_to_beginning()
        while True:
            print('polling...')
            records = consumer.poll(timeout_ms=1000)
            # poll() maps each partition to its batch of records; only the
            # batches are needed here.
            for consumer_records in records.values():
                # Parse records
                for consumer_record in consumer_records:
                    print(consumer_record.value.decode('utf-8'))
    finally:
        # Always release the consumer, even when the loop is interrupted.
        consumer.close()
# Run the example only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()