Coverage for Python_files/logger.py: 0%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

156 statements  

1import argparse, os, atexit 

2import logging 

3import json 

4import time 

5import textwrap 

6from termcolor import colored 

7 

8from kafka import KafkaProducer, KafkaConsumer 

9 

# Default Kafka bootstrap servers, written as a comma-separated "host:port"
# list; used as the default ``broker_list`` of ``get_logger``.
default_broker_list = "localhost:9091,localhost:9092"

# Default Kafka topic: log records are published to it by ``get_logger`` and
# read from it by the command-line viewer at the bottom of this file.
default_log_topic = "logs"

12 

class KafkaHandler(logging.Handler):
    """Logging handler that publishes every record to a Kafka topic as JSON."""

    def __init__(self, hostlist, topic='logs', tls=None):
        """Create the Kafka producer used to ship log records.

        Args:
            hostlist: bootstrap servers handed to ``KafkaProducer``.
            topic: Kafka topic the records are sent to.
            tls: accepted but not used by this handler.
        """
        super().__init__()

        def serialize(value):
            # Records travel as UTF-8 encoded JSON documents.
            return json.dumps(value).encode('utf-8')

        self.producer = KafkaProducer(
            bootstrap_servers=hostlist,
            value_serializer=serialize,
            linger_ms=10,
        )
        self.topic = topic
        self.record = None

    def emit(self, record):
        """Format ``record`` and send it to the configured topic."""
        # Records whose logger name contains 'kafka.' are dropped: shipping
        # them would trigger more such records, i.e. infinite recursion.
        if 'kafka.' in record.name:
            return

        try:
            # Apply the formatter attached to this handler.
            text = self.format(record)
            payload = {
                't': int(time.time()),
                'source': record.name,
                'level': record.levelname,
                'message': text,
            }
            self.producer.send(self.topic, payload)
            self.flush(timeout=1.0)
        except Exception:
            super().handleError(record)

    def flush(self, timeout=None):
        """Flush the producer, passing ``timeout`` through to it."""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close the producer, then run the base handler clean-up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()
            super().close()
        finally:
            self.release()

59 

