# Python_files/logger.py: a Kafka logging handler plus a console consumer that pretty-prints log records.
import argparse
import atexit
import json
import logging
import os
import textwrap
import time

from kafka import KafkaProducer, KafkaConsumer
from termcolor import colored

default_broker_list = "localhost:9091,localhost:9092"
default_log_topic = "logs"
class KafkaHandler(logging.Handler):
    """Logging handler that publishes records to a Kafka topic."""

    def __init__(self, hostlist, topic='logs', tls=None):
        """Initialize an instance of the Kafka handler."""
        logging.Handler.__init__(self)
        # tls is accepted for interface compatibility but is currently unused.
        self.producer = KafkaProducer(
            bootstrap_servers=hostlist,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            linger_ms=10)
        self.topic = topic
        self.record = None
    def emit(self, record):
        """Emit the provided record via the Kafka producer."""
        # Drop records from the kafka client itself to avoid infinite recursion.
        if 'kafka.' in record.name:
            return

        try:
            # Apply the logger formatter.
            msg = self.format(record)
            self.producer.send(self.topic, {
                't': int(time.time()),
                'source': record.name,
                'level': record.levelname,
                'message': msg})
            self.flush(timeout=1.0)
        except Exception:
            logging.Handler.handleError(self, record)
    def flush(self, timeout=None):
        """Flush pending messages to Kafka."""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()
            logging.Handler.close(self)
        finally:
            self.release()
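# A minimal sketch (not part of the module) of wiring KafkaHandler into a
# plain stdlib logger by hand, assuming a broker at localhost:9092; the
# get_logger() helper below does this plus console output:
#
#     handler = KafkaHandler("localhost:9092", topic="logs")
#     handler.setFormatter(logging.Formatter('%(message)s'))
#     log = logging.getLogger("demo")
#     log.addHandler(handler)
#     log.warning("shipped to Kafka")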
def get_logger(name, debug=False, topic=default_log_topic,
               broker_list=default_broker_list, levels=()):
    """Return a logger that writes to both the console and a Kafka topic.

    `levels` is accepted for interface compatibility but is currently unused.
    """
    logger = logging.getLogger(name)
    level = logging.DEBUG if debug else logging.INFO
    logger.setLevel(level)

    # Console handler: show everything, prefixed with the logger name.
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('{} - %(levelname)-8s | %(message)s'.format(name)))
    logger.addHandler(ch)

    # Kafka handler: ship the bare message; emit() adds time/source/level.
    kh = KafkaHandler(broker_list, topic=topic)
    kh.setLevel(level)
    kh.setFormatter(logging.Formatter('%(message)s'))
    logger.addHandler(kh)

    logger.info("Logging on.")
    atexit.register(lambda: logger.info("Logging off."))
    return logger
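# Producer-side usage, as a sketch; "collector" is a hypothetical component
# name, and the default broker list assumes brokers on localhost:9091/9092:
#
#     log = get_logger("collector", debug=True)
#     log.info("starting up")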
class Logger:
    """Pretty-printer for log entries, laid out in aligned columns."""

    default_columns = [
        {"field": "t", "length": 5, "align": ">", "name": "time"},
        {"field": "source", "length": 9, "align": "^", "name": "source"},
        {"field": "level", "length": 8, "align": "^", "name": "level"},
        {"field": "message", "length": None, "align": "<", "name": "message"},
    ]

    def __init__(self, columns=default_columns, log_sep=None, skip_line=True,
                 levels=(), sources=()):
        # Copy the column specs so that width adjustments made while printing
        # do not mutate the shared default_columns list.
        self.columns = [dict(col) for col in columns]
        self.elastic_columns = {}
        self.preprocessors = {}
        self.postprocessors = {}
        self.sep = "|"
        self.header_sep = "="
        self.log_sep = log_sep
        self.skip_line = skip_line
        self.levels = {l.upper() for l in levels}
        self.sources = set(sources)

        # Columns with no fixed length share the remaining terminal width.
        for col in self.columns:
            if col['length'] is None:
                self.elastic_columns[col['field']] = col

        self.headers = {col["field"]: col["name"] for col in self.columns}
        self.print(self.headers, log_sep=self.header_sep)
    def get_terminal_width(self):
        return os.get_terminal_size()[0]

    def draw_line(self, c):
        print(c * self.get_terminal_width())

    def add_preprocessor(self, field, f):
        """Register f(value) -> value, applied before a field is formatted."""
        self.preprocessors[field] = f

    def add_postprocessor(self, field, f):
        """Register f(value, text) -> text, applied after a field is formatted."""
        self.postprocessors[field] = f

    def accept_field(self, field, fields):
        """Accept everything when the filter set is empty."""
        return len(fields) == 0 or field in fields
    def log(self, entry, log_sep=None):
        """Print an entry if it passes the level and source filters."""
        if self.accept_field(entry['level'].upper(), self.levels) and \
           self.accept_field(entry['source'], self.sources):
            self.print(entry, log_sep)
    def print(self, entry, log_sep=None):
        """Render one entry as a row of wrapped, aligned columns."""
        if log_sep is None:
            log_sep = self.log_sep

        # First pass: preprocess values, grow fixed column widths as needed,
        # and work out how much terminal width is left for elastic columns.
        values = []
        lsep = len(self.sep)
        remaining_width = self.get_terminal_width()
        for col in self.columns:
            val = entry.get(col['field'], "---")
            f = self.preprocessors.get(col["field"], None)
            if f is not None:
                val = f(val)
            val = str(val)

            if col['length'] is not None:
                col['length'] = max(len(val), col['length'])
                remaining_width -= col['length']
            remaining_width -= 2 + lsep
            values.append(val)
        if self.elastic_columns:
            remaining_width //= len(self.elastic_columns)

        # Second pass: wrap each value to its column width and build the
        # per-column format strings.
        all_lines = []
        format_strings = []
        max_lines_nb = 0
        for i, col in enumerate(self.columns):
            width = col['length'] if col['length'] is not None else remaining_width
            lines = values[i].split("\n")
            lines = [l for line in lines for l in textwrap.wrap(line, width)]
            format_strings.append("{{:{}{}}}".format(col['align'], width))
            max_lines_nb = max(max_lines_nb, len(lines))
            all_lines.append(lines)

        if self.skip_line:
            max_lines_nb += 1

        # Third pass: print the row line by line, padding short columns and
        # applying any postprocessor (e.g. colorization) to the aligned text.
        for i in range(max_lines_nb):
            for j, col in enumerate(self.columns):
                lines = all_lines[j]
                field = lines[i] if i < len(lines) else ""
                f = self.postprocessors.get(col["field"], None)
                msg = format_strings[j].format(field)
                if f is not None:
                    msg = f(field, msg)
                print(self.sep + " " + msg + " ", end='')
            print()

        if log_sep is not None:
            self.draw_line(log_sep)
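# A sketch of driving the Logger directly with hand-built entries (the keys
# mirror what KafkaHandler.emit() sends; preprocess_time is defined below):
#
#     printer = Logger(log_sep="-")
#     printer.add_preprocessor("t", preprocess_time())
#     printer.log({"t": int(time.time()), "source": "demo",
#                  "level": "INFO", "message": "hello"})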
def postprocess_color(colors, default_color="white"):
    """Build a postprocessor that colors the aligned text by field value.

    `colors` maps a field value to either a color name or a
    [color, attr, ...] list understood by termcolor.
    """
    def f(key, field):
        color = colors.get(key, default_color)
        if isinstance(color, list):
            attrs = [] if len(color) == 1 else color[1:]
            return colored(field, color[0], attrs=attrs)
        elif isinstance(color, str):
            return colored(field, color)
        return field
    return f
def preprocess_time():
    """Return a preprocessor that turns absolute timestamps into seconds
    elapsed since this function was called."""
    t0 = int(time.time())
    return (lambda t: t - t0)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('--broker-list', type=str,
                        help="The broker list. It can be either\n"
                             " - a filepath containing a comma-separated list"
                             " of brokers\n"
                             " - a comma-separated list of brokers, e.g."
                             " localhost:9091,localhost:9092\n",
                        required=True)
    parser.add_argument('--logs_topic', type=str,
                        default=default_log_topic,
                        help='The topic for listening to logs')
    parser.add_argument('--levels',
                        nargs='+',
                        help='list of displayed levels (empty = all)',
                        default=[])
    parser.add_argument('--sources',
                        nargs='+',
                        help='list of displayed sources (empty = all)',
                        default=[])
    args = parser.parse_args()

    consumer = KafkaConsumer(args.logs_topic,
                             bootstrap_servers=args.broker_list)

    logger = Logger(levels=args.levels, sources=args.sources)
    # Show time as seconds since the consumer started.
    logger.add_preprocessor("t", preprocess_time())

    # Color the level and source columns.
    logger.add_postprocessor("level", postprocess_color({
        "DEBUG": ["grey"],
        "INFO": ["green"],
        "WARNING": ["yellow"],
        "ERROR": ["red"],
        "CRITICAL": ["red", "blink"],
    }))

    logger.add_postprocessor("source", postprocess_color({
        "collector": ["blue"],
        "estimator": ["magenta"],
        "predictor": ["yellow"],
        "learner": ["cyan"],
    }))
    # Consume log records forever and pretty-print each one.
    for m in consumer:
        msg = json.loads(m.value.decode('utf8'))
        logger.log(msg)
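# Example invocation, assuming brokers on the default ports:
#
#     python logger.py --broker-list localhost:9091,localhost:9092 \
#         --levels INFO WARNING ERROR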