Skip to content

Commit a430517

Browse files
committed
Bound watched field change queues
1 parent 98298d9 commit a430517

5 files changed

Lines changed: 160 additions & 8 deletions

File tree

sdk/src/opendecree/_watcher_base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
_RECONNECT_INITIAL = 1.0
1616
_RECONNECT_MAX = 30.0
1717
_RECONNECT_MULTIPLIER = 2.0
18+
_DEFAULT_CHANGE_QUEUE_SIZE = 1024
19+
20+
21+
def _validate_max_queue_size(max_queue_size: int) -> int:
22+
"""Validate a positive change queue size."""
23+
if max_queue_size <= 0:
24+
raise ValueError("max_queue_size must be greater than 0")
25+
return max_queue_size
1826

1927

2028
class _WatchedFieldBase(Generic[T]):
@@ -35,12 +43,18 @@ def __init__(
3543
self._is_set = False
3644
self._callbacks: list[Callable[[T, T], None]] = []
3745
self._on_callback_error = on_callback_error
46+
self._dropped_changes = 0
3847

3948
@property
4049
def path(self) -> str:
4150
"""The field path this value tracks."""
4251
return self._path
4352

53+
@property
54+
def dropped_changes(self) -> int:
55+
"""Number of queued changes dropped because the change queue was full."""
56+
return self._dropped_changes
57+
4458
def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
4559
"""Register a callback for value changes. Can be used as a decorator.
4660

sdk/src/opendecree/async_watcher.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
from opendecree._convert import typed_value_to_string
3131
from opendecree._stubs import process_get_all_response
3232
from opendecree._watcher_base import (
33+
_DEFAULT_CHANGE_QUEUE_SIZE,
3334
_RECONNECT_INITIAL,
3435
_RECONNECT_MAX,
3536
_RECONNECT_MULTIPLIER,
3637
_WatchedFieldBase,
38+
_validate_max_queue_size,
3739
)
3840
from opendecree.types import Change
3941

@@ -56,10 +58,12 @@ def __init__(
5658
type_: type[T],
5759
default: T,
5860
*,
61+
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
5962
on_callback_error: Callable[[Exception], None] | None = None,
6063
) -> None:
64+
max_queue_size = _validate_max_queue_size(max_queue_size)
6165
super().__init__(path, type_, default, on_callback_error=on_callback_error)
62-
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()
66+
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue(maxsize=max_queue_size)
6367

6468
@property
6569
def value(self) -> T:
@@ -84,15 +88,40 @@ def _update(self, raw_value: str | None, change: Change) -> None:
8488
"""Update the field value from a raw string. Called by the watcher task."""
8589
old, new = self._apply_raw(raw_value)
8690
self._fire_callbacks(old, new)
87-
self._change_queue.put_nowait(change)
91+
self._enqueue_change(change)
8892

8993
def _load_initial(self, raw_value: str) -> None:
9094
"""Set initial value from snapshot. No callbacks fired."""
9195
self._apply_raw(raw_value)
9296

9397
def _stop(self) -> None:
9498
"""Signal the changes() iterator to stop."""
95-
self._change_queue.put_nowait(None)
99+
self._enqueue_stop()
100+
101+
def _enqueue_change(self, change: Change) -> None:
102+
"""Queue a change, dropping the oldest queued change if the queue is full."""
103+
while True:
104+
try:
105+
self._change_queue.put_nowait(change)
106+
return
107+
except asyncio.QueueFull:
108+
try:
109+
self._change_queue.get_nowait()
110+
except asyncio.QueueEmpty:
111+
continue
112+
self._dropped_changes += 1
113+
114+
def _enqueue_stop(self) -> None:
115+
"""Queue the stop sentinel without failing on a full queue."""
116+
while True:
117+
try:
118+
self._change_queue.put_nowait(None)
119+
return
120+
except asyncio.QueueFull:
121+
try:
122+
self._change_queue.get_nowait()
123+
except asyncio.QueueEmpty:
124+
continue
96125

97126

98127
class AsyncConfigWatcher:
@@ -125,6 +154,7 @@ def field(
125154
type_: type[T],
126155
*,
127156
default: T,
157+
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
128158
on_callback_error: Callable[[Exception], None] | None = None,
129159
) -> AsyncWatchedField[T]:
130160
"""Register a field to watch.
@@ -135,6 +165,8 @@ def field(
135165
path: Dot-separated field path (e.g., "payments.fee").
136166
type_: Python type to convert values to (str, int, float, bool, timedelta).
137167
default: Default value when the field is null or not set.
168+
max_queue_size: Maximum number of unread changes buffered before
169+
dropping the oldest queued change.
138170
on_callback_error: Optional hook called with the exception when an
139171
on_change callback raises. If not set, the exception is logged.
140172
The hook may re-raise to terminate the watcher's background task.
@@ -144,7 +176,13 @@ def field(
144176
"""
145177
if self._task is not None:
146178
raise RuntimeError("Cannot register fields after watcher has started")
147-
watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error)
179+
watched = AsyncWatchedField(
180+
path,
181+
type_,
182+
default,
183+
max_queue_size=max_queue_size,
184+
on_callback_error=on_callback_error,
185+
)
148186
self._fields[path] = watched
149187
return watched
150188

sdk/src/opendecree/watcher.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
from opendecree._convert import typed_value_to_string
3030
from opendecree._stubs import process_get_all_response
3131
from opendecree._watcher_base import (
32+
_DEFAULT_CHANGE_QUEUE_SIZE,
3233
_RECONNECT_INITIAL,
3334
_RECONNECT_MAX,
3435
_RECONNECT_MULTIPLIER,
3536
_WatchedFieldBase,
37+
_validate_max_queue_size,
3638
)
3739
from opendecree.types import Change
3840

@@ -55,11 +57,13 @@ def __init__(
5557
type_: type[T],
5658
default: T,
5759
*,
60+
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
5861
on_callback_error: Callable[[Exception], None] | None = None,
5962
) -> None:
63+
max_queue_size = _validate_max_queue_size(max_queue_size)
6064
super().__init__(path, type_, default, on_callback_error=on_callback_error)
6165
self._lock = threading.Lock()
62-
self._change_queue: queue.Queue[Change] = queue.Queue()
66+
self._change_queue: queue.Queue[Change] = queue.Queue(maxsize=max_queue_size)
6367

6468
@property
6569
def value(self) -> T:
@@ -90,7 +94,7 @@ def _update(self, raw_value: str | None, change: Change) -> None:
9094
with self._lock:
9195
old, new = self._apply_raw(raw_value)
9296
self._fire_callbacks(old, new)
93-
self._change_queue.put(change)
97+
self._enqueue_change(change)
9498

9599
def _load_initial(self, raw_value: str) -> None:
96100
"""Set initial value from snapshot. No callbacks fired."""
@@ -99,7 +103,32 @@ def _load_initial(self, raw_value: str) -> None:
99103

100104
def _stop(self) -> None:
101105
"""Signal the changes() iterator to stop."""
102-
self._change_queue.put(_SENTINEL_CHANGE)
106+
self._enqueue_stop()
107+
108+
def _enqueue_change(self, change: Change) -> None:
109+
"""Queue a change, dropping the oldest queued change if the queue is full."""
110+
while True:
111+
try:
112+
self._change_queue.put_nowait(change)
113+
return
114+
except queue.Full:
115+
try:
116+
self._change_queue.get_nowait()
117+
except queue.Empty:
118+
continue
119+
self._dropped_changes += 1
120+
121+
def _enqueue_stop(self) -> None:
122+
"""Queue the stop sentinel without blocking on a full queue."""
123+
while True:
124+
try:
125+
self._change_queue.put_nowait(_SENTINEL_CHANGE)
126+
return
127+
except queue.Full:
128+
try:
129+
self._change_queue.get_nowait()
130+
except queue.Empty:
131+
continue
103132

104133

105134
# Sentinel to signal the changes() iterator to stop.
@@ -129,6 +158,7 @@ def field(
129158
type_: type[T],
130159
*,
131160
default: T,
161+
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
132162
on_callback_error: Callable[[Exception], None] | None = None,
133163
) -> WatchedField[T]:
134164
"""Register a field to watch.
@@ -139,6 +169,8 @@ def field(
139169
path: Dot-separated field path (e.g., "payments.fee").
140170
type_: Python type to convert values to (str, int, float, bool, timedelta).
141171
default: Default value when the field is null or not set.
172+
max_queue_size: Maximum number of unread changes buffered before
173+
dropping the oldest queued change.
142174
on_callback_error: Optional hook called with the exception when an
143175
on_change callback raises. If not set, the exception is logged.
144176
The hook may re-raise to terminate the watcher's background loop.
@@ -148,7 +180,13 @@ def field(
148180
"""
149181
if self._thread is not None:
150182
raise RuntimeError("Cannot register fields after watcher has started")
151-
watched = WatchedField(path, type_, default, on_callback_error=on_callback_error)
183+
watched = WatchedField(
184+
path,
185+
type_,
186+
default,
187+
max_queue_size=max_queue_size,
188+
on_callback_error=on_callback_error,
189+
)
152190
self._fields[path] = watched
153191
return watched
154192

sdk/tests/test_async_watcher.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,25 @@ async def test_changes_iterator(self):
8585
assert collected[0].new_value == "b"
8686
assert collected[1].new_value == "c"
8787

88+
def test_bounded_queue_drops_oldest_change(self):
89+
f = AsyncWatchedField("x", int, 0, max_queue_size=2)
90+
91+
c1 = Change(field_path="x", old_value="0", new_value="1", version=1)
92+
c2 = Change(field_path="x", old_value="1", new_value="2", version=2)
93+
c3 = Change(field_path="x", old_value="2", new_value="3", version=3)
94+
95+
f._update("1", c1)
96+
f._update("2", c2)
97+
f._update("3", c3)
98+
99+
assert f.dropped_changes == 1
100+
assert f._change_queue.get_nowait().version == 2
101+
assert f._change_queue.get_nowait().version == 3
102+
103+
def test_invalid_max_queue_size_raises(self):
104+
with pytest.raises(ValueError, match="max_queue_size"):
105+
AsyncWatchedField("x", int, 0, max_queue_size=0)
106+
88107
def test_repr(self):
89108
f = AsyncWatchedField("payments.fee", float, 0.01)
90109
assert "payments.fee" in repr(f)
@@ -161,6 +180,16 @@ def test_register_field(self):
161180
assert isinstance(f, AsyncWatchedField)
162181
assert f.value == 0.01
163182

183+
def test_register_field_with_max_queue_size(self):
184+
w = self._make_watcher()
185+
f = w.field("rate", int, default=0, max_queue_size=1)
186+
187+
f._update("1", Change(field_path="rate", old_value="0", new_value="1", version=1))
188+
f._update("2", Change(field_path="rate", old_value="1", new_value="2", version=2))
189+
190+
assert f.dropped_changes == 1
191+
assert f._change_queue.get_nowait().version == 2
192+
164193
@pytest.mark.asyncio
165194
async def test_cannot_register_after_start(self):
166195
w = self._make_watcher()

sdk/tests/test_watcher.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,27 @@ def test_changes_iterator(self):
101101
assert collected[0].new_value == "b"
102102
assert collected[1].new_value == "c"
103103

104+
def test_bounded_queue_drops_oldest_change(self):
105+
f = WatchedField("x", int, 0, max_queue_size=2)
106+
107+
from opendecree.types import Change
108+
109+
c1 = Change(field_path="x", old_value="0", new_value="1", version=1)
110+
c2 = Change(field_path="x", old_value="1", new_value="2", version=2)
111+
c3 = Change(field_path="x", old_value="2", new_value="3", version=3)
112+
113+
f._update("1", c1)
114+
f._update("2", c2)
115+
f._update("3", c3)
116+
117+
assert f.dropped_changes == 1
118+
assert f._change_queue.get_nowait().version == 2
119+
assert f._change_queue.get_nowait().version == 3
120+
121+
def test_invalid_max_queue_size_raises(self):
122+
with pytest.raises(ValueError, match="max_queue_size"):
123+
WatchedField("x", int, 0, max_queue_size=0)
124+
104125
def test_repr(self):
105126
f = WatchedField("payments.fee", float, 0.01)
106127
assert "payments.fee" in repr(f)
@@ -187,6 +208,18 @@ def test_register_field(self):
187208
assert isinstance(f, WatchedField)
188209
assert f.value == 0.01
189210

211+
def test_register_field_with_max_queue_size(self):
212+
w = self._make_watcher()
213+
f = w.field("payments.fee", int, default=0, max_queue_size=1)
214+
215+
from opendecree.types import Change
216+
217+
f._update("1", Change(field_path="payments.fee", old_value="0", new_value="1", version=1))
218+
f._update("2", Change(field_path="payments.fee", old_value="1", new_value="2", version=2))
219+
220+
assert f.dropped_changes == 1
221+
assert f._change_queue.get_nowait().version == 2
222+
190223
def test_cannot_register_after_start(self):
191224
w = self._make_watcher()
192225
# Mock Subscribe to return an empty iterator.

0 commit comments

Comments
 (0)