Skip to content

Commit 65de401

Browse files
zeevdrclaude
andauthored
fix(watcher): bound WatchedField change queue to prevent unbounded memory growth (#110)
Add max_queue_size parameter (default 1024) to WatchedField and AsyncWatchedField. When the queue is full, the oldest entry is dropped (drop-oldest strategy) and dropped_changes is incremented. A warning is logged on each drop. Matches the TypeScript SDK's queue overflow semantics. Closes #100 Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 98298d9 commit 65de401

4 files changed

Lines changed: 243 additions & 22 deletions

File tree

sdk/src/opendecree/async_watcher.py

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import logging
2323
import random
2424
import re
25+
from collections import deque
2526
from collections.abc import AsyncIterator, Callable
2627
from typing import Any, TypeVar
2728

@@ -41,6 +42,8 @@
4142

4243
_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")
4344

45+
_DEFAULT_MAX_QUEUE_SIZE = 1024
46+
4447
T = TypeVar("T")
4548

4649

@@ -57,15 +60,26 @@ def __init__(
5760
default: T,
5861
*,
5962
on_callback_error: Callable[[Exception], None] | None = None,
63+
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
6064
) -> None:
6165
super().__init__(path, type_, default, on_callback_error=on_callback_error)
62-
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()
66+
self._max_queue_size = max_queue_size
67+
self._dropped_changes = 0
68+
# _change_queue is a deque used as a bounded FIFO. _queue_event gates
69+
# the async changes() iterator when the deque is empty.
70+
self._change_queue: deque[Change | None] = deque()
71+
self._queue_event = asyncio.Event()
6372

6473
@property
6574
def value(self) -> T:
6675
"""The current value — always fresh."""
6776
return self._value
6877

78+
@property
79+
def dropped_changes(self) -> int:
80+
"""Number of changes dropped because the queue was full."""
81+
return self._dropped_changes
82+
6983
def __repr__(self) -> str:
7084
return f"AsyncWatchedField({self._path!r}, value={self._value!r})"
7185

@@ -75,7 +89,14 @@ async def changes(self) -> AsyncIterator[Change]:
7589
Yields Change objects until the watcher is stopped.
7690
"""
7791
while True:
78-
change = await self._change_queue.get()
92+
await self._queue_event.wait()
93+
if not self._change_queue:
94+
# Spurious wake — clear and re-wait.
95+
self._queue_event.clear()
96+
continue
97+
change = self._change_queue.popleft()
98+
if not self._change_queue:
99+
self._queue_event.clear()
79100
if change is None: # sentinel
80101
return
81102
yield change
@@ -84,15 +105,27 @@ def _update(self, raw_value: str | None, change: Change) -> None:
84105
"""Update the field value from a raw string. Called by the watcher task."""
85106
old, new = self._apply_raw(raw_value)
86107
self._fire_callbacks(old, new)
87-
self._change_queue.put_nowait(change)
108+
if len(self._change_queue) >= self._max_queue_size:
109+
self._change_queue.popleft()
110+
self._dropped_changes += 1
111+
logger.warning(
112+
"AsyncWatchedField %r: change queue full (max=%d), oldest entry dropped "
113+
"(total dropped: %d)",
114+
self._path,
115+
self._max_queue_size,
116+
self._dropped_changes,
117+
)
118+
self._change_queue.append(change)
119+
self._queue_event.set()
88120

89121
def _load_initial(self, raw_value: str) -> None:
90122
"""Set initial value from snapshot. No callbacks fired."""
91123
self._apply_raw(raw_value)
92124

93125
def _stop(self) -> None:
94126
"""Signal the changes() iterator to stop."""
95-
self._change_queue.put_nowait(None)
127+
self._change_queue.append(None)
128+
self._queue_event.set()
96129

97130

98131
class AsyncConfigWatcher:
@@ -126,6 +159,7 @@ def field(
126159
*,
127160
default: T,
128161
on_callback_error: Callable[[Exception], None] | None = None,
162+
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
129163
) -> AsyncWatchedField[T]:
130164
"""Register a field to watch.
131165
@@ -138,13 +172,18 @@ def field(
138172
on_callback_error: Optional hook called with the exception when an
139173
on_change callback raises. If not set, the exception is logged.
140174
The hook may re-raise to terminate the watcher's background task.
175+
max_queue_size: Maximum number of unread changes buffered. When the
176+
queue is full, the oldest entry is dropped and ``dropped_changes``
177+
is incremented. Default: 1024.
141178
142179
Returns:
143180
An AsyncWatchedField that tracks the live value.
144181
"""
145182
if self._task is not None:
146183
raise RuntimeError("Cannot register fields after watcher has started")
147-
watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error)
184+
watched = AsyncWatchedField(
185+
path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size
186+
)
148187
self._fields[path] = watched
149188
return watched
150189

sdk/src/opendecree/watcher.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
from __future__ import annotations
1717

1818
import logging
19-
import queue
2019
import random
2120
import re
2221
import threading
2322
import time
23+
from collections import deque
2424
from collections.abc import Callable, Iterator
2525
from typing import Any, TypeVar
2626

@@ -38,6 +38,8 @@
3838

3939
logger = logging.getLogger("opendecree.watcher")
4040

41+
_DEFAULT_MAX_QUEUE_SIZE = 1024
42+
4143
_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")
4244

4345
T = TypeVar("T")
@@ -56,17 +58,29 @@ def __init__(
5658
default: T,
5759
*,
5860
on_callback_error: Callable[[Exception], None] | None = None,
61+
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
5962
) -> None:
6063
super().__init__(path, type_, default, on_callback_error=on_callback_error)
6164
self._lock = threading.Lock()
62-
self._change_queue: queue.Queue[Change] = queue.Queue()
65+
self._max_queue_size = max_queue_size
66+
self._dropped_changes = 0
67+
# _change_queue is a deque used as a bounded FIFO. _queue_cond is used
68+
# by changes() to block when the deque is empty.
69+
self._change_queue: deque[Change] = deque()
70+
self._queue_cond = threading.Condition(threading.Lock())
6371

6472
@property
6573
def value(self) -> T:
6674
"""The current value — always fresh, thread-safe."""
6775
with self._lock:
6876
return self._value
6977

78+
@property
79+
def dropped_changes(self) -> int:
80+
"""Number of changes dropped because the queue was full."""
81+
with self._queue_cond:
82+
return self._dropped_changes
83+
7084
def __repr__(self) -> str:
7185
return f"WatchedField({self._path!r}, value={self.value!r})"
7286

@@ -77,10 +91,10 @@ def changes(self) -> Iterator[Change]:
7791
Yields Change objects with old_value and new_value as strings.
7892
"""
7993
while True:
80-
try:
81-
change = self._change_queue.get(timeout=1.0)
82-
except queue.Empty:
83-
continue
94+
with self._queue_cond:
95+
while not self._change_queue:
96+
self._queue_cond.wait(timeout=1.0)
97+
change = self._change_queue.popleft()
8498
if change is _SENTINEL_CHANGE:
8599
return
86100
yield change
@@ -90,7 +104,19 @@ def _update(self, raw_value: str | None, change: Change) -> None:
90104
with self._lock:
91105
old, new = self._apply_raw(raw_value)
92106
self._fire_callbacks(old, new)
93-
self._change_queue.put(change)
107+
with self._queue_cond:
108+
if len(self._change_queue) >= self._max_queue_size:
109+
self._change_queue.popleft()
110+
self._dropped_changes += 1
111+
logger.warning(
112+
"WatchedField %r: change queue full (max=%d), oldest entry dropped "
113+
"(total dropped: %d)",
114+
self._path,
115+
self._max_queue_size,
116+
self._dropped_changes,
117+
)
118+
self._change_queue.append(change)
119+
self._queue_cond.notify()
94120

95121
def _load_initial(self, raw_value: str) -> None:
96122
"""Set initial value from snapshot. No callbacks fired."""
@@ -99,7 +125,9 @@ def _load_initial(self, raw_value: str) -> None:
99125

100126
def _stop(self) -> None:
101127
"""Signal the changes() iterator to stop."""
102-
self._change_queue.put(_SENTINEL_CHANGE)
128+
with self._queue_cond:
129+
self._change_queue.append(_SENTINEL_CHANGE)
130+
self._queue_cond.notify()
103131

104132

105133
# Sentinel to signal the changes() iterator to stop.
@@ -130,6 +158,7 @@ def field(
130158
*,
131159
default: T,
132160
on_callback_error: Callable[[Exception], None] | None = None,
161+
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
133162
) -> WatchedField[T]:
134163
"""Register a field to watch.
135164
@@ -142,13 +171,18 @@ def field(
142171
on_callback_error: Optional hook called with the exception when an
143172
on_change callback raises. If not set, the exception is logged.
144173
The hook may re-raise to terminate the watcher's background loop.
174+
max_queue_size: Maximum number of unread changes buffered. When the
175+
queue is full, the oldest entry is dropped and ``dropped_changes``
176+
is incremented. Default: 1024.
145177
146178
Returns:
147179
A WatchedField that tracks the live value.
148180
"""
149181
if self._thread is not None:
150182
raise RuntimeError("Cannot register fields after watcher has started")
151-
watched = WatchedField(path, type_, default, on_callback_error=on_callback_error)
183+
watched = WatchedField(
184+
path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size
185+
)
152186
self._fields[path] = watched
153187
return watched
154188

sdk/tests/test_async_watcher.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,15 @@ def test_update_null_resets_to_default(self):
7272
@pytest.mark.asyncio
7373
async def test_changes_iterator(self):
7474
f = AsyncWatchedField("x", str, "")
75+
f._load_initial("a")
7576

7677
c1 = Change(field_path="x", old_value="a", new_value="b", version=1)
7778
c2 = Change(field_path="x", old_value="b", new_value="c", version=2)
7879

79-
f._change_queue.put_nowait(c1)
80-
f._change_queue.put_nowait(c2)
81-
f._change_queue.put_nowait(None) # sentinel
80+
# Populate via the internal helpers (matching the production path).
81+
f._update("b", c1)
82+
f._update("c", c2)
83+
f._stop()
8284

8385
collected = [c async for c in f.changes()]
8486
assert len(collected) == 2
@@ -140,6 +142,72 @@ def bad_cb(old: int, new: int) -> None:
140142
assert len(errors) == 1
141143
assert isinstance(errors[0], RuntimeError)
142144

145+
# --- Bounded queue tests ---
146+
147+
def test_dropped_changes_starts_at_zero(self):
148+
f = AsyncWatchedField("x", str, "", max_queue_size=5)
149+
assert f.dropped_changes == 0
150+
151+
def test_queue_fills_without_dropping_below_limit(self):
152+
f = AsyncWatchedField("x", str, "", max_queue_size=3)
153+
for i in range(3):
154+
c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i)
155+
f._update(str(i + 1), c)
156+
157+
assert f.dropped_changes == 0
158+
assert len(f._change_queue) == 3
159+
160+
def test_oldest_entry_dropped_when_queue_full(self):
161+
f = AsyncWatchedField("x", str, "", max_queue_size=3)
162+
for i in range(5):
163+
c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i)
164+
f._update(str(i + 1), c)
165+
166+
assert f.dropped_changes == 2
167+
assert len(f._change_queue) == 3
168+
versions = [c.version for c in f._change_queue]
169+
assert versions == [2, 3, 4]
170+
171+
def test_drop_logs_warning(self, caplog):
172+
import logging
173+
174+
f = AsyncWatchedField("payments.fee", str, "", max_queue_size=2)
175+
with caplog.at_level(logging.WARNING, logger="opendecree.async_watcher"):
176+
for i in range(4):
177+
c = Change(
178+
field_path="payments.fee", old_value=str(i), new_value=str(i + 1), version=i
179+
)
180+
f._update(str(i + 1), c)
181+
182+
assert f.dropped_changes == 2
183+
warning_records = [r for r in caplog.records if "dropped" in r.message]
184+
assert len(warning_records) == 2
185+
assert "payments.fee" in warning_records[0].message
186+
187+
def test_max_queue_size_constructor_arg(self):
188+
f = AsyncWatchedField("x", str, "", max_queue_size=10)
189+
assert f._max_queue_size == 10
190+
191+
def test_default_max_queue_size(self):
192+
from opendecree.async_watcher import _DEFAULT_MAX_QUEUE_SIZE
193+
194+
f = AsyncWatchedField("x", str, "")
195+
assert f._max_queue_size == _DEFAULT_MAX_QUEUE_SIZE
196+
assert _DEFAULT_MAX_QUEUE_SIZE == 1024
197+
198+
@pytest.mark.asyncio
199+
async def test_changes_iterator_after_overflow(self):
200+
f = AsyncWatchedField("x", str, "", max_queue_size=2)
201+
for i in range(4):
202+
c = Change(field_path="x", old_value=str(i), new_value=str(i + 1), version=i)
203+
f._update(str(i + 1), c)
204+
f._stop()
205+
206+
collected = [c async for c in f.changes()]
207+
assert len(collected) == 2
208+
assert collected[0].version == 2
209+
assert collected[1].version == 3
210+
143211

144212
# --- AsyncConfigWatcher unit tests ---
145213

0 commit comments

Comments
 (0)