Coverage for Python_files/predictor.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

34 statements  

1""" 

2The aim of this code is to predict the number of retweets from the estimated

3parameters.

4""" 

5 

6 

7import argparse # To parse command line arguments 

8import json # To parse and dump JSON 

9from kafka import KafkaConsumer # Import Kafka consumer 

10from kafka import KafkaProducer # Import Kafka producer 

11import numpy as np 

12 

13import predictor_tools as prd 

14import logger 

15 

16if __name__=="__main__": 

17 

18 logger = logger.get_logger('predictor', broker_list="localhost::9092",debug=True) 

19 ################################################ 

20 ####### Kafka Part ######## 

21 ################################################ 

22 

23 topic_reading="cascade_properties" 

24 topic_writing_sample="samples" 

25 topic_writing_alert="alerts" 

26 topic_writing_stats="stats" 

27 

28 logger.info("Setting up kafka consumer & producer for predictor part...") 

29 

30 

31 parser = argparse.ArgumentParser() 

32 parser.add_argument('--broker-list', type=str, help="the broker list", default="localhost:9092") 

33 args = parser.parse_args() # Parse arguments 

34 

35 consumer = KafkaConsumer(topic_reading, # Topic name 

36 bootstrap_servers = args.broker_list, # List of brokers passed from the command line 

37 value_deserializer=lambda v: json.loads(v.decode('utf-8')), # How to deserialize the value from a binary buffer 

38 key_deserializer= lambda v: v.decode() # How to deserialize the key (if any) 

39 ) 

40 

41 producer = KafkaProducer( 

42 bootstrap_servers = args.broker_list, # List of brokers passed from the command line 

43 value_serializer=lambda v: json.dumps(v).encode('utf-8'), # How to serialize the value to a binary buffer 

44 key_serializer=str.encode # How to serialize the key 

45 ) 

46 

47 

48 

49 ################################################ 

50 ##### Prediction Part ##### 

51 ################################################ 

52 logger.info("Start reading in cascade properties topic...") 

53 for msg in consumer : 

54 msg=msg.value # which will be remplaced by our object in a near future  

55 my_params=np.array([msg["p"],msg["beta"]]) 

56 cid=msg["cid"] 

57 

58 logger.info(f"Predictions computation for {cid} ...") 

59 # modifier predictions afin d'avoir G1 en valeur de sortie aussi et N_star 

60 N,N_star,G1= prd.predictions(params=my_params, history = msg["tweets"], alpha=2.016,mu=1) 

61 

62 send_sample= { 

63 'type': 'sample', 

64 'cid': cid, 

65 'params': my_params, 

66 'X': [msg["beta"],N_star,G1], 

67 'W' : 1,## équation à tourner isoler le facteur comme correction de l'erreur 

68 } 

69 producer.send(topic_writing_sample, key =msg["T_obs"], value = send_sample) 

70 

71 # to be tuned to make it nicer 

72 send_alert={ 

73 'type': 'alert', 

74 'to display' :'very hot topic, follow up with it', 

75 'cid': cid, 

76 'n_tot': N, 

77 } 

78 

79 producer.send(topic_writing_alert, key =msg["T_obs"], value = send_alert) 

80 

81 

82 error = 0 # to be implemented 

83 send_stats={ 

84 'type': 'stats', 

85 'cid' :cid, 

86 'T_obs': msg["T_obs"], 

87 'ARE' : error, 

88 } 

89 producer.send(topic_writing_stats, key =None, value = send_stats) 

90 logger.info(f"Messages sended post predictions for {cid}...")