Coverage for Python_files/hawkes_estimator.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

28 statements  

1""" 

2The aim of this code is to estimate the cascade's parameters.  

3""" 

4 

5 

6 

7import argparse # To parse command line arguments 

8import json # To parse and dump JSON 

9from kafka import KafkaConsumer # Import Kafka consumer 

10from kafka import KafkaProducer # Import Kafka producer 

11import os 

12import numpy as np 

13 

14import hawkes_tools as HT 

15import logger 

16 

17 

18 

19if __name__=="__main__" : 

20 

21 logger = logger.get_logger('estimator', broker_list="localhost::9092",debug=True) 

22 

23 ################################################ 

24 ####### Kafka Part ######## 

25 ################################################ 

26 

27 topic_reading="cascade_series" 

28 topic_writing="cascade_properties" 

29 

30 

31 ## default value without typing anything in the terminal 

32 parser = argparse.ArgumentParser() 

33 parser.add_argument('--broker-list', type=str, help="the broker list", default="localhost:9092") 

34 args = parser.parse_args() # Parse arguments 

35 

36 

37 consumer = KafkaConsumer(topic_reading, # Topic name 

38 bootstrap_servers = args.broker_list, # List of brokers passed from the command line 

39 value_deserializer=lambda v: json.loads(v.decode('utf-8')), # How to deserialize the value from a binary buffer 

40 key_deserializer= lambda v: v.decode() # How to deserialize the key (if any) 

41 ) 

42 

43 producer = KafkaProducer( 

44 bootstrap_servers = args.broker_list, # List of brokers passed from the command line 

45 value_serializer=lambda v: json.dumps(v).encode('utf-8'), # How to serialize the value to a binary buffer 

46 key_serializer=str.encode # How to serialize the key 

47 ) 

48 

49 ################################################ 

50 ####### Stats part ######## 

51 ################################################ 

52 

53 # Constants given by Mishra et al  

54 mu,alpha=1 , 2.016 

55 logger.info("Start reading in cascade serie topic...") 

56 # for i in range(0,10):  

57 # cascade=np.load(f"Python_files/Cascades/test_cascade_{i}.npy") 

58 

59 # dico= { 

60 # "cid": i , 

61 # "tweets" : cascade, 

62 # "T_obs" : cascade[-1,0], 

63 # } 

64 

65 for msg in consumer : 

66 # I'll construct a cascade object thanks to msg 

67 cid=msg.value["cid"] 

68 logger.info(f"Map computation for {cid} ...") 

69 MAP_res=HT.compute_MAP(history=msg.value['tweets'],t=msg.value['T_obs'],alpha=alpha, mu=mu) 

70 p,beta=MAP_res[-1] 

71 my_params=[p,beta] 

72 

73 send ={ 

74 'type': 'parameters', 

75 'n_obs' : msg.value["T_obs"], 

76 'n_supp' : 0, 

77 'params' : my_params, 

78 } 

79 logger.info(f"Sending estimated parameter for {cid}...") 

80 producer.send(topic_writing, key = msg.value['T_obs'], value = send) 

81 

82