Collector  1
tweetoscopeCollectorParams.hpp
1 /*
2 
3  The boost library has clever tools for handling program
4  parameters. Here, for the sake of code simplification, we use a
5  custom class.
6 
7 */
8 
#pragma once

#include <algorithm>
#include <cstddef>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <tuple>
#include <vector>

#include <boost/heap/binomial_heap.hpp>

#define DURATION_END_CASCADE 1000
25 
26 namespace tweetoscope
27 {
namespace params
{
namespace section
{
/// Settings of the [kafka] section.
struct Kafka
{
    std::string brokers; ///< Value of the "brokers" key.
};

/// Settings of the [topic] section: input topic and the two output topics.
struct Topic
{
    std::string in, out_series, out_properties;
};

/// Settings of the [times] section.
struct Times
{
    std::vector<std::size_t> observation; ///< One entry per "observation" key, in file order.
    std::size_t terminated{};             ///< Value of "terminated"; 0 when the key is absent.
};

/// Settings of the [cascade] section.
struct Cascade
{
    std::size_t min_cascade_size{}; ///< Value of "min_cascade_size"; 0 when the key is absent.
};
} // namespace section

/**
 * @brief Collector parameters read from an INI-like configuration file.
 *
 * Accepted syntax, one item per line:
 *  - `[name]` starts section `name` (text after `]` is ignored);
 *  - `key=value` stores `value` (the first blank-delimited token after `=`);
 *  - lines starting with `#` and blank lines are ignored.
 * Keys, values and section names are trimmed of surrounding blanks. Unknown
 * sections/keys and lines without `=` are ignored. The last line does not need
 * a trailing newline.
 */
struct collector
{
private:
    std::string current_section; // section the next key=value lines belong to

    /// Returns @p s without leading/trailing spaces, tabs, CR and LF.
    static std::string trim(const std::string &s)
    {
        const char *blanks = " \t\r\n";
        const std::string::size_type first = s.find_first_not_of(blanks);
        if (first == std::string::npos)
            return "";
        const std::string::size_type last = s.find_last_not_of(blanks);
        return s.substr(first, last - first + 1);
    }

    /// Converts @p val to an unsigned integer.
    /// @throws std::runtime_error naming the key/section when @p val is not a valid number.
    std::size_t parse_size(const std::string &key, const std::string &val) const
    {
        try
        {
            return std::stoul(val);
        }
        catch (const std::logic_error &) // std::invalid_argument or std::out_of_range
        {
            throw std::runtime_error("Invalid value \"" + val + "\" for key \"" + key +
                                     "\" in section [" + current_section + "].");
        }
    }

    /// Stores one key/value pair into the field selected by the current section.
    void store(const std::string &key, const std::string &val)
    {
        if (current_section == "kafka")
        {
            if (key == "brokers")
                kafka.brokers = val;
        }
        else if (current_section == "topic")
        {
            if (key == "in")
                topic.in = val;
            else if (key == "out_series")
                topic.out_series = val;
            else if (key == "out_properties")
                topic.out_properties = val;
        }
        else if (current_section == "times")
        {
            if (key == "observation")
                times.observation.push_back(parse_size(key, val));
            else if (key == "terminated")
                times.terminated = parse_size(key, val);
        }
        else if (current_section == "cascade")
        {
            if (key == "min_cascade_size")
                cascade.min_cascade_size = parse_size(key, val);
        }
    }

    /// Interprets one raw line of the configuration file.
    void parse_line(const std::string &raw_line)
    {
        const std::string line = trim(raw_line);
        if (line.empty() || line[0] == '#')
            return;

        if (line[0] == '[')
        {
            const std::string::size_type close = line.find(']');
            const std::string name = (close == std::string::npos) ? line.substr(1)
                                                                  : line.substr(1, close - 1);
            current_section = trim(name);
            return;
        }

        const std::string::size_type eq = line.find('=');
        if (eq == std::string::npos)
            return; // not a key=value line

        const std::string key = trim(line.substr(0, eq));
        // The value is the first blank-delimited token, so "key=value # note" yields "value".
        const std::string rest = line.substr(eq + 1);
        std::string val;
        const std::string::size_type begin = rest.find_first_not_of(" \t");
        if (begin != std::string::npos)
        {
            const std::string::size_type end = rest.find_first_of(" \t", begin);
            val = rest.substr(begin, end == std::string::npos ? std::string::npos : end - begin);
        }
        store(key, val);
    }

public:
    section::Kafka kafka;
    section::Topic topic;
    section::Times times;
    section::Cascade cascade;

    /**
     * @brief Construct a new collector object by reading @p config_filename.
     * @throws std::runtime_error if the file cannot be opened or a numeric value is invalid.
     */
    collector(const std::string &config_filename)
    {
        std::ifstream ifs(config_filename);
        if (!ifs)
            throw std::runtime_error(std::string("Cannot open \"") + config_filename + "\" for reading parameters.");
        // Line-based reading: end of file simply ends the loop, so the last entry is
        // kept even without a trailing newline, and parse errors are not swallowed.
        std::string line;
        while (std::getline(ifs, line))
            parse_line(line);
    }
};

/// Writes the parameters back in the configuration-file layout (one observation line per value).
inline std::ostream &operator<<(std::ostream &os, const collector &c)
{
    os << "[kafka]" << '\n'
       << " brokers=" << c.kafka.brokers << '\n'
       << '\n'
       << "[topic]" << '\n'
       << " in=" << c.topic.in << '\n'
       << " out_series=" << c.topic.out_series << '\n'
       << " out_properties=" << c.topic.out_properties << '\n'
       << '\n'
       << "[times]" << '\n';
    for (const auto &o : c.times.observation)
        os << " observation=" << o << '\n';
    os << " terminated=" << c.times.terminated << '\n'
       << '\n'
       << "[cascade]" << '\n'
       << " min_cascade_size=" << c.cascade.min_cascade_size << '\n';
    return os;
}
} // namespace params
184 
/// Tweet timestamps, stored as an unsigned integer.
using timestamp = std::size_t; // unsigned long

namespace source
{
/// Identifier of a tweet source.
using idf = std::size_t;
}

namespace cascade
{
/// Identifier type of the cascade namespace.
using idf = std::size_t;
}

/// Structure tweet: the fields decoded from one tweet message.
struct tweet
{
    std::string type = "";
    std::string msg = "";
    timestamp time = 0;
    double magnitude = 0;
    source::idf source = 0;
    std::string info = "";
};

/**
 * @brief Reads a double-quoted string value from @p is.
 *
 * Skips leading whitespace, consumes the opening quote, and returns the characters up
 * to the closing quote (consumed, not stored). A backslash and the character after it
 * are kept verbatim, so an escaped quote (\") does not end the string.
 */
inline std::string get_string_val(std::istream &is)
{
    char c;
    is >> c; // eats the opening "
    std::string value;
    while (is.get(c) && c != '"')
    {
        value += c;
        if (c == '\\' && is.get(c))
            value += c; // keep the escaped character, even when it is a quote
    }
    return value;
}

/**
 * @brief Reads one tweet written as a flat JSON-like object, e.g.
 *   {"type": "tweet", "msg": "...", "t": 1234, "m": 1085.0, "source": 0, "info": "..."}
 *
 * Recognised tags: "type", "msg", "info" (quoted strings), "t" or "time" (timestamp),
 * "m" or "magnitude" (number) and "source" (integer). The value of any other tag is
 * skipped (nested objects/arrays are not supported). Fields missing from the input keep
 * their previous value in @p t. Parsing stops as soon as the stream fails, so a
 * truncated message cannot loop forever.
 */
inline std::istream &operator>>(std::istream &is, tweet &t)
{
    char c;
    is >> c; // eats '{'
    is >> c; // eats the '"' opening the first tag
    while (is && c != '}')
    {
        std::string tag;
        std::getline(is, tag, '"'); // Eats until next ", that is eaten but not stored into tag.
        is >> c;                    // eats ':'
        if (tag == "type")
            t.type = get_string_val(is);
        else if (tag == "msg")
            t.msg = get_string_val(is);
        else if (tag == "info")
            t.info = get_string_val(is);
        else if (tag == "t" || tag == "time")
            is >> t.time;
        else if (tag == "m" || tag == "magnitude")
            is >> t.magnitude;
        else if (tag == "source")
            is >> t.source;
        else
        {
            // Unknown tag: skip its value so the next tag is read correctly.
            is >> std::ws;
            if (is.peek() == '"')
                get_string_val(is);
            else
                while (is && is.peek() != ',' && is.peek() != '}' &&
                       is.peek() != std::istream::traits_type::eof())
                    is.get();
        }

        is >> c; // eats either } or ,
        if (c == ',')
            is >> c; // eats '"'
    }
    return is;
}
269 
270  namespace cascade
271  {
272 
273  // Definition of the two classes
278  class Processor;
283  class Cascade; // Class for storing cascade information.
284 
289  struct CascadeRefComparator; // Definition of a class of comparison functor for boost queues.
290 
291  // Definition of types like
292  using cascade_ref = std::shared_ptr<Cascade>;
293  using cascade_wref = std::weak_ptr<Cascade>;
294  using priority_queue = boost::heap::binomial_heap<cascade_ref, boost::heap::compare<CascadeRefComparator> >;
295  using idf = std::size_t;
296 
297  // overloading of << operator
305  std::ostream &operator<<(std::ostream &os, std::vector<std::pair<timestamp, int> > &time_magnitude)
306  {
307  os << "[";
308  auto it_time_magnitude = time_magnitude.begin();
309  while (it_time_magnitude != time_magnitude.end() - 1)
310  {
311  os << " [" << it_time_magnitude->first << ',' << it_time_magnitude->second << "] ,";
312  ++it_time_magnitude;
313  }
314  os << " [" << it_time_magnitude->first << ',' << it_time_magnitude->second << "] ";
315  os << "]";
316  return os;
317  }
318 
// Implementation of CascadeRefComparator class
/**
 * @brief Comparison functor for the cascade priority queue.
 *
 * Returns true when @p ref_c1's last tweet is more recent than @p ref_c2's. Because
 * boost heaps keep the "greatest" element on top, top() is then the cascade whose last
 * tweet is the oldest, which is the first candidate for termination.
 *
 * operator() is a member template so that its body is only compiled where it is used
 * (once Cascade is a complete type); at this point Cascade is only forward-declared.
 * It is callable with two cascade_ref arguments exactly as before.
 */
struct CascadeRefComparator
{
    template <class T>
    bool operator()(const std::shared_ptr<T> &ref_c1, const std::shared_ptr<T> &ref_c2) const
    {
        // Compare the cascades' timestamps, not the pointer addresses.
        return ref_c1->getTimeOfLastTweet() > ref_c2->getTimeOfLastTweet();
    }
};
341 
342  // Implementation of the Cascade class
343  class Cascade
344  {
345  private:
346  // Attributes
347  std::string m_id;
348  std::string m_msg = "";
349  timestamp m_timeOfFirstTweet;
350  timestamp m_timeOfLastTweet;
351  std::vector<std::pair<timestamp, int> > m_pairsOfTimesAndMagnitudes;
352  source::idf m_source;
353 
354  // Declare friend classes
355  friend class Processor;
356  friend struct CascadeRefComaparator;
357  // Declare friend operators
358  friend std::ostream &operator<<(std::ostream &os, std::vector<std::pair<timestamp, int> > &time_magnitude);
359 
360  public:
361  // Constructors
368  Cascade(const tweet &twt, const std::string &key);
374  Cascade(const Cascade &process) = default;
380  Cascade(Cascade &&process) = default;
381 
388  Cascade &operator=(const Cascade &process) = default;
395  Cascade &operator=(Cascade &&process) = default;
396 
397  // Destructor
402  ~Cascade();
403 
404  // Methods
405  // Assessors
411  std::string getId() const;
417  std::string getMsg() const;
423  timestamp getTimeOfFirstTweet() const;
429  timestamp getTimeOfLastTweet() const;
435  std::vector<std::pair<timestamp, int> > getpairsOfTimesAndMagnitudes() const;
441  source::idf getSource() const;
442  // Others
449  void addTweetToCascade(const tweet &twt, const std::string &key);
455  void operator+=(std::pair<tweet, std::string> &elt);
463  bool operator<(const cascade_ref &ref_other_cascade) const;
464  };
465 
466  // Inlining methods of the Cascade class
467  inline std::string Cascade::getId() const { return m_id; }
468  inline std::string Cascade::getMsg() const { return m_msg; }
469  inline timestamp Cascade::getTimeOfFirstTweet() const { return m_timeOfFirstTweet; }
470  inline timestamp Cascade::getTimeOfLastTweet() const { return m_timeOfLastTweet; }
471  inline std::vector<std::pair<timestamp, int> > Cascade::getpairsOfTimesAndMagnitudes() const { return m_pairsOfTimesAndMagnitudes; }
472  inline source::idf Cascade::getSource() const { return m_source; }
473 
474  inline Cascade::Cascade(const tweet &twt, const std::string &key) : m_id(key),
475  m_msg(twt.msg),
476  m_timeOfFirstTweet(twt.time),
477  m_timeOfLastTweet(twt.time),
478  m_pairsOfTimesAndMagnitudes({std::make_pair(twt.time, twt.magnitude)}),
479  m_source(twt.source)
480  {
481  }
482 
483  inline Cascade::~Cascade() {}
484 
485  inline void Cascade::addTweetToCascade(const tweet &twt, const std::string &key)
486  {
487 
488  this->m_pairsOfTimesAndMagnitudes.push_back(std::make_pair(twt.time, twt.magnitude));
489  this->m_timeOfLastTweet = twt.time;
490  }
491 
492  inline void Cascade::operator+=(std::pair<tweet, std::string> &elt)
493  {
494  tweet &twt = elt.first;
495  std::string &key = elt.second;
496 
497  if (key == this->m_id)
498  {
499  this->m_pairsOfTimesAndMagnitudes.push_back(std::make_pair(twt.time, twt.magnitude));
500 
501  if (twt.source == this->m_source && twt.source > this->m_timeOfLastTweet)
502  {
503  this->m_timeOfLastTweet = twt.time;
504  }
505  }
506  }
507 
508  inline bool Cascade::operator<(const cascade_ref &ref_other_cascade) const
509  {
510  return m_timeOfLastTweet < ref_other_cascade->getTimeOfLastTweet();
511  }
512 
513  inline cascade_ref makeRef(tweet &twt, std::string &key)
514  {
515  return std::make_shared<Cascade>(twt, key);
516  }
517 
518  // Implementation of the Processor class
519  class Processor
520  {
521  private:
522  // Attributes
523  source::idf m_source;
524  timestamp m_sourceTime;
525  priority_queue m_priorityQueue;
526  std::map<timestamp, std::queue<cascade_wref> > m_FIFO;
527  std::map<std::string, cascade_wref> m_symbolTable;
528 
529  // Declare friend classes
530  friend class Cascade;
531  // Declare friend operators
532  friend std::ostream &operator<<(std::ostream &os, std::vector<std::pair<timestamp, int> > &time_magnitude);
533 
534  public:
535  // Constructor
541  Processor(const tweet &twt);
547  Processor(const Processor &process) = default;
553  Processor(Processor &&process) = default;
560  Processor &operator=(const Processor &process) = default;
567  Processor &operator=(Processor &&process) = default;
568 
569  // Destructor
574  ~Processor();
575 
576  // Methods
577  // Assessors
578  // Get
584  source::idf getSource() const;
590  timestamp getSourceTime() const;
596  priority_queue getPriorityQueue() const;
602  std::map<timestamp, std::queue<cascade_wref> > getFIFO() const;
608  std::map<std::string, cascade_wref> getSymbolTable() const;
609  // Set
615  void setSourceTime(const timestamp &src_time);
616  // Others
623  void addToFIFO(const int &pos, const cascade_wref &weak_ref_cascade);
630  void addToSymbolTable(const std::string &key, const cascade_wref &weak_ref_cascade);
637  auto addToPriorityQueue(const cascade_ref &sh_ref_cascade);
644  void decreasePriorityQueue(const priority_queue::handle_type &elt, const cascade_ref &sh_ref_cascade);
651  std::vector<std::string> sendPartialCascade(const std::vector<std::size_t> &obs);
659  std::vector<std::string> sendTerminatedCascade(timestamp &end_time, const std::size_t &min_size);
660  };
661 
662  // Inlining methods of the Processor class
663 
664  inline Processor::Processor(const tweet &twt) : m_source(twt.source),
665  m_sourceTime(twt.time),
666  m_priorityQueue{},
667  m_FIFO{},
668  m_symbolTable{}
669  {
670  }
671 
673 
674  inline source::idf Processor::getSource() const { return this->m_source; }
675  inline timestamp Processor::getSourceTime() const { return this->m_sourceTime; }
676  inline priority_queue Processor::getPriorityQueue() const { return this->m_priorityQueue; }
677  inline std::map<timestamp, std::queue<cascade_wref> > Processor::getFIFO() const { return this->m_FIFO; }
678  inline std::map<std::string, cascade_wref> Processor::getSymbolTable() const { return this->m_symbolTable; }
679 
680  inline void Processor::setSourceTime(const timestamp &src_time) { this->m_sourceTime = src_time; }
681 
682  inline void Processor::addToFIFO(const int &t_obs, const cascade_wref &weak_ref_cascade)
683  {
684  this->m_FIFO[t_obs].push(weak_ref_cascade);
685  }
686 
687  inline void Processor::addToSymbolTable(const std::string &key, const cascade_wref &weak_ref_cascade)
688  {
689  this->m_symbolTable.insert(std::make_pair(key, weak_ref_cascade));
690  }
691 
692  inline auto Processor::addToPriorityQueue(const cascade_ref &sh_ref_cascade)
693  {
694  return this->m_priorityQueue.push(sh_ref_cascade);
695  }
696 
697  inline void Processor::decreasePriorityQueue(const priority_queue::handle_type &elt, const cascade_ref &sh_ref_cascade)
698  {
699  this->m_priorityQueue.decrease(elt, sh_ref_cascade);
700  }
701 
702  inline std::vector<std::string> Processor::sendPartialCascade(const std::vector<std::size_t> &obs)
703  {
704  //obs is a vector of time to send the cascade
705  std::vector<std::string> seriesToSend;
706  for (auto &t_obs : obs)
707  {
708  std::vector<std::string> ids;
709  if (!this->m_FIFO[t_obs].empty())
710  {
711  cascade_wref wRefCascade = this->m_FIFO[t_obs].front(); // Take the last element of the FIFO
712  auto currentCascade = wRefCascade.lock(); // Take a weak pointer on it to be sure the shared pointer exists
713  // loop while time beetwen the source time and time of the last tweet
714  // is still higher than observation time
715  if (currentCascade == 0)
716  {
717  break;
718  }
719  while ((this->m_sourceTime - currentCascade->m_timeOfFirstTweet) >= t_obs)
720  {
721  auto it = currentCascade->m_pairsOfTimesAndMagnitudes.begin();
722  std::vector<std::pair<timestamp, int> > partialPairsTimesMagnitudes;
723  while ((it->first - currentCascade->m_timeOfFirstTweet <= t_obs) & (it != currentCascade->m_pairsOfTimesAndMagnitudes.end()))
724  {
725  partialPairsTimesMagnitudes.push_back(*it);
726  ++it;
727  }
728  std::ostringstream os;
729  os << "{"
730  << "\"type\" : \"serie\""
731  << ", \"cid\" : " << currentCascade->getId()
732  << ", \"msg\": \"" << currentCascade->getMsg() << '"'
733  << ", \"T_obs\" : " << t_obs
734  << ",\"tweets\" :" << partialPairsTimesMagnitudes
735  << '}';
736 
737  std::string msg_series = os.str();
738  // Check if the key is not duplicated
739 
740  if (std::count(ids.begin(), ids.end(), currentCascade->m_id))
741  {
742  std::cout << "Duplicated key : " << currentCascade->getId() << " , T_obs : " << t_obs << std::endl;
743  }
744  else
745  {
746  std::cout << msg_series << std::endl;
747  seriesToSend.push_back(msg_series);
748  ids.push_back(currentCascade->m_id);
749  }
750  this->m_FIFO[t_obs].pop();
751  if (!(this->m_FIFO[t_obs].empty()))
752  {
753  wRefCascade = this->m_FIFO[t_obs].front();
754  auto currentCascade = wRefCascade.lock();
755  }
756  else
757  {
758  break;
759  }
760  }
761  }
762  }
763  return seriesToSend;
764  }
765 
766  inline std::vector<std::string> Processor::sendTerminatedCascade(timestamp &end_time, const std::size_t &min_size)
767  {
768  std::vector<std::string> propertiesToSend;
769  // First check that priorityqueue is not empty
770  if (!(this->m_priorityQueue.empty()))
771  {
772  auto topCascade = this->m_priorityQueue.top();
773  while ((this->m_sourceTime - topCascade->m_timeOfLastTweet) > end_time)
774  {
775  // Check the size of the cascade is greater than the min size required
776  // So, it can determine if the cascade should be considered
777  if (topCascade->m_pairsOfTimesAndMagnitudes.size() > min_size)
778  {
779  std::ostringstream os;
780  os << "{"
781  << "\"type\" : \"size\""
782  << ", \"cid\" : " << topCascade->m_id
783  << ", \"n_tot\": \"" << topCascade->m_pairsOfTimesAndMagnitudes.size() << '"'
784  << ", \"t_end\" : " << topCascade->m_timeOfLastTweet
785  << '}';
786  // Add porperties to the whole message and pop the last element of the queue
787  std::string msg_properties = os.str();
788  propertiesToSend.push_back(msg_properties);
789  }
790  this->m_priorityQueue.pop();
791  // if priority queue is not empty, one affects another one cascade
792  // which is at the top of que priority queue
793  if (!(this->m_priorityQueue.empty()))
794  {
795  topCascade = m_priorityQueue.top();
796  }
797  else
798  {
799  break;
800  }
801  }
802  }
803  return propertiesToSend;
804  }
805 
806  } //End of namespace cascade
807 
808 }
Definition: tweetoscopeCollectorParams.hpp:344
Cascade & operator=(Cascade &&process)=default
Move-assignment operator: assigns this Cascade from an rvalue Cascade.
std::vector< std::pair< timestamp, int > > getpairsOfTimesAndMagnitudes() const
Get the pair (Times, magnitude) from a Cascade object.
Definition: tweetoscopeCollectorParams.hpp:471
bool operator<(const cascade_ref &ref_other_cascade) const
Overloaded operator < comparing this cascade's last-tweet time with that of another cascade given by its shared pointer.
Definition: tweetoscopeCollectorParams.hpp:508
void operator+=(std::pair< tweet, std::string > &elt)
Overload operator += to add a tweet and its key to a cascade.
Definition: tweetoscopeCollectorParams.hpp:492
Cascade(Cascade &&process)=default
Construct a new Cascade object by move.
void addTweetToCascade(const tweet &twt, const std::string &key)
Add a tweet to a cascade object.
Definition: tweetoscopeCollectorParams.hpp:485
std::string getMsg() const
Get the Msg object.
Definition: tweetoscopeCollectorParams.hpp:468
~Cascade()
Destroy the Cascade object.
Definition: tweetoscopeCollectorParams.hpp:483
Cascade & operator=(const Cascade &process)=default
Copy-assignment operator: assigns this Cascade from a copy of another Cascade.
timestamp getTimeOfFirstTweet() const
Get the Time Of First Tweet object.
Definition: tweetoscopeCollectorParams.hpp:469
std::string getId() const
Get the Id object.
Definition: tweetoscopeCollectorParams.hpp:467
Cascade(const tweet &twt, const std::string &key)
Construct a new Cascade object from a tweet and a key (std::string).
Definition: tweetoscopeCollectorParams.hpp:474
friend std::ostream & operator<<(std::ostream &os, std::vector< std::pair< timestamp, int > > &time_magnitude)
Overloaded operator << that prints (time, magnitude) pairs as a JSON-style array.
Definition: tweetoscopeCollectorParams.hpp:305
source::idf getSource() const
Get the Source object.
Definition: tweetoscopeCollectorParams.hpp:472
timestamp getTimeOfLastTweet() const
Get the Time Of Last Tweet object.
Definition: tweetoscopeCollectorParams.hpp:470
Cascade(const Cascade &process)=default
Construct a new Cascade object by copy.
Definition: tweetoscopeCollectorParams.hpp:520
void addToSymbolTable(const std::string &key, const cascade_wref &weak_ref_cascade)
Add a cascade to the Symbol Table from its weak reference and the key of its tweet.
Definition: tweetoscopeCollectorParams.hpp:687
void addToFIFO(const int &pos, const cascade_wref &weak_ref_cascade)
Add a cascade of a tweet to the FIFO from its weak reference.
Definition: tweetoscopeCollectorParams.hpp:682
Processor(const Processor &process)=default
Construct a new Processor object by copy.
std::vector< std::string > sendTerminatedCascade(timestamp &end_time, const std::size_t &min_size)
Method to send properties which correspond to a terminated cascade.
Definition: tweetoscopeCollectorParams.hpp:766
std::vector< std::string > sendPartialCascade(const std::vector< std::size_t > &obs)
Method to send a serie which is a partial cascade.
Definition: tweetoscopeCollectorParams.hpp:702
std::map< std::string, cascade_wref > getSymbolTable() const
Get the Symbol Table object.
Definition: tweetoscopeCollectorParams.hpp:678
source::idf getSource() const
Get the Source object.
Definition: tweetoscopeCollectorParams.hpp:674
priority_queue getPriorityQueue() const
Get the Priority Queue object.
Definition: tweetoscopeCollectorParams.hpp:676
~Processor()
Destroy the Processor object.
Definition: tweetoscopeCollectorParams.hpp:672
Processor & operator=(Processor &&process)=default
Move-assignment operator: assigns this Processor from an rvalue Processor.
void setSourceTime(const timestamp &src_time)
Set the Source Time object.
Definition: tweetoscopeCollectorParams.hpp:680
auto addToPriorityQueue(const cascade_ref &sh_ref_cascade)
Add a cascade to the Priority Queue from its shared pointer and return the heap handle.
Definition: tweetoscopeCollectorParams.hpp:692
std::map< timestamp, std::queue< cascade_wref > > getFIFO() const
Get the FIFO object.
Definition: tweetoscopeCollectorParams.hpp:677
timestamp getSourceTime() const
Get the Source Time object.
Definition: tweetoscopeCollectorParams.hpp:675
Processor(Processor &&process)=default
Construct a new Processor object by move.
friend std::ostream & operator<<(std::ostream &os, std::vector< std::pair< timestamp, int > > &time_magnitude)
Overloaded operator << that prints (time, magnitude) pairs as a JSON-style array.
Definition: tweetoscopeCollectorParams.hpp:305
Processor & operator=(const Processor &process)=default
Copy-assignment operator: assigns this Processor from a copy of another Processor.
Processor(const tweet &twt)
Construct a new Processor object from a tweet.
Definition: tweetoscopeCollectorParams.hpp:664
void decreasePriorityQueue(const priority_queue::handle_type &elt, const cascade_ref &sh_ref_cascade)
Update the Priority Queue element designated by its handle with the given cascade shared pointer (boost heap decrease).
Definition: tweetoscopeCollectorParams.hpp:697
CascadeRefComparator class.
Definition: tweetoscopeCollectorParams.hpp:325
bool operator()(cascade_ref ref_c1, cascade_ref ref_c2) const
Overload () operator.
Definition: tweetoscopeCollectorParams.hpp:337
collector class
Definition: tweetoscopeCollectorParams.hpp:75
collector(const std::string &config_filename)
Construct a new collector object.
Definition: tweetoscopeCollectorParams.hpp:112
Cascade class.
Definition: tweetoscopeCollectorParams.hpp:64
Kafka class.
Definition: tweetoscopeCollectorParams.hpp:37
Times class.
Definition: tweetoscopeCollectorParams.hpp:54
Topic class.
Definition: tweetoscopeCollectorParams.hpp:45
Structure tweet, an object holding the tweet fields.
Definition: tweetoscopeCollectorParams.hpp:202