Skip to content

Commit bfa445e

Browse files
authored
fix: lost event on close (#15)
* fix: lost event on `close` * style: format python code (pep8)
1 parent 2f5f0ba commit bfa445e

2 files changed

Lines changed: 26 additions & 21 deletions

File tree

featureprobe/default_event_processor.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,14 @@ def __init__(self, context: Context):
103103
context.http_config.read_timeout)
104104

105105
self._event_repository = EventRepository()
106-
handler_thread = threading.Thread(
106+
self._handler_thread = threading.Thread(
107107
target=self._handle_event, args=(
108108
self._events, self._event_repository), daemon=True)
109-
handler_thread.start()
109+
self._handler_thread.start()
110110

111111
self._executor = ThreadPoolExecutor(max_workers=5)
112-
self._scheduler = BackgroundScheduler(timezone=tzlocal.get_localzone())
112+
self._scheduler = BackgroundScheduler(
113+
timezone=tzlocal.get_localzone(), logger=self._logger)
113114
self._scheduler.start()
114115
self._scheduler.add_job(self.flush,
115116
trigger='interval',
@@ -121,22 +122,24 @@ def from_context(cls, context: Context) -> EventProcessor:
121122
return cls(context)
122123

123124
def push(self, event: Event):
124-
if not self._closed:
125-
try:
126-
self._events.put_nowait(EventAction(
127-
EventAction.Type.EVENT, event))
128-
except queue.Full:
129-
DefaultEventProcessor._logger.warning(
130-
DefaultEventProcessor._LOG_BUSY_EVENT)
131-
132-
def flush(self):
133-
if not self._closed:
134-
try:
135-
self._events.put_nowait(
136-
EventAction(EventAction.Type.FLUSH, None))
137-
except queue.Full:
138-
DefaultEventProcessor._logger.warning(
139-
DefaultEventProcessor._LOG_BUSY_EVENT)
125+
if self._closed:
126+
return
127+
try:
128+
self._events.put_nowait(EventAction(
129+
EventAction.Type.EVENT, event))
130+
except queue.Full:
131+
DefaultEventProcessor._logger.warning(
132+
DefaultEventProcessor._LOG_BUSY_EVENT)
133+
134+
def flush(self, block=False, timeout=None):
135+
if self._closed:
136+
return
137+
try:
138+
self._events.put(
139+
EventAction(EventAction.Type.FLUSH, None), block, timeout)
140+
except queue.Full:
141+
DefaultEventProcessor._logger.warning(
142+
DefaultEventProcessor._LOG_BUSY_EVENT)
140143

141144
def shutdown(self):
142145
self._do_shutdown()
@@ -165,8 +168,10 @@ def _handle_event(self, events: Queue, event_repo: EventRepository):
165168
def _do_shutdown(self):
166169
if self._closed:
167170
return
171+
172+
self.flush(block=True, timeout=2)
168173
self._closed = True
169-
self._process_flush(self._event_repository)
174+
self._handler_thread.join(2)
170175
self._scheduler.shutdown(wait=True)
171176
self._executor.shutdown(wait=True)
172177

featureprobe/pooling_synchronizer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def from_context(
5858
def sync(self):
5959
PoolingSynchronizer.__logger.info(
6060
'Starting FeatureProbe polling repository with interval %d ms'
61-
% self._refresh_interval.total_seconds() * 1000)
61+
% (self._refresh_interval.total_seconds() * 1000))
6262
self._poll()
6363
with self._lock:
6464
self._scheduler = BackgroundScheduler(

0 commit comments

Comments
 (0)