55import socket
66import logging
77import numbers
8+ import queue
89from threading import Thread , Lock
910from datetime import datetime
1011from functools import wraps
1112
12- from deeputil import Dummy
13+ from deeputil import Dummy , keeprunning
1314import structlog
1415
1516# stdlib to structlog handlers should be configured only once.
1617_GLOBAL_LOG_CONFIGURED = False
1718
19+ FORCE_FLUSH_Q_SIZE = 1
1820HOSTNAME = socket .gethostname ()
1921METRICS_STATE = {}
2022METRICS_STATE_LOCK = Lock ()
@@ -270,11 +272,18 @@ def _structlog_default_keys_processor(logger_class, log_method, event):
270272
271273 return event
272274
275+ @keeprunning ()
273276def dump_metrics (log , interval ):
274277 global METRICS_STATE
275278
279+ terminate = False
280+
276281 while True :
277- time .sleep (interval )
282+ try :
283+ log ._force_flush_q .get (block = True , timeout = interval )
284+ terminate = True
285+ except queue .Empty :
286+ pass
278287
279288 METRICS_STATE_LOCK .acquire ()
280289 m = METRICS_STATE
@@ -292,6 +301,9 @@ def dump_metrics(log, interval):
292301 fn = getattr (log , level )
293302 fn (event , type = 'metric' , __grouped__ = True , num = n , ** d )
294303
304+ if terminate :
305+ break
306+
295307def metrics_grouping_processor (logger_class , log_method , event ):
296308 if event .get ('type' ) == 'logged_metric' :
297309 event ['type' ] = 'metric'
@@ -459,6 +471,8 @@ def init_logger(
459471 level = getattr (logging , level .upper ())
460472 log .setLevel (level )
461473
474+ log ._force_flush_q = queue .Queue (maxsize = FORCE_FLUSH_Q_SIZE )
475+
462476 if metric_grouping_interval :
463477 keep_running = Thread (target = dump_metrics , args = (log , metric_grouping_interval ))
464478 keep_running .daemon = True
0 commit comments