Coverage for Python_files/predictor.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2The aim of this code is predict the number of retweet thanks to the estimated
3parameters.
4"""
7import argparse # To parse command line arguments
8import json # To parse and dump JSON
9from kafka import KafkaConsumer # Import Kafka consumer
10from kafka import KafkaProducer # Import Kafka producer
11import numpy as np
13import predictor_tools as prd
14import logger
16if __name__=="__main__":
18 logger = logger.get_logger('predictor', broker_list="localhost::9092",debug=True)
19 ################################################
20 ####### Kafka Part ########
21 ################################################
23 topic_reading="cascade_properties"
24 topic_writing_sample="samples"
25 topic_writing_alert="alerts"
26 topic_writing_stats="stats"
28 logger.info("Setting up kafka consumer & producer for predictor part...")
31 parser = argparse.ArgumentParser()
32 parser.add_argument('--broker-list', type=str, help="the broker list", default="localhost:9092")
33 args = parser.parse_args() # Parse arguments
35 consumer = KafkaConsumer(topic_reading, # Topic name
36 bootstrap_servers = args.broker_list, # List of brokers passed from the command line
37 value_deserializer=lambda v: json.loads(v.decode('utf-8')), # How to deserialize the value from a binary buffer
38 key_deserializer= lambda v: v.decode() # How to deserialize the key (if any)
39 )
41 producer = KafkaProducer(
42 bootstrap_servers = args.broker_list, # List of brokers passed from the command line
43 value_serializer=lambda v: json.dumps(v).encode('utf-8'), # How to serialize the value to a binary buffer
44 key_serializer=str.encode # How to serialize the key
45 )
49 ################################################
50 ##### Prediction Part #####
51 ################################################
52 logger.info("Start reading in cascade properties topic...")
53 for msg in consumer :
54 msg=msg.value # which will be remplaced by our object in a near future
55 my_params=np.array([msg["p"],msg["beta"]])
56 cid=msg["cid"]
58 logger.info(f"Predictions computation for {cid} ...")
59 # modifier predictions afin d'avoir G1 en valeur de sortie aussi et N_star
60 N,N_star,G1= prd.predictions(params=my_params, history = msg["tweets"], alpha=2.016,mu=1)
62 send_sample= {
63 'type': 'sample',
64 'cid': cid,
65 'params': my_params,
66 'X': [msg["beta"],N_star,G1],
67 'W' : 1,## équation à tourner isoler le facteur comme correction de l'erreur
68 }
69 producer.send(topic_writing_sample, key =msg["T_obs"], value = send_sample)
71 # to be tuned to make it nicer
72 send_alert={
73 'type': 'alert',
74 'to display' :'very hot topic, follow up with it',
75 'cid': cid,
76 'n_tot': N,
77 }
79 producer.send(topic_writing_alert, key =msg["T_obs"], value = send_alert)
82 error = 0 # to be implemented
83 send_stats={
84 'type': 'stats',
85 'cid' :cid,
86 'T_obs': msg["T_obs"],
87 'ARE' : error,
88 }
89 producer.send(topic_writing_stats, key =None, value = send_stats)
90 logger.info(f"Messages sended post predictions for {cid}...")