Coverage for Python_files/kafkaProducer.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import argparse # To parse command line arguments
2import json # To parse and dump JSON
3from kafka import KafkaProducer # Import Kafka producer
6import time
# Script body: publish the same sample weather message to a Kafka topic at a
# fixed interval, then flush and close the producer.

# Kafka topic every message is published to.
topic_writing = "tweets"

# How many messages to publish, and how long to pause after each send.
# The interval is also used to compute the elapsed-time figure printed below,
# so both values must come from the same constant.
MESSAGE_COUNT = 5000
SEND_INTERVAL_S = 5

parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--broker-list', type=str, required=True, help="the broker list")
args = parser.parse_args()  # Parse arguments

producer = KafkaProducer(
    bootstrap_servers=args.broker_list,  # List of brokers passed from the command line
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # How to serialize the value to a binary buffer
    key_serializer=str.encode,  # How to serialize the key
)

# Fixed payload sent on every iteration; its 'dst' field doubles as the
# message key.
msg = {
    'dst': 'Metz',
    'temp': 2,
    'type': 'rain',
    'comment': 'Nothing special',
}

try:
    for message_index in range(MESSAGE_COUNT):
        # Second figure is the nominal elapsed time: index * interval.
        print("Message : ", message_index, ", temps(s) : ", message_index * SEND_INTERVAL_S)
        producer.send(topic_writing, key=msg['dst'], value=msg)  # Send a new message to topic
        time.sleep(SEND_INTERVAL_S)
finally:
    # The loop runs for hours, so it is usually ended by Ctrl-C or an error.
    # Flushing and closing here guarantees buffered records are pushed out and
    # the producer's resources are released on every exit path, not only when
    # the loop completes normally.
    try:
        producer.flush()  # Flush: force purging intermediate buffers before leaving
    finally:
        producer.close()