Code description
When fetching records from Kafka bootstrap servers, we can specify one consumer to subscribe to more than one topic. This can be done in the init function by passing a list of topics:
topics = ['kontext-events', 'other-events']
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
*topics,
client_id='local-test',
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest'
)
Alternatively, we can use subscribe()
function to subscribe to multiple topics. We can also regular expressions to match topics via patterns:
subscribe(topics=(), pattern=None, listener=None)
Code snippet
from kafka import KafkaConsumer
def main():
"""
Kafka example
"""
topics = ['kontext-events', 'other-events']
bootstrap_servers = 'localhost:9092'
consumer = KafkaConsumer(
client_id='local-test',
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest'
)
consumer.subscribe(topics=topics)
while True:
print('polling...')
records = consumer.poll(timeout_ms=1000)
for _, consumer_records in records.items():
# Parse records
for consumer_record in consumer_records:
print(str(consumer_record.value.decode('utf-8')))
continue
if __name__ == '__main__':
main()