Skip to content

Commit c906595

Browse files
author
root
committed
#76 implemented flush grouped metrics when it program terminates
1 parent 18ea7bf commit c906595

3 files changed

Lines changed: 25 additions & 7 deletions

File tree

basescript/basescript.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,18 @@ def __init__(self, args=None):
4545
if self.args.metric_grouping_interval is None:
4646
self.args.metric_grouping_interval = self.METRIC_GROUPING_INTERVAL
4747

48-
self.log = init_logger(
48+
log = init_logger(
4949
fmt=self.args.log_format,
5050
quiet=self.args.quiet,
5151
level=self.args.log_level,
5252
fpath=self.args.log_file,
5353
pre_hooks=self.define_log_pre_format_hooks(),
5454
post_hooks=self.define_log_post_format_hooks(),
5555
metric_grouping_interval=self.args.metric_grouping_interval
56-
).bind(name=self.args.name)
56+
)
57+
58+
self._flush_metrics_q = log._force_flush_q
59+
self.log = log.bind(name=self.args.name)
5760

5861
self.stats = Dummy()
5962

@@ -73,12 +76,13 @@ def start(self):
7376
raise
7477
except KeyboardInterrupt:
7578
self.log.warning("exited via keyboard interrupt")
76-
sys.exit(1)
7779
except:
7880
self.log.exception("exited start function")
7981
# set exit code so we know it did not end successfully
8082
# TODO different exit codes based on signals ?
81-
sys.exit(1)
83+
finally:
84+
self._flush_metrics_q.put(None, block=True)
85+
self._flush_metrics_q.put(None, block=True, timeout=1)
8286

8387
self.log.info("exited successfully")
8488

basescript/log.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,18 @@
55
import socket
66
import logging
77
import numbers
8+
import queue
89
from threading import Thread, Lock
910
from datetime import datetime
1011
from functools import wraps
1112

12-
from deeputil import Dummy
13+
from deeputil import Dummy, keeprunning
1314
import structlog
1415

1516
# stdlib to structlog handlers should be configured only once.
1617
_GLOBAL_LOG_CONFIGURED = False
1718

19+
FORCE_FLUSH_Q_SIZE = 1
1820
HOSTNAME = socket.gethostname()
1921
METRICS_STATE = {}
2022
METRICS_STATE_LOCK = Lock()
@@ -270,11 +272,18 @@ def _structlog_default_keys_processor(logger_class, log_method, event):
270272

271273
return event
272274

275+
@keeprunning()
273276
def dump_metrics(log, interval):
274277
global METRICS_STATE
275278

279+
terminate = False
280+
276281
while True:
277-
time.sleep(interval)
282+
try:
283+
log._force_flush_q.get(block=True, timeout=interval)
284+
terminate = True
285+
except queue.Empty:
286+
pass
278287

279288
METRICS_STATE_LOCK.acquire()
280289
m = METRICS_STATE
@@ -292,6 +301,9 @@ def dump_metrics(log, interval):
292301
fn = getattr(log, level)
293302
fn(event, type='metric', __grouped__=True, num=n, **d)
294303

304+
if terminate:
305+
break
306+
295307
def metrics_grouping_processor(logger_class, log_method, event):
296308
if event.get('type') == 'logged_metric':
297309
event['type'] = 'metric'
@@ -459,6 +471,8 @@ def init_logger(
459471
level = getattr(logging, level.upper())
460472
log.setLevel(level)
461473

474+
log._force_flush_q = queue.Queue(maxsize=FORCE_FLUSH_Q_SIZE)
475+
462476
if metric_grouping_interval:
463477
keep_running = Thread(target=dump_metrics, args=(log, metric_grouping_interval))
464478
keep_running.daemon = True

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def get_long_description():
2222

2323
long_description = get_long_description()
2424

25-
version = '0.2.6'
25+
version = '0.2.7'
2626
setup(
2727
name="basescript",
2828
version=version,

0 commit comments

Comments
 (0)