Coverage for Python_files/kafkaConsumer.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""Print every message received on the ``tweets`` Kafka topic.

The broker address list is supplied on the command line through the
required ``--broker-list`` option.
"""
import argparse  # To parse command line arguments
import json  # To parse and dump JSON

from kafka import KafkaConsumer  # Import Kafka consumer

topic_lecture = "tweets"


def _decode_key(raw_key):
    """Decode a message key from bytes, passing ``None`` through unchanged.

    The deserializer is invoked for every record, and records published
    without a key carry ``None`` here; calling ``.decode()`` on it would
    raise ``AttributeError`` and stop the consumer.
    """
    if raw_key is None:
        return None
    return raw_key.decode()


def _decode_value(raw_value):
    """Decode a message payload from UTF-8 encoded JSON bytes."""
    return json.loads(raw_value.decode('utf-8'))


# The broker list (host:port) has to be given on the command line.
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--broker-list', type=str, required=True, help="the broker list")
args = parser.parse_args()  # Parse arguments

consumer = KafkaConsumer(
    topic_lecture,  # Topic name
    bootstrap_servers=args.broker_list,  # List of brokers passed from the command line
    value_deserializer=_decode_value,  # How to deserialize the value from a binary buffer
    key_deserializer=_decode_key,  # How to deserialize the key (if any)
    group_id="groupe1",
)

try:
    for msg in consumer:  # Blocking call waiting for a new message
        print (f"msg: ({msg.key}, {msg.value})")  # Write key and payload of the received message
finally:
    # Release the connection and leave the consumer group however the loop
    # ends (Ctrl+C, deserialization error, broker failure).
    consumer.close()