Skip to content

Commit 10ab157

Browse files
zeevdrclaude
andauthored
fix(watcher): cancel gRPC stream in stop() to prevent thread leak (#76)
Storing the active stream as self._stream and calling cancel() before thread.join() ensures the background Subscribe call unblocks promptly. Without this, the daemon thread blocks indefinitely inside the gRPC iterator and only exits at process shutdown. Closes #46 Co-authored-by: Claude <noreply@anthropic.com>
1 parent 6752047 commit 10ab157

2 files changed

Lines changed: 45 additions & 2 deletions

File tree

sdk/src/opendecree/watcher.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ def __init__(self, stub: Any, pb2: Any, tenant_id: str, timeout: float) -> None:
148148
self._timeout = timeout
149149
self._fields: dict[str, WatchedField] = {} # type: ignore[type-arg]
150150
self._thread: threading.Thread | None = None
151+
self._stream: grpc.Future | None = None
151152
self._stop_event = threading.Event()
152153

153154
def field(self, path: str, type_: type[T], *, default: T) -> WatchedField[T]:
@@ -184,6 +185,8 @@ def start(self) -> None:
184185
def stop(self) -> None:
185186
"""Stop watching and clean up the background thread."""
186187
self._stop_event.set()
188+
if self._stream is not None:
189+
self._stream.cancel()
187190
if self._thread is not None:
188191
self._thread.join(timeout=5.0)
189192
self._thread = None
@@ -215,19 +218,21 @@ def _subscribe_loop(self) -> None:
215218

216219
while not self._stop_event.is_set():
217220
try:
218-
stream = self._stub.Subscribe(
221+
self._stream = self._stub.Subscribe(
219222
self._pb2.SubscribeRequest(
220223
tenant_id=self._tenant_id,
221224
field_paths=field_paths,
222225
),
223226
)
224227
backoff = _RECONNECT_INITIAL # reset on successful connect
225228

226-
for response in stream:
229+
for response in self._stream:
227230
if self._stop_event.is_set():
228231
return
229232
self._process_change(response.change)
230233

234+
self._stream = None
235+
231236
except grpc.RpcError as e:
232237
if self._stop_event.is_set():
233238
return

sdk/tests/test_watcher.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,41 @@ def test_non_retryable_error_stops_loop(self):
264264

265265
# Thread should have exited on its own due to non-retryable error.
266266
assert w._thread is None
267+
268+
def test_stop_cancels_stream_and_joins_thread(self):
269+
"""stop() cancels the gRPC stream so the background thread exits cleanly."""
270+
import threading
271+
272+
w = self._make_watcher()
273+
w.field("fee", float, default=0.0)
274+
275+
# A blocking iterator that only unblocks when cancel() is called.
276+
cancelled = threading.Event()
277+
278+
class _BlockingIter:
279+
def __iter__(self):
280+
return self
281+
282+
def __next__(self):
283+
# Block until cancelled.
284+
cancelled.wait(timeout=10.0)
285+
raise StopIteration
286+
287+
def cancel(self):
288+
cancelled.set()
289+
290+
blocking_stream = _BlockingIter()
291+
w._stub.Subscribe.return_value = blocking_stream
292+
293+
w.start()
294+
time.sleep(0.1) # let the thread reach the blocking iterator
295+
296+
thread_ref = w._thread
297+
assert thread_ref is not None
298+
assert thread_ref.is_alive()
299+
300+
w.stop()
301+
302+
# Thread must have joined within the timeout.
303+
assert not thread_ref.is_alive()
304+
assert w._thread is None

0 commit comments

Comments
 (0)