Coverage for Python_files/kafkaProducer.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

15 statements  

1import argparse # To parse command line arguments 

2import json # To parse and dump JSON 

3from kafka import KafkaProducer # Import Kafka producder 

4 

5 

6import time 

7 

8topic_writing="tweets" 

9 

10parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) 

11parser.add_argument('--broker-list', type=str, required=True, help="the broker list") 

12args = parser.parse_args() # Parse arguments 

13 

14producer = KafkaProducer( 

15 bootstrap_servers = args.broker_list, # List of brokers passed from the command line 

16 value_serializer=lambda v: json.dumps(v).encode('utf-8'), # How to serialize the value to a binary buffer 

17 key_serializer=str.encode # How to serialize the key 

18) 

19 

20msg = { 

21 'dst': 'Metz', 

22 'temp': 2, 

23 'type': 'rain', 

24 'comment': 'Nothing special' 

25} 

26for _ in range(5000): 

27 print("Message : ",_, ", temps(s) : ",_*5) 

28 producer.send(topic_writing, key = msg['dst'], value = msg) # Send a new message to topic 

29 time.sleep(5) 

30 

31producer.flush() # Flush: force purging intermediate buffers before leaving