def get_logger(name, debug=False, topic=default_log_topic,
               broker_list=default_broker_list, levels=None):
    """Return the logger ``name`` wired to the console and to Kafka.

    The first call for a given ``name``/``topic`` pair attaches a console
    ``StreamHandler`` and a ``KafkaHandler``, logs "Logging on." and
    registers an exit hook that logs "Logging off.".  Later calls for the
    same pair only refresh the logger level: attaching the handlers again
    would duplicate every log line and open one more Kafka producer.

    Args:
        name: logger name; also printed as the prefix of console lines.
        debug: when true the logger level is DEBUG, otherwise INFO.
        topic: Kafka topic the records are published to.
        broker_list: Kafka bootstrap servers given to ``KafkaHandler``.
        levels: accepted for backward compatibility; not used.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    level = logging.DEBUG if debug else logging.INFO
    logger.setLevel(level)

    # ``logging.getLogger`` returns the same object for the same name, so a
    # repeated call must not stack a second pair of handlers on it.
    already_configured = any(
        isinstance(handler, KafkaHandler) and handler.topic == topic
        for handler in logger.handlers
    )
    if already_configured:
        return logger

    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(
        logging.Formatter('{} - %(levelname)-8s | %(message)s'.format(name)))
    logger.addHandler(console)

    # The Kafka payload carries source and level as separate fields, so the
    # message itself is sent bare.
    kafka_handler = KafkaHandler(broker_list, topic=topic)
    kafka_handler.setLevel(level)
    kafka_handler.setFormatter(logging.Formatter('%(message)s'))
    logger.addHandler(kafka_handler)

    logger.info("Logging on.")
    atexit.register(lambda: logger.info("Logging off."))
    return logger

83 

84 

85 

class Logger:
    """Print log entries (dicts) as an aligned, word-wrapped terminal table.

    Each column is described by a dict with the keys:

    * ``field``  -- key looked up in every entry;
    * ``length`` -- fixed width in characters, or ``None`` for an "elastic"
      column that shares whatever terminal width is left;
    * ``align``  -- ``str.format`` alignment character (``<``, ``^``, ``>``);
    * ``name``   -- text shown in the header row.

    A fixed column grows (and stays grown) when a longer value is displayed.
    """

    default_columns = [
        {"field": "t", "length": 5, "align": ">", "name": "time"},
        {"field": "source", "length": 9, "align": "^", "name": "source"},
        {"field": "level", "length": 8, "align": "^", "name": "level"},
        {"field": "message", "length": None, "align": "<", "name": "message"},
    ]

    def __init__(self, columns=default_columns, log_sep=None, skip_line=True,
                 levels=(), sources=()):
        """Set up the table and print its header row.

        Args:
            columns: list of column descriptions (see the class docstring).
            log_sep: character repeated across the terminal after each
                entry, or ``None`` for no separator line.
            skip_line: when true, one extra empty row follows each entry.
            levels: levels to display, compared case-insensitively;
                empty means all.
            sources: sources to display; empty means all.
        """
        # ``print`` widens ``col['length']`` in place, so work on private
        # copies: otherwise the class-level defaults (or the caller's list)
        # would be modified and the widths would leak between instances.
        self.columns = [dict(col) for col in columns]
        self.elastic_columns = {
            col['field']: col for col in self.columns if col['length'] is None
        }
        self.preprocessors = {}
        self.postprocessors = {}
        self.sep = "|"
        self.header_sep = "="
        self.log_sep = log_sep
        self.skip_line = skip_line
        self.levels = {level.upper() for level in levels}
        self.sources = set(sources)

        self.headers = {col["field"]: col["name"] for col in self.columns}
        self.print(self.headers, log_sep=self.header_sep)

    def get_terminal_width(self):
        """Return the terminal width in columns.

        ``shutil.get_terminal_size`` honours the ``COLUMNS`` environment
        variable and falls back to 80 columns when the output is not a
        terminal, where ``os.get_terminal_size`` would raise ``OSError``.
        """
        import shutil  # local import: keeps this block self-contained

        return shutil.get_terminal_size()[0]

    def draw_line(self, c):
        """Print ``c`` repeated once per terminal column."""
        print(c * self.get_terminal_width())

    def add_preprocessor(self, field, f):
        """Register ``f(value)``, applied to ``field`` before formatting."""
        self.preprocessors[field] = f

    def add_postprocessor(self, field, f):
        """Register ``f(raw_text, padded_text)`` for the cells of ``field``.

        The returned string is what gets printed for the cell.
        """
        self.postprocessors[field] = f

    def accept_field(self, field, fields):
        """Return True when ``fields`` is empty or contains ``field``."""
        return not fields or field in fields

    def log(self, entry, log_sep=None):
        """Print ``entry`` if it passes the level and source filters."""
        # ``.get`` rather than indexing: an entry without these keys is
        # filtered like any other value instead of raising ``KeyError``.
        level = str(entry.get('level', '')).upper()
        source = entry.get('source')
        if (self.accept_field(level, self.levels)
                and self.accept_field(source, self.sources)):
            self.print(entry, log_sep)

    def print(self, entry, log_sep=None):
        """Print one entry as table rows, wrapping cells that are too long.

        Args:
            entry: mapping from field name to value; a missing field is
                shown as ``---``.
            log_sep: separator character drawn after the entry; ``None``
                means use the instance-wide ``log_sep``.
        """
        if log_sep is None:
            log_sep = self.log_sep

        # Every cell is printed as "<sep> <text> ": two spaces plus the sep.
        cell_overhead = 2 + len(self.sep)

        values = []
        remaining_width = self.get_terminal_width()
        for col in self.columns:
            val = entry.get(col['field'], "---")
            preprocess = self.preprocessors.get(col['field'])
            if preprocess is not None:
                val = preprocess(val)
            val = str(val)

            if col['length'] is not None:
                # Fixed columns grow to fit the longest value seen so far.
                col['length'] = max(len(val), col['length'])
                remaining_width -= col['length']
            remaining_width -= cell_overhead
            values.append(val)

        # Split the leftover width between the elastic columns.  There may
        # be none (no division then), and on a narrow terminal the leftover
        # may be zero or negative, hence the floor of one character.
        elastic_count = len(self.elastic_columns)
        if elastic_count:
            elastic_width = max(1, remaining_width // elastic_count)
        else:
            elastic_width = 0

        all_lines = []
        format_strings = []
        max_lines_nb = 0
        for col, val in zip(self.columns, values):
            width = col['length'] if col['length'] is not None else elastic_width
            # ``textwrap.wrap`` rejects widths below one.
            wrap_width = max(1, width)
            lines = [
                piece
                for line in val.split("\n")
                for piece in textwrap.wrap(line, wrap_width)
            ]
            format_strings.append("{{:{}{}}}".format(col['align'], width))
            max_lines_nb = max(max_lines_nb, len(lines))
            all_lines.append(lines)

        if self.skip_line:
            max_lines_nb += 1

        for i in range(max_lines_nb):
            cells = []
            for j, col in enumerate(self.columns):
                lines = all_lines[j]
                field = lines[i] if i < len(lines) else ""
                msg = format_strings[j].format(field)
                postprocess = self.postprocessors.get(col['field'])
                if postprocess is not None:
                    msg = postprocess(field, msg)
                cells.append(self.sep + " " + msg + " ")
            print("".join(cells))

        if log_sep is not None:
            self.draw_line(log_sep)

190 

191 

def postprocess_color(colors, default_color="white"):
    """Build a function that colours a piece of text according to a key.

    Args:
        colors: mapping from key to a colour specification.  A
            specification is either a colour name (``str``) or a
            list/tuple whose first item is the colour name and whose
            remaining items are ``termcolor`` attributes (e.g. ``"blink"``).
        default_color: specification used for keys missing from ``colors``.

    Returns:
        A function ``f(key, field)`` returning ``field`` coloured with the
        specification found for ``key``.  ``field`` is returned unchanged
        when the specification is empty or of an unsupported type, so the
        caller always receives a string it can print.
    """
    def f(key, field):
        color = colors.get(key, default_color)
        if isinstance(color, str):
            return colored(field, color)
        if isinstance(color, (list, tuple)) and color:
            return colored(field, color[0], attrs=list(color[1:]))
        # Unknown or empty specification: leave the text uncoloured rather
        # than returning None, which would break string concatenation in
        # the caller.
        return field
    return f

201 

def preprocess_time():
    """Build a function turning absolute timestamps into relative ones.

    The reference instant is the current time, in whole seconds, at the
    moment ``preprocess_time`` is called.

    Returns:
        A function ``f(t)`` returning ``t - t0`` for a numeric ``t``.  Any
        other value is returned unchanged, e.g. the ``"---"`` placeholder
        shown for an entry that has no timestamp, instead of raising
        ``TypeError``.
    """
    t0 = int(time.time())

    def relative(t):
        if isinstance(t, (int, float)):
            return t - t0
        return t

    return relative

205 

if __name__ == '__main__':
    # Command-line viewer: consume JSON log messages from a Kafka topic and
    # display them as a coloured table until interrupted.
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('--broker-list', type=str,
                        help="The broker list. It could be either \n"
                             " - a filepath : containing a comma separated list"
                             " of brokers\n"
                             " - a comma separated list of brokers, e.g. "
                             " localhost:9091,localhost:9092\n",
                        required=True)
    parser.add_argument('--logs_topic', type=str,
                        default=default_log_topic,
                        help='The topic for listening to logs')
    parser.add_argument('--levels',
                        nargs='+',
                        help='list of displayed levels (empty = all)',
                        default=[])
    parser.add_argument('--sources',
                        nargs='+',
                        help='list of displayed sources (empty = all)',
                        default=[])
    args = parser.parse_args()

    # The help text promises that --broker-list may name a file holding the
    # comma separated list; honour that by reading the file when it exists.
    broker_list = args.broker_list
    if os.path.isfile(broker_list):
        with open(broker_list, encoding="utf-8") as broker_file:
            broker_list = broker_file.read().strip()

    consumer = KafkaConsumer(args.logs_topic,
                             bootstrap_servers=broker_list)

    try:
        logger = Logger(levels=args.levels, sources=args.sources)

        # Show times relative to the moment the viewer started.
        logger.add_preprocessor("t", preprocess_time())

        logger.add_postprocessor("level", postprocess_color({
            "DEBUG": ["grey"],
            "INFO": ["green"],
            "WARNING": ["yellow"],
            "ERROR": ["red"],
            "CRITICAL": ["red", "blink"]
        }))

        logger.add_postprocessor("source", postprocess_color({
            "collector": ["blue"],
            "estimator": ["magenta"],
            "predictor": ["yellow"],
            "learner": ["cyan"]
        }))

        for m in consumer:
            msg = json.loads(m.value.decode('utf8'))
            logger.log(msg)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the viewer: exit quietly.
        pass
    finally:
        # Release the connections to the brokers.
        consumer.close()