Skip to content

Commit 2af9207

Browse files
zeevdrclaude
andcommitted
refactor(watcher): extract _WatchedFieldBase to consolidate sync and async field logic
The two WatchedField classes shared ~90% of their implementation but had no common base, causing drift (the sync version acquired a lock in _update but then read _value outside it for the callback; the async version had no lock at all). Extract _WatchedFieldBase with _apply_raw and _fire_callbacks helpers. Both subclasses call super().__init__, inherit path/on_change/__bool__, and delegate value-mutation to the base—with or without a lock as appropriate. The three _RECONNECT_* constants are also consolidated in the new module. Closes #60 Co-Authored-By: Claude <noreply@anthropic.com>
1 parent ad823bc commit 2af9207

3 files changed

Lines changed: 95 additions & 104 deletions

File tree

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""Shared base class for WatchedField and AsyncWatchedField."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from collections.abc import Callable
7+
from typing import Generic, TypeVar
8+
9+
from opendecree._convert import convert_value
10+
11+
T = TypeVar("T")
12+
13+
_logger = logging.getLogger("opendecree.watcher")
14+
15+
_RECONNECT_INITIAL = 1.0
16+
_RECONNECT_MAX = 30.0
17+
_RECONNECT_MULTIPLIER = 2.0
18+
19+
20+
class _WatchedFieldBase(Generic[T]):
21+
"""Common state and helpers shared by WatchedField and AsyncWatchedField."""
22+
23+
def __init__(self, path: str, type_: type[T], default: T) -> None:
24+
self._path = path
25+
self._type = type_
26+
self._default = default
27+
self._value: T = default
28+
self._is_set = False
29+
self._callbacks: list[Callable[[T, T], None]] = []
30+
31+
@property
32+
def path(self) -> str:
33+
"""The field path this value tracks."""
34+
return self._path
35+
36+
def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
37+
"""Register a callback for value changes. Can be used as a decorator.
38+
39+
The callback receives (old_value, new_value).
40+
"""
41+
self._callbacks.append(fn)
42+
return fn
43+
44+
def __bool__(self) -> bool:
45+
"""Truthy based on the current value. False for False, 0, '', None."""
46+
return bool(self._value)
47+
48+
def _apply_raw(self, raw_value: str | None) -> tuple[T, T]:
49+
"""Set _value/_is_set from a raw string. Returns (old, new). Caller must lock if needed."""
50+
old = self._value
51+
if raw_value is not None:
52+
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
53+
self._is_set = True
54+
else:
55+
self._value = self._default
56+
self._is_set = False
57+
return old, self._value
58+
59+
def _fire_callbacks(self, old: T, new: T) -> None:
60+
"""Invoke registered callbacks when the value changes."""
61+
if old != new:
62+
for cb in self._callbacks:
63+
try:
64+
cb(old, new)
65+
except Exception:
66+
_logger.exception("Error in on_change callback for %s", self._path)

sdk/src/opendecree/async_watcher.py

Lines changed: 15 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,66 +21,44 @@
2121
import asyncio
2222
import logging
2323
import random
24-
from collections.abc import AsyncIterator, Callable
25-
from typing import Any, Generic, TypeVar
24+
from collections.abc import AsyncIterator
25+
from typing import Any, TypeVar
2626

2727
import grpc.aio
2828

29-
from opendecree._convert import convert_value, typed_value_to_string
29+
from opendecree._convert import typed_value_to_string
3030
from opendecree._stubs import process_get_all_response
31+
from opendecree._watcher_base import (
32+
_RECONNECT_INITIAL,
33+
_RECONNECT_MAX,
34+
_RECONNECT_MULTIPLIER,
35+
_WatchedFieldBase,
36+
)
3137
from opendecree.types import Change
3238

3339
logger = logging.getLogger("opendecree.async_watcher")
3440

3541
T = TypeVar("T")
3642

37-
# Default reconnect backoff parameters.
38-
_RECONNECT_INITIAL = 1.0
39-
_RECONNECT_MAX = 30.0
40-
_RECONNECT_MULTIPLIER = 2.0
4143

42-
43-
class AsyncWatchedField(Generic[T]):
44-
"""A live, thread-safe configuration field with a typed value (async variant).
44+
class AsyncWatchedField(_WatchedFieldBase[T]):
45+
"""A live configuration field with a typed value (async variant).
4546
4647
Updated automatically by the watcher's asyncio task.
4748
"""
4849

4950
def __init__(self, path: str, type_: type[T], default: T) -> None:
50-
self._path = path
51-
self._type = type_
52-
self._default = default
53-
self._value: T = default
54-
self._is_set = False
55-
self._callbacks: list[Callable[[T, T], None]] = []
51+
super().__init__(path, type_, default)
5652
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()
5753

58-
@property
59-
def path(self) -> str:
60-
"""The field path this value tracks."""
61-
return self._path
62-
6354
@property
6455
def value(self) -> T:
6556
"""The current value — always fresh."""
6657
return self._value
6758

68-
def __bool__(self) -> bool:
69-
"""Truthy based on the current value. False for False, 0, '', None."""
70-
return bool(self._value)
71-
7259
def __repr__(self) -> str:
7360
return f"AsyncWatchedField({self._path!r}, value={self._value!r})"
7461

75-
def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
76-
"""Register a callback for value changes. Can be used as a decorator.
77-
78-
The callback receives (old_value, new_value) and is called from the
79-
watcher's asyncio task.
80-
"""
81-
self._callbacks.append(fn)
82-
return fn
83-
8462
async def changes(self) -> AsyncIterator[Change]:
8563
"""Async iterator that yields Change events for this field.
8664
@@ -94,28 +72,13 @@ async def changes(self) -> AsyncIterator[Change]:
9472

