22 #include <boost/heap/binomial_heap.hpp>
24 #define DURATION_END_CASCADE 1000
46 std::string in, out_series, out_properties;
55 std::vector<std::size_t> observation;
56 std::size_t terminated;
66 std::size_t min_cascade_size;
77 std::string current_section;
// Reads one configuration entry from `is` and returns it as a pair of strings.
// Only part of the body is visible in this listing; the comments below describe
// the visible statements only.
79 std::pair<std::string, std::string> parse_value(std::istream &is)
// Loops while the character held in `c` is '#' or '['.
84 while (c ==
'#' || c ==
'[')
// Text up to the next ']' is stored in the file-level `current_section`.
87 std::getline(is, current_section,
']');
// The remainder of that line is read into `buf`.
88 std::getline(is, buf,
'\n');
// The key is everything up to the '=' separator.
94 std::getline(is, key,
'=');
// The rest of the line is read into `buf` (presumably the value - confirm against the unshown return).
96 std::getline(is, buf,
'\n');
// Loads parameters from the file named by `config_filename`, dispatching each
// key on the section most recently stored in `current_section`.
114 std::ifstream ifs(config_filename.c_str());
// Error raised for an unreadable file (the guarding condition is on a line not shown here).
116 throw std::runtime_error(std::string(
"Cannot open \"") + config_filename +
"\" for reading parameters.");
// From here on any failbit/badbit/eofbit on the stream raises an exception,
// so reaching end of file also throws.
117 ifs.exceptions(std::ios::failbit | std::ios::badbit | std::ios::eofbit);
// One key/value pair per call; parse_value also updates `current_section`.
122 auto [key, val] = parse_value(ifs);
123 if (current_section ==
"kafka")
125 if (key ==
"brokers")
128 else if (current_section ==
"topic")
132 else if (key ==
"out_series")
133 topic.out_series = val;
134 else if (key ==
"out_properties")
135 topic.out_properties = val;
137 else if (current_section ==
"times")
139 if (key ==
"observation")
// "observation" may appear several times: every occurrence is appended.
// std::stoul throws std::invalid_argument / std::out_of_range on bad input.
140 times.observation.push_back(std::stoul(val));
141 else if (key ==
"terminated")
142 times.terminated = std::stoul(val);
144 else if (current_section ==
"cascade")
146 if (key ==
"min_cascade_size")
147 cascade.min_cascade_size = std::stoul(val);
// NOTE(review): the handler body is not visible; presumably this is where the
// end-of-file exception enabled above terminates parsing - confirm.
151 catch (
const std::exception &e)
// Writes the collector parameters to `os` as bracketed section headers
// followed by indented key=value lines, one "observation=" line per stored
// observation value. Every line ends with std::endl, which also flushes `os`.
164 inline std::ostream &operator<<(std::ostream &os,
const collector &c)
166 os <<
"[kafka]" << std::endl
167 <<
" brokers=" << c.kafka.brokers << std::endl
169 <<
"[topic]" << std::endl
170 <<
" in=" << c.topic.in << std::endl
171 <<
" out_series=" << c.topic.out_series << std::endl
172 <<
" out_properties=" << c.topic.out_properties << std::endl
174 <<
"[times]" << std::endl;
// The observation list is a vector, so it is emitted as repeated keys.
175 for (
auto &o : c.times.observation)
176 os <<
" observation=" << o << std::endl;
177 os <<
" terminated=" << c.times.terminated << std::endl
179 <<
"[cascade]" << std::endl
180 <<
" min_cascade_size=" << c.cascade.min_cascade_size << std::endl;
185 using timestamp = std::size_t;
189 using idf = std::size_t;
194 using idf = std::size_t;
203 std::string type =
"";
204 std::string msg =
"";
206 double magnitude = 0;
207 source::idf source = 0;
208 std::string info =
"";
// Extracts a string value from `is`. The visible statement reads characters
// up to (and consumes) the next double quote into `value`; the steps that
// precede it are on lines not shown in this listing.
217 inline std::string get_string_val(std::istream &is)
222 std::getline(is, value,
'"');
// Fills a tweet from `is` by reading quoted tag names and dispatching on them.
// Only the string-valued branches are fully visible here.
233 inline std::istream &operator>>(std::istream &is, tweet &t)
// The tag name is the text up to the next double quote.
248 std::getline(is, tag,
'"');
// String fields are read with get_string_val.
251 t.type = get_string_val(is);
252 else if (tag ==
"msg")
253 t.msg = get_string_val(is);
254 else if (tag ==
"info")
255 t.info = get_string_val(is);
260 else if (tag ==
"source")
289 struct CascadeRefComparator;
292 using cascade_ref = std::shared_ptr<Cascade>;
293 using cascade_wref = std::weak_ptr<Cascade>;
294 using priority_queue = boost::heap::binomial_heap<cascade_ref, boost::heap::compare<CascadeRefComparator> >;
295 using idf = std::size_t;
// Streams the (time, magnitude) pairs to `os` as " [t,m] ," items; the final
// pair is written without the trailing comma. An empty vector writes no pair.
305 std::ostream &operator<<(std::ostream &os, std::vector<std::pair<timestamp, int> > &time_magnitude)
308 auto it_time_magnitude = time_magnitude.begin();
// `end() - 1` is undefined for an empty vector, so test for emptiness first.
309 while (!time_magnitude.empty() && it_time_magnitude != time_magnitude.end() - 1)
311 os <<
" [" << it_time_magnitude->first <<
',' << it_time_magnitude->second <<
"] ,";
// Only dereference the iterator for the last element when there is one.
314 if (it_time_magnitude != time_magnitude.end()) os <<
" [" << it_time_magnitude->first <<
',' << it_time_magnitude->second <<
"] ";
// Ordering predicate used by the cascade priority queue.
334 bool operator()(cascade_ref ref_c1, cascade_ref ref_c2)
const;
// NOTE(review): both operands are cascade_ref (std::shared_ptr<Cascade>), so
// `>` is std::shared_ptr's relational operator and orders by the stored
// pointer value, not by Cascade::operator< (time of last tweet). Confirm the
// queue is really meant to be ordered by address; comparing the cascades
// themselves would need Cascade to be a complete type at this point.
339 return ref_c1 > ref_c2;
348 std::string m_msg =
"";
349 timestamp m_timeOfFirstTweet;
350 timestamp m_timeOfLastTweet;
351 std::vector<std::pair<timestamp, int> > m_pairsOfTimesAndMagnitudes;
352 source::idf m_source;
// Grants the priority-queue comparator access to Cascade's private members.
// (The previous spelling "CascadeRefComaparator" named a struct that does not exist.)
356 friend struct CascadeRefComparator;
358 friend std::ostream &
operator<<(std::ostream &os, std::vector<std::pair<timestamp, int> > &time_magnitude);
411 std::string
getId()
const;
417 std::string
getMsg()
const;
455 void operator+=(std::pair<tweet, std::string> &elt);
463 bool operator<(
const cascade_ref &ref_other_cascade)
const;
// Member initialisers: the first and last tweet times both start at twt.time
// and the series starts with a single (time, magnitude) entry.
// NOTE(review): the series stores std::pair<timestamp, int>; if twt.magnitude
// is the double declared on tweet, it is converted to int here - confirm the
// truncation is intended.
476 m_timeOfFirstTweet(twt.time),
477 m_timeOfLastTweet(twt.time),
478 m_pairsOfTimesAndMagnitudes({std::make_pair(twt.time, twt.magnitude)}),
// Appends another (time, magnitude) entry and moves the last-tweet time to twt.time.
488 this->m_pairsOfTimesAndMagnitudes.push_back(std::make_pair(twt.time, twt.magnitude));
489 this->m_timeOfLastTweet = twt.time;
// Adds the tweet in `elt.first` to this cascade when `elt.second` matches the
// cascade's id.
494 tweet &twt = elt.first;
495 std::string &key = elt.second;
// Tweets carrying another cascade's key are ignored.
497 if (key == this->m_id)
499 this->m_pairsOfTimesAndMagnitudes.push_back(std::make_pair(twt.time, twt.magnitude));
// Advance the last-tweet time only for a newer tweet from the same source.
// The comparison must use the tweet's time: the previous code compared the
// source id against the timestamp.
501 if (twt.source == this->m_source && twt.time > this->m_timeOfLastTweet)
503 this->m_timeOfLastTweet = twt.time;
510 return m_timeOfLastTweet < ref_other_cascade->getTimeOfLastTweet();
// Creates a Cascade from a tweet and its key and returns it wrapped in a
// cascade_ref (std::shared_ptr<Cascade>), built with std::make_shared.
513 inline cascade_ref makeRef(
tweet &twt, std::string &key)
515 return std::make_shared<Cascade>(twt, key);
523 source::idf m_source;
524 timestamp m_sourceTime;
525 priority_queue m_priorityQueue;
526 std::map<timestamp, std::queue<cascade_wref> > m_FIFO;
527 std::map<std::string, cascade_wref> m_symbolTable;
532 friend std::ostream &
operator<<(std::ostream &os, std::vector<std::pair<timestamp, int> > &time_magnitude);
602 std::map<timestamp, std::queue<cascade_wref> >
getFIFO()
const;
623 void addToFIFO(
const int &pos,
const cascade_wref &weak_ref_cascade);
630 void addToSymbolTable(
const std::string &key,
const cascade_wref &weak_ref_cascade);
644 void decreasePriorityQueue(
const priority_queue::handle_type &elt,
const cascade_ref &sh_ref_cascade);
665 m_sourceTime(twt.time),
// Accessor for the FIFO map (timestamp key -> queue of weak cascade
// references). The map is returned by value, so the caller receives a copy.
677 inline std::map<timestamp, std::queue<cascade_wref> > Processor::getFIFO() const
{
    return m_FIFO;
}
684 this->m_FIFO[t_obs].push(weak_ref_cascade);
689 this->m_symbolTable.insert(std::make_pair(key, weak_ref_cascade));
694 return this->m_priorityQueue.push(sh_ref_cascade);
699 this->m_priorityQueue.decrease(elt, sh_ref_cascade);
// Builds the "serie" messages: for every observation window in `obs`, walks
// the FIFO of that window from the front and serialises each cascade whose
// first tweet is at least `t_obs` older than the current source time.
705 std::vector<std::string> seriesToSend;
706 for (
auto &t_obs : obs)
// Cascade ids already serialised for this window (used to report duplicates).
708 std::vector<std::string> ids;
709 if (!this->m_FIFO[t_obs].empty())
711 cascade_wref wRefCascade = this->m_FIFO[t_obs].front();
712 auto currentCascade = wRefCascade.lock();
// lock() yields an empty pointer when the cascade has already been destroyed.
715 if (currentCascade == nullptr)
// Never dereference an expired cascade: stop as soon as the front entry is gone.
// NOTE(review): timestamps are std::size_t, so this difference wraps around if
// the first tweet is later than the source time - confirm that cannot happen.
719 while (currentCascade != nullptr && (this->m_sourceTime - currentCascade->m_timeOfFirstTweet) >= t_obs)
721 auto it = currentCascade->m_pairsOfTimesAndMagnitudes.begin();
722 std::vector<std::pair<timestamp, int> > partialPairsTimesMagnitudes;
// Keep only the tweets that fall inside the observation window. The end check
// must come first and short-circuit; the previous form dereferenced `it`
// before testing it and used a non-short-circuit `&`.
723 while ((it != currentCascade->m_pairsOfTimesAndMagnitudes.end()) && (it->first - currentCascade->m_timeOfFirstTweet <= t_obs))
725 partialPairsTimesMagnitudes.push_back(*it);
728 std::ostringstream os;
730 <<
"\"type\" : \"serie\""
731 <<
", \"cid\" : " << currentCascade->getId()
732 <<
", \"msg\": \"" << currentCascade->getMsg() <<
'"'
733 <<
", \"T_obs\" : " << t_obs
734 <<
",\"tweets\" :" << partialPairsTimesMagnitudes
737 std::string msg_series = os.str();
// A cascade already emitted for this window is reported instead of being sent twice.
740 if (std::count(ids.begin(), ids.end(), currentCascade->m_id))
742 std::cout <<
"Duplicated key : " << currentCascade->getId() <<
" , T_obs : " << t_obs << std::endl;
746 std::cout << msg_series << std::endl;
747 seriesToSend.push_back(msg_series);
748 ids.push_back(currentCascade->m_id);
// The front cascade has been handled for this window; move on to the next one.
750 this->m_FIFO[t_obs].pop();
751 if (!(this->m_FIFO[t_obs].empty()))
753 wRefCascade = this->m_FIFO[t_obs].front();
// Assign to the loop's cascade rather than declaring a new local: the previous
// `auto currentCascade = ...` shadowed it, so the loop kept re-testing the
// cascade that had just been popped.
754 currentCascade = wRefCascade.lock();
// Builds the "size" messages: pops cascades from the priority queue while the
// top one has been silent for longer than `end_time`, and serialises those
// with more than `min_size` recorded tweets.
768 std::vector<std::string> propertiesToSend;
770 if (!(this->m_priorityQueue.empty()))
772 auto topCascade = this->m_priorityQueue.top();
// NOTE(review): timestamps are std::size_t, so this difference wraps around if
// the last tweet is later than the source time - confirm that cannot happen.
773 while ((this->m_sourceTime - topCascade->m_timeOfLastTweet) > end_time)
// Strictly greater: a cascade with exactly `min_size` tweets is not reported.
777 if (topCascade->m_pairsOfTimesAndMagnitudes.size() > min_size)
779 std::ostringstream os;
781 <<
"\"type\" : \"size\""
782 <<
", \"cid\" : " << topCascade->m_id
783 <<
", \"n_tot\": \"" << topCascade->m_pairsOfTimesAndMagnitudes.size() <<
'"'
784 <<
", \"t_end\" : " << topCascade->m_timeOfLastTweet
787 std::string msg_properties = os.str();
788 propertiesToSend.push_back(msg_properties);
// The cascade is removed from the queue whether or not it was reported.
790 this->m_priorityQueue.pop();
// NOTE(review): topCascade is refreshed only while the queue is non-empty; how
// the loop exits once the queue is empty is on lines not shown - confirm it
// cannot pop from an empty queue.
793 if (!(this->m_priorityQueue.empty()))
795 topCascade = m_priorityQueue.top();
803 return propertiesToSend;