Coverage for Python_files/hawkes_estimator.py: 0%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2The aim of this code is to estimate the cascade's parameters.
3"""
7import argparse # To parse command line arguments
8import json # To parse and dump JSON
9from kafka import KafkaConsumer # Import Kafka consumer
10from kafka import KafkaProducer # Import Kafka producer
11import os
12import numpy as np
14import hawkes_tools as HT
15import logger
19if __name__=="__main__" :
21 logger = logger.get_logger('estimator', broker_list="localhost::9092",debug=True)
23 ################################################
24 ####### Kafka Part ########
25 ################################################
27 topic_reading="cascade_series"
28 topic_writing="cascade_properties"
31 ## default value without typing anything in the terminal
32 parser = argparse.ArgumentParser()
33 parser.add_argument('--broker-list', type=str, help="the broker list", default="localhost:9092")
34 args = parser.parse_args() # Parse arguments
37 consumer = KafkaConsumer(topic_reading, # Topic name
38 bootstrap_servers = args.broker_list, # List of brokers passed from the command line
39 value_deserializer=lambda v: json.loads(v.decode('utf-8')), # How to deserialize the value from a binary buffer
40 key_deserializer= lambda v: v.decode() # How to deserialize the key (if any)
41 )
43 producer = KafkaProducer(
44 bootstrap_servers = args.broker_list, # List of brokers passed from the command line
45 value_serializer=lambda v: json.dumps(v).encode('utf-8'), # How to serialize the value to a binary buffer
46 key_serializer=str.encode # How to serialize the key
47 )
49 ################################################
50 ####### Stats part ########
51 ################################################
53 # Constants given by Mishra et al
54 mu,alpha=1 , 2.016
55 logger.info("Start reading in cascade serie topic...")
56 # for i in range(0,10):
57 # cascade=np.load(f"Python_files/Cascades/test_cascade_{i}.npy")
59 # dico= {
60 # "cid": i ,
61 # "tweets" : cascade,
62 # "T_obs" : cascade[-1,0],
63 # }
65 for msg in consumer :
66 # I'll construct a cascade object thanks to msg
67 cid=msg.value["cid"]
68 logger.info(f"Map computation for {cid} ...")
69 MAP_res=HT.compute_MAP(history=msg.value['tweets'],t=msg.value['T_obs'],alpha=alpha, mu=mu)
70 p,beta=MAP_res[-1]
71 my_params=[p,beta]
73 send ={
74 'type': 'parameters',
75 'n_obs' : msg.value["T_obs"],
76 'n_supp' : 0,
77 'params' : my_params,
78 }
79 logger.info(f"Sending estimated parameter for {cid}...")
80 producer.send(topic_writing, key = msg.value['T_obs'], value = send)