9573
def _update(self, raw_value: str | None, change: Change) -> None:
9674
"""Update the field value from a raw string. Called by the watcher task."""
97-
old = self._value
98-
if raw_value is not None:
99-
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
100-
self._is_set = True
101-
else:
102-
self._value = self._default
103-
self._is_set = False
104-
105-
new = self._value
106-
if old != new:
107-
for cb in self._callbacks:
108-
try:
109-
cb(old, new)
110-
except Exception:
111-
logger.exception("Error in on_change callback for %s", self._path)
112-
75+
old, new = self._apply_raw(raw_value)
76+
self._fire_callbacks(old, new)
11377
self._change_queue.put_nowait(change)
11478

11579
def _load_initial(self, raw_value: str) -> None:
11680
"""Set initial value from snapshot. No callbacks fired."""
117-
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
118-
self._is_set = True
81+
self._apply_raw(raw_value)
11982

12083
def _stop(self) -> None:
12184
"""Signal the changes() iterator to stop."""

sdk/src/opendecree/watcher.py

Lines changed: 14 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,68 +20,46 @@
2020
import random
2121
import threading
2222
import time
23-
from collections.abc import Callable, Iterator
24-
from typing import Any, Generic, TypeVar
23+
from collections.abc import Iterator
24+
from typing import Any, TypeVar
2525

2626
import grpc
2727

28-
from opendecree._convert import convert_value, typed_value_to_string
28+
from opendecree._convert import typed_value_to_string
2929
from opendecree._stubs import process_get_all_response
30+
from opendecree._watcher_base import (
31+
_RECONNECT_INITIAL,
32+
_RECONNECT_MAX,
33+
_RECONNECT_MULTIPLIER,
34+
_WatchedFieldBase,
35+
)
3036
from opendecree.types import Change
3137

3238
logger = logging.getLogger("opendecree.watcher")
3339

3440
T = TypeVar("T")
3541

36-
# Default reconnect backoff parameters.
37-
_RECONNECT_INITIAL = 1.0
38-
_RECONNECT_MAX = 30.0
39-
_RECONNECT_MULTIPLIER = 2.0
4042

41-
42-
class WatchedField(Generic[T]):
43+
class WatchedField(_WatchedFieldBase[T]):
4344
"""A live, thread-safe configuration field with a typed value.
4445
4546
Attributes are updated automatically by the watcher's background thread.
4647
"""
4748

4849
def __init__(self, path: str, type_: type[T], default: T) -> None:
49-
self._path = path
50-
self._type = type_
51-
self._default = default
52-
self._value: T = default
53-
self._is_set = False
50+
super().__init__(path, type_, default)
5451
self._lock = threading.Lock()
55-
self._callbacks: list[Callable[[T, T], None]] = []
5652
self._change_queue: queue.Queue[Change] = queue.Queue()
5753

58-
@property
59-
def path(self) -> str:
60-
"""The field path this value tracks."""
61-
return self._path
62-
6354
@property
6455
def value(self) -> T:
6556
"""The current value — always fresh, thread-safe."""
6657
with self._lock:
6758
return self._value
6859

69-
def __bool__(self) -> bool:
70-
"""Truthy based on the current value. False for False, 0, '', None."""
71-
return bool(self.value)
72-
7360
def __repr__(self) -> str:
7461
return f"WatchedField({self._path!r}, value={self.value!r})"
7562

76-
def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
77-
"""Register a callback for value changes. Can be used as a decorator.
78-
79-
The callback receives (old_value, new_value) and is called from the
80-
watcher's background thread.
81-
"""
82-
self._callbacks.append(fn)
83-
return fn
84-
8563
def changes(self) -> Iterator[Change]:
8664
"""Blocking iterator that yields Change events for this field.
8765
@@ -100,30 +78,14 @@ def changes(self) -> Iterator[Change]:
10078
def _update(self, raw_value: str | None, change: Change) -> None:
10179
"""Update the field value from a raw string. Called by the watcher thread."""
10280
with self._lock:
103-
old = self._value
104-
if raw_value is not None:
105-
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
106-
self._is_set = True
107-
else:
108-
self._value = self._default
109-
self._is_set = False
110-
111-
# Notify callbacks (outside the lock to avoid deadlocks).
112-
new = self._value
113-
if old != new:
114-
for cb in self._callbacks:
115-
try:
116-
cb(old, new)
117-
except Exception:
118-
logger.exception("Error in on_change callback for %s", self._path)
119-
81+
old, new = self._apply_raw(raw_value)
82+
self._fire_callbacks(old, new)
12083
self._change_queue.put(change)
12184

12285
def _load_initial(self, raw_value: str) -> None:
12386
"""Set initial value from snapshot. No callbacks fired."""
12487
with self._lock:
125-
self._value = convert_value(raw_value, self._type) # type: ignore[assignment]
126-
self._is_set = True
88+
self._apply_raw(raw_value)
12789

12890
def _stop(self) -> None:
12991
"""Signal the changes() iterator to stop."""

0 commit comments

Comments
 (0)