Coverage for Python_files/kafkaConsumer.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

10 statements  

1import argparse # To parse command line arguments 

2import json # To parse and dump JSON 

3from kafka import KafkaConsumer # Import Kafka consumer 

4 

5topic_lecture="tweets" 

6 

7 

8 

9## The Kafka broker list (host:port) must be supplied on the command line via --broker-list

10parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) 

11parser.add_argument('--broker-list', type=str, required=True, help="the broker list") 

12args = parser.parse_args() # Parse arguments 

13 

14consumer = KafkaConsumer(topic_lecture, # Topic name 

15 bootstrap_servers = args.broker_list, # List of brokers passed from the command line 

16 value_deserializer=lambda v: json.loads(v.decode('utf-8')), # How to deserialize the value from a binary buffer 

17 key_deserializer= lambda v: v.decode(), # How to deserialize the key (if any) 

18 group_id = "groupe1" 

19) 

20 

21for msg in consumer: # Blocking call waiting for a new message 

22 print (f"msg: ({msg.key}, {msg.value})") # Write key and payload of the received message