Skip to content

Commit 364c38b

Browse files
zeevdrclaude
andauthored
feat(watcher): add on_callback_error hook to WatchedField and AsyncWatchedField (#97)
Previously, on_change callback exceptions were silently swallowed and only logged. Users had no way to opt into fail-loud behavior or custom error handling. Add on_callback_error: Callable[[Exception], None] | None = None to _WatchedFieldBase.__init__, WatchedField.__init__, and AsyncWatchedField.__init__. Thread the parameter through both ConfigWatcher.field() and AsyncConfigWatcher.field() so it's accessible at the call site. When set, the hook is called with the exception instead of logging. If the hook re-raises, the watcher loop terminates. Closes #61 Co-authored-by: Claude <noreply@anthropic.com>
1 parent 48ac008 commit 364c38b

5 files changed

Lines changed: 140 additions & 13 deletions

File tree

sdk/src/opendecree/_watcher_base.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,21 @@
2020
class _WatchedFieldBase(Generic[T]):
2121
"""Common state and helpers shared by WatchedField and AsyncWatchedField."""
2222

23-
def __init__(self, path: str, type_: type[T], default: T) -> None:
23+
def __init__(
24+
self,
25+
path: str,
26+
type_: type[T],
27+
default: T,
28+
*,
29+
on_callback_error: Callable[[Exception], None] | None = None,
30+
) -> None:
2431
self._path = path
2532
self._type = type_
2633
self._default = default
2734
self._value: T = default
2835
self._is_set = False
2936
self._callbacks: list[Callable[[T, T], None]] = []
37+
self._on_callback_error = on_callback_error
3038

3139
@property
3240
def path(self) -> str:
@@ -62,5 +70,8 @@ def _fire_callbacks(self, old: T, new: T) -> None:
6270
for cb in self._callbacks:
6371
try:
6472
cb(old, new)
65-
except Exception:
66-
_logger.exception("Error in on_change callback for %s", self._path)
73+
except Exception as exc:
74+
if self._on_callback_error is not None:
75+
self._on_callback_error(exc)
76+
else:
77+
_logger.exception("Error in on_change callback for %s", self._path)

sdk/src/opendecree/async_watcher.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import asyncio
2222
import logging
2323
import random
24-
from collections.abc import AsyncIterator
24+
from collections.abc import AsyncIterator, Callable
2525
from typing import Any, TypeVar
2626

2727
import grpc.aio
@@ -47,8 +47,15 @@ class AsyncWatchedField(_WatchedFieldBase[T]):
4747
Updated automatically by the watcher's asyncio task.
4848
"""
4949

50-
def __init__(self, path: str, type_: type[T], default: T) -> None:
51-
super().__init__(path, type_, default)
50+
def __init__(
51+
self,
52+
path: str,
53+
type_: type[T],
54+
default: T,
55+
*,
56+
on_callback_error: Callable[[Exception], None] | None = None,
57+
) -> None:
58+
super().__init__(path, type_, default, on_callback_error=on_callback_error)
5259
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()
5360

5461
@property
@@ -109,7 +116,14 @@ def __init__(
109116
self._task: asyncio.Task | None = None # type: ignore[type-arg]
110117
self._stopped = False
111118

112-
def field(self, path: str, type_: type[T], *, default: T) -> AsyncWatchedField[T]:
119+
def field(
120+
self,
121+
path: str,
122+
type_: type[T],
123+
*,
124+
default: T,
125+
on_callback_error: Callable[[Exception], None] | None = None,
126+
) -> AsyncWatchedField[T]:
113127
"""Register a field to watch.
114128
115129
Must be called before the watcher is started (before __aenter__).
@@ -118,13 +132,16 @@ def field(self, path: str, type_: type[T], *, default: T) -> AsyncWatchedField[T
118132
path: Dot-separated field path (e.g., "payments.fee").
119133
type_: Python type to convert values to (str, int, float, bool, timedelta).
120134
default: Default value when the field is null or not set.
135+
on_callback_error: Optional hook called with the exception when an
136+
on_change callback raises. If not set, the exception is logged.
137+
The hook may re-raise to terminate the watcher's background task.
121138
122139
Returns:
123140
An AsyncWatchedField that tracks the live value.
124141
"""
125142
if self._task is not None:
126143
raise RuntimeError("Cannot register fields after watcher has started")
127-
watched = AsyncWatchedField(path, type_, default)
144+
watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error)
128145
self._fields[path] = watched
129146
return watched
130147

sdk/src/opendecree/watcher.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import random
2121
import threading
2222
import time
23-
from collections.abc import Iterator
23+
from collections.abc import Callable, Iterator
2424
from typing import Any, TypeVar
2525

2626
import grpc
@@ -46,8 +46,15 @@ class WatchedField(_WatchedFieldBase[T]):
4646
Attributes are updated automatically by the watcher's background thread.
4747
"""
4848

49-
def __init__(self, path: str, type_: type[T], default: T) -> None:
50-
super().__init__(path, type_, default)
49+
def __init__(
50+
self,
51+
path: str,
52+
type_: type[T],
53+
default: T,
54+
*,
55+
on_callback_error: Callable[[Exception], None] | None = None,
56+
) -> None:
57+
super().__init__(path, type_, default, on_callback_error=on_callback_error)
5158
self._lock = threading.Lock()
5259
self._change_queue: queue.Queue[Change] = queue.Queue()
5360

@@ -113,7 +120,14 @@ def __init__(self, stub: Any, pb2: Any, tenant_id: str, timeout: float) -> None:
113120
self._stream: grpc.Future | None = None
114121
self._stop_event = threading.Event()
115122

116-
def field(self, path: str, type_: type[T], *, default: T) -> WatchedField[T]:
123+
def field(
124+
self,
125+
path: str,
126+
type_: type[T],
127+
*,
128+
default: T,
129+
on_callback_error: Callable[[Exception], None] | None = None,
130+
) -> WatchedField[T]:
117131
"""Register a field to watch.
118132
119133
Must be called before the watcher is started (before __enter__).
@@ -122,13 +136,16 @@ def field(self, path: str, type_: type[T], *, default: T) -> WatchedField[T]:
122136
path: Dot-separated field path (e.g., "payments.fee").
123137
type_: Python type to convert values to (str, int, float, bool, timedelta).
124138
default: Default value when the field is null or not set.
139+
on_callback_error: Optional hook called with the exception when an
140+
on_change callback raises. If not set, the exception is logged.
141+
The hook may re-raise to terminate the watcher's background loop.
125142
126143
Returns:
127144
A WatchedField that tracks the live value.
128145
"""
129146
if self._thread is not None:
130147
raise RuntimeError("Cannot register fields after watcher has started")
131-
watched = WatchedField(path, type_, default)
148+
watched = WatchedField(path, type_, default, on_callback_error=on_callback_error)
132149
self._fields[path] = watched
133150
return watched
134151

sdk/tests/test_async_watcher.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,45 @@ def bad_cb(old: int, new: int) -> None:
101101
f._update("2", change) # should not raise
102102
assert f.value == 2
103103

104+
def test_on_callback_error_hook_is_called(self):
105+
errors: list[Exception] = []
106+
f = AsyncWatchedField("x", int, 0, on_callback_error=errors.append)
107+
f._load_initial("1")
108+
109+
@f.on_change
110+
def bad_cb(old: int, new: int) -> None:
111+
raise ValueError("boom")
112+
113+
change = Change(field_path="x", old_value="1", new_value="2", version=1)
114+
f._update("2", change)
115+
116+
assert len(errors) == 1
117+
assert isinstance(errors[0], ValueError)
118+
assert str(errors[0]) == "boom"
119+
assert f.value == 2
120+
121+
def test_on_callback_error_hook_via_field_method(self):
122+
errors: list[Exception] = []
123+
stub = MagicMock()
124+
pb2 = MagicMock()
125+
mock_resp = MagicMock()
126+
mock_resp.config.values = []
127+
stub.GetConfig = AsyncMock(return_value=mock_resp)
128+
129+
w = AsyncConfigWatcher(stub, pb2, "t1", timeout=5.0)
130+
f = w.field("x", int, default=0, on_callback_error=errors.append)
131+
f._load_initial("1")
132+
133+
@f.on_change
134+
def bad_cb(old: int, new: int) -> None:
135+
raise RuntimeError("fail")
136+
137+
change = Change(field_path="x", old_value="1", new_value="2", version=1)
138+
f._update("2", change)
139+
140+
assert len(errors) == 1
141+
assert isinstance(errors[0], RuntimeError)
142+
104143

105144
# --- AsyncConfigWatcher unit tests ---
106145

sdk/tests/test_watcher.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,49 @@ def bad_cb(old: int, new: int) -> None:
121121
f._update("2", change)
122122
assert f.value == 2
123123

124+
def test_on_callback_error_hook_is_called(self):
125+
errors: list[Exception] = []
126+
f = WatchedField("x", int, 0, on_callback_error=errors.append)
127+
f._load_initial("1")
128+
129+
@f.on_change
130+
def bad_cb(old: int, new: int) -> None:
131+
raise ValueError("boom")
132+
133+
from opendecree.types import Change
134+
135+
change = Change(field_path="x", old_value="1", new_value="2", version=1)
136+
f._update("2", change)
137+
138+
assert len(errors) == 1
139+
assert isinstance(errors[0], ValueError)
140+
assert str(errors[0]) == "boom"
141+
assert f.value == 2
142+
143+
def test_on_callback_error_hook_via_field_method(self):
144+
errors: list[Exception] = []
145+
stub = MagicMock()
146+
pb2 = MagicMock()
147+
mock_resp = MagicMock()
148+
mock_resp.config.values = []
149+
stub.GetConfig.return_value = mock_resp
150+
151+
w = ConfigWatcher(stub, pb2, "t1", timeout=5.0)
152+
f = w.field("x", int, default=0, on_callback_error=errors.append)
153+
f._load_initial("1")
154+
155+
@f.on_change
156+
def bad_cb(old: int, new: int) -> None:
157+
raise RuntimeError("fail")
158+
159+
from opendecree.types import Change
160+
161+
change = Change(field_path="x", old_value="1", new_value="2", version=1)
162+
f._update("2", change)
163+
164+
assert len(errors) == 1
165+
assert isinstance(errors[0], RuntimeError)
166+
124167

125168
# --- ConfigWatcher unit tests ---
126169

0 commit comments

Comments
 (0)