Subscribe to Multiple Topics in Kafka Consumer

When fetching records from Kafka bootstrap servers, we can specify one consumer to subscribe to more than one topic. This can be done in the init function by passing a list of topics: ``` topics = ['kontext-events', 'other-events'] bootstrap_servers = 'localhost:9092' consumer = KafkaConsumer( *topics, client_id='local-test', bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest' ) ``` Alternatively, we can use `subscribe()` function to subscribe to multiple topics. We can also regular expressions to match topics via patterns: ``` subscribe(topics=(), pattern=None, listener=None) ```

Kontext Kontext 0 3115 3.00 index 8/23/2022

Code description

When fetching records from Kafka bootstrap servers, we can specify one consumer to subscribe to more than one topic. This can be done in the init function by passing a list of topics:

    topics = ['kontext-events', 'other-events']
        bootstrap_servers = 'localhost:9092'
        consumer = KafkaConsumer(
            *topics,
            client_id='local-test',
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='earliest'
        )

Alternatively, we can use subscribe() function to subscribe to multiple topics. We can also regular expressions to match topics via patterns:

    subscribe(topics=(), pattern=None, listener=None)  
    

Code snippet

    from kafka import KafkaConsumer
    
    
    def main():
        """
        Kafka example
        """
        topics = ['kontext-events', 'other-events']
        bootstrap_servers = 'localhost:9092'
        consumer = KafkaConsumer(
            client_id='local-test',
            bootstrap_servers=bootstrap_servers,
            auto_offset_reset='earliest'
        )
    
        consumer.subscribe(topics=topics)
    
        while True:
            print('polling...')
            records = consumer.poll(timeout_ms=1000)
            for _, consumer_records in records.items():
                # Parse records
                for consumer_record in consumer_records:
                    print(str(consumer_record.value.decode('utf-8')))
                continue
    
    
    if __name__ == '__main__':
        main()
    
kafka python

Join the Discussion

View or add your thoughts below

Comments