Skip to content

Commit 337a2d6

Browse files
authored
fix(watcher): apply backoff on clean stream close
When the server closed the Subscribe stream normally, the outer reconnect loop re-issued Subscribe immediately with no delay, causing a tight reconnect loop on a server FIN. This commit ports the backoff branch from async_watcher.py: on a clean stream close, the watcher now sleeps for backoff × jitter seconds (in 0.1 s intervals so the stop event is honoured) and then grows the backoff the same way error reconnects do. Closes #51
1 parent d3d3085 commit 337a2d6

2 files changed

Lines changed: 68 additions & 0 deletions

File tree

sdk/src/opendecree/watcher.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,16 @@ def _subscribe_loop(self) -> None:
233233

234234
self._stream = None
235235

236+
# Stream ended normally (server closed) — reconnect with backoff.
237+
if not self._stop_event.is_set():
238+
jitter = random.uniform(0.5, 1.5)
239+
sleep_time = backoff * jitter
240+
logger.warning("Stream closed by server, reconnecting in %.1fs", sleep_time)
241+
deadline = time.monotonic() + sleep_time
242+
while time.monotonic() < deadline and not self._stop_event.is_set():
243+
time.sleep(0.1)
244+
backoff = min(backoff * _RECONNECT_MULTIPLIER, _RECONNECT_MAX)
245+
236246
except grpc.RpcError as e:
237247
if self._stop_event.is_set():
238248
return

sdk/tests/test_watcher.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,64 @@ def test_non_retryable_error_stops_loop(self):
265265
# Thread should have exited on its own due to non-retryable error.
266266
assert w._thread is None
267267

268+
def test_reconnects_after_clean_stream_close(self):
269+
"""Clean server-side stream close triggers a reconnect."""
270+
import unittest.mock as mock
271+
272+
w = self._make_watcher()
273+
w.field("fee", float, default=0.0)
274+
275+
call_count = 0
276+
277+
def _subscribe_side_effect(*args, **kwargs):
278+
nonlocal call_count
279+
call_count += 1
280+
if call_count == 1:
281+
return iter([]) # clean close — server FIN
282+
# Stop after the second call so the thread exits.
283+
w._stop_event.set()
284+
return iter([])
285+
286+
w._stub.Subscribe.side_effect = _subscribe_side_effect
287+
288+
with mock.patch("opendecree.watcher._RECONNECT_INITIAL", 0.2):
289+
w.start()
290+
time.sleep(1.0)
291+
w.stop()
292+
293+
assert call_count >= 2
294+
295+
def test_clean_close_applies_backoff(self):
296+
"""Reconnect after a clean close is delayed, not immediate."""
297+
import unittest.mock as mock
298+
299+
w = self._make_watcher()
300+
w.field("fee", float, default=0.0)
301+
302+
call_count = 0
303+
timestamps: list[float] = []
304+
305+
def _subscribe_side_effect(*args, **kwargs):
306+
nonlocal call_count
307+
call_count += 1
308+
timestamps.append(time.monotonic())
309+
if call_count == 1:
310+
return iter([]) # clean close
311+
w._stop_event.set()
312+
return iter([])
313+
314+
w._stub.Subscribe.side_effect = _subscribe_side_effect
315+
316+
with mock.patch("opendecree.watcher._RECONNECT_INITIAL", 0.2):
317+
w.start()
318+
time.sleep(1.0)
319+
w.stop()
320+
321+
assert len(timestamps) >= 2
322+
gap = timestamps[1] - timestamps[0]
323+
# Minimum gap is jitter_min (0.5) * RECONNECT_INITIAL (0.2) = 0.1s; assert against half of that to leave slack for timing noise.
324+
assert gap >= 0.05
325+
268326
def test_stop_cancels_stream_and_joins_thread(self):
269327
"""stop() cancels the gRPC stream so the background thread exits cleanly."""
270328
import threading

0 commit comments

Comments
 (0)