-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathwatcher.py
More file actions
301 lines (253 loc) · 10.9 KB
/
watcher.py
File metadata and controls
301 lines (253 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""Synchronous ConfigWatcher for live configuration subscriptions.

The watcher runs a background thread that subscribes to config changes via
gRPC server-streaming. Registered fields are updated atomically and can be
read at any time via the ``.value`` property.

Usage::

    with ConfigClient("localhost:9090", subject="myapp") as client:
        watcher = client.watch("tenant-id")
        # Register fields before entering the watcher: entering starts it,
        # and fields cannot be registered once it has started.
        fee = watcher.field("payments.fee", float, default=0.01)
        with watcher:
            if fee:
                print(f"Current fee: {fee.value}")
"""
from __future__ import annotations
import logging
import random
import re
import threading
import time
from collections import deque
from collections.abc import Callable, Iterator
from typing import Any, TypeVar
import grpc
from opendecree._convert import typed_value_to_string
from opendecree._stubs import process_get_all_response
from opendecree._watcher_base import (
_RECONNECT_INITIAL,
_RECONNECT_MAX,
_RECONNECT_MULTIPLIER,
_WatchedFieldBase,
)
from opendecree.types import Change
# Generic parameter for the Python type a watched field's value is converted to.
T = TypeVar("T")

# Module logger used by WatchedField and ConfigWatcher.
logger = logging.getLogger("opendecree.watcher")

# Default cap on the number of unread Change events buffered per watched field.
_DEFAULT_MAX_QUEUE_SIZE = 1024

# Matches every character outside printable ASCII (0x20-0x7E). Despite the
# name this also covers non-ASCII text; ConfigWatcher.start() uses it to strip
# such characters from the tenant ID before building the thread name.
_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")
class WatchedField(_WatchedFieldBase[T]):
    """A live, thread-safe configuration field with a typed value.

    The watcher's background thread pushes new values in through
    :meth:`_update`; readers get the current value from :attr:`value`. Each
    update is also appended to a bounded FIFO of :class:`Change` events that
    can be consumed with :meth:`changes`.
    """

    def __init__(
        self,
        path: str,
        type_: type[T],
        default: T,
        *,
        on_callback_error: Callable[[Exception], None] | None = None,
        max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
    ) -> None:
        """Create a watched field.

        Args:
            path: Field path, forwarded to the base class.
            type_: Python type for the value, forwarded to the base class.
            default: Default value, forwarded to the base class.
            on_callback_error: Optional error hook, forwarded to the base class.
            max_queue_size: Maximum number of unread changes kept for
                :meth:`changes`. Must be at least 1.

        Raises:
            ValueError: If ``max_queue_size`` is smaller than 1.
        """
        # A non-positive bound would make _update() evict from an empty deque
        # (IndexError on the watcher thread), so reject it up front.
        if max_queue_size < 1:
            raise ValueError(f"max_queue_size must be >= 1, got {max_queue_size!r}")
        super().__init__(path, type_, default, on_callback_error=on_callback_error)
        # Guards reads/writes of the current value. Not reentrant.
        self._lock = threading.Lock()
        self._max_queue_size = max_queue_size
        self._dropped_changes = 0
        # _change_queue is a deque used as a bounded FIFO. _queue_cond guards
        # it (and _dropped_changes) and lets changes() block while it is empty.
        self._change_queue: deque[Change] = deque()
        self._queue_cond = threading.Condition(threading.Lock())

    @property
    def value(self) -> T:
        """The current value, read under the value lock."""
        with self._lock:
            return self._value

    @property
    def dropped_changes(self) -> int:
        """Number of changes dropped because the queue was full."""
        with self._queue_cond:
            return self._dropped_changes

    def __repr__(self) -> str:
        return f"WatchedField({self._path!r}, value={self.value!r})"

    def changes(self) -> Iterator[Change]:
        """Blocking iterator that yields Change events for this field.

        Blocks until a change arrives or the stop sentinel is dequeued.
        Yields the Change objects that were passed to :meth:`_update`.

        NOTE: the stop sentinel is removed from the queue by the iterator that
        sees it, so one stop signal ends exactly one ``changes()`` iterator.
        """
        while True:
            with self._queue_cond:
                while not self._change_queue:
                    # The timeout only bounds each wait; the loop re-checks the
                    # queue, so spurious or timed-out wake-ups are harmless.
                    self._queue_cond.wait(timeout=1.0)
                change = self._change_queue.popleft()
            # Check and yield outside the condition's lock so a slow consumer
            # never blocks the watcher thread in _update().
            if change is _SENTINEL_CHANGE:
                return
            yield change

    def _update(self, raw_value: str | None, change: Change) -> None:
        """Apply a new raw value, fire callbacks, and enqueue the change.

        Called by the watcher thread. When the queue already holds
        ``max_queue_size`` entries the oldest one is discarded and
        ``dropped_changes`` is incremented.
        """
        with self._lock:
            old, new = self._apply_raw(raw_value)
        # Callbacks run after the value lock is released: the lock is not
        # reentrant, so a callback reading .value would otherwise deadlock.
        self._fire_callbacks(old, new)
        with self._queue_cond:
            if len(self._change_queue) >= self._max_queue_size:
                self._change_queue.popleft()
                self._dropped_changes += 1
                logger.warning(
                    "WatchedField %r: change queue full (max=%d), oldest entry dropped "
                    "(total dropped: %d)",
                    self._path,
                    self._max_queue_size,
                    self._dropped_changes,
                )
            self._change_queue.append(change)
            self._queue_cond.notify()

    def _load_initial(self, raw_value: str) -> None:
        """Set the initial value from the snapshot without firing callbacks."""
        with self._lock:
            self._apply_raw(raw_value)

    def _stop(self) -> None:
        """Enqueue the stop sentinel and wake one waiting changes() iterator."""
        with self._queue_cond:
            self._change_queue.append(_SENTINEL_CHANGE)
            self._queue_cond.notify()
# Marker pushed onto a field's change queue by WatchedField._stop().
# changes() compares dequeued items against it by identity and ends the
# iteration when it is seen; it is never yielded to callers.
_SENTINEL_CHANGE = Change(
    field_path="",
    old_value=None,
    new_value=None,
    version=-1,
)
class ConfigWatcher:
    """Watches a tenant's configuration for live changes.

    Created via client.watch(). Register fields with :meth:`field` first, then
    use the watcher as a context manager — it auto-starts on enter and
    auto-stops on exit.
    """

    def __init__(self, stub: Any, pb2: Any, tenant_id: str, timeout: float) -> None:
        """Store the gRPC plumbing; nothing is contacted until :meth:`start`.

        Args:
            stub: Service stub exposing ``GetConfig`` and ``Subscribe``.
            pb2: Generated message module providing ``GetConfigRequest`` and
                ``SubscribeRequest``.
            tenant_id: Tenant whose configuration is watched.
            timeout: Timeout passed to the initial ``GetConfig`` snapshot call.
        """
        self._stub = stub
        self._pb2 = pb2
        self._tenant_id = tenant_id
        self._timeout = timeout
        self._fields: dict[str, WatchedField] = {}  # type: ignore[type-arg]
        self._thread: threading.Thread | None = None
        # Currently open subscription stream; written by the background thread
        # and read by stop() so it can be cancelled.
        self._stream: grpc.Future | None = None
        self._stop_event = threading.Event()

    def field(
        self,
        path: str,
        type_: type[T],
        *,
        default: T,
        on_callback_error: Callable[[Exception], None] | None = None,
        max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
    ) -> WatchedField[T]:
        """Register a field to watch.

        Must be called before the watcher is started (before __enter__).

        Args:
            path: Dot-separated field path (e.g., "payments.fee").
            type_: Python type to convert values to (str, int, float, bool, timedelta).
            default: Default value when the field is null or not set.
            on_callback_error: Optional hook called with the exception when an
                on_change callback raises. If not set, the exception is logged.
                The hook may re-raise to terminate the watcher's background loop.
            max_queue_size: Maximum number of unread changes buffered. When the
                queue is full, the oldest entry is dropped and ``dropped_changes``
                is incremented. Default: 1024.

        Returns:
            A WatchedField that tracks the live value.

        Raises:
            RuntimeError: If the watcher has already been started.
        """
        if self._thread is not None:
            raise RuntimeError("Cannot register fields after watcher has started")
        watched = WatchedField(
            path, type_, default, on_callback_error=on_callback_error, max_queue_size=max_queue_size
        )
        self._fields[path] = watched
        return watched

    def start(self) -> None:
        """Start watching — loads initial snapshot and subscribes to changes.

        Raises:
            RuntimeError: If the watcher is already started.
        """
        if self._thread is not None:
            raise RuntimeError("Watcher already started")
        self._load_snapshot()
        self._stop_event.clear()
        # Keep only printable ASCII so the thread name stays log-friendly.
        safe_id = _CONTROL_CHARS_RE.sub("", self._tenant_id)
        self._thread = threading.Thread(
            target=self._subscribe_loop, daemon=True, name=f"decree-watcher-{safe_id}"
        )
        self._thread.start()

    def stop(self) -> None:
        """Stop watching and clean up the background thread."""
        self._stop_event.set()
        # Snapshot the attribute: the background thread resets self._stream to
        # None when a stream ends, so re-reading it after the None check could
        # end up calling .cancel() on None.
        stream = self._stream
        if stream is not None:
            stream.cancel()
        thread = self._thread
        if thread is not None:
            thread.join(timeout=5.0)
            if thread.is_alive():
                logger.warning(
                    "Watcher thread for tenant %r did not stop within 5.0s", self._tenant_id
                )
            self._thread = None
        # Wake any consumer blocked in WatchedField.changes().
        for f in self._fields.values():
            f._stop()

    def __enter__(self) -> ConfigWatcher:
        self.start()
        return self

    def __exit__(self, *exc: object) -> None:
        self.stop()

    def _load_snapshot(self) -> None:
        """Load the current config values as the initial snapshot."""
        resp = self._stub.GetConfig(
            self._pb2.GetConfigRequest(tenant_id=self._tenant_id),
            timeout=self._timeout,
        )
        all_values = process_get_all_response(resp)
        # Fields missing from the snapshot keep whatever value they were
        # constructed with.
        for path, watched in self._fields.items():
            if path in all_values:
                watched._load_initial(all_values[path])

    def _subscribe_loop(self) -> None:
        """Background thread: subscribe to changes with auto-reconnect.

        Retries with jittered exponential backoff when the server closes the
        stream or the call fails with UNAVAILABLE / INTERNAL. Any other gRPC
        status ends the loop. Exits promptly once the stop event is set.
        """
        backoff = _RECONNECT_INITIAL
        field_paths = list(self._fields)
        while not self._stop_event.is_set():
            opened_at = time.monotonic()
            got_response = False
            try:
                stream = self._stub.Subscribe(
                    self._pb2.SubscribeRequest(
                        tenant_id=self._tenant_id,
                        field_paths=field_paths,
                    ),
                )
                self._stream = stream
                # stop() sets the event before it samples self._stream, so if it
                # sampled just before the assignment above, the event is already
                # visible here and the new stream must be cancelled by us.
                if self._stop_event.is_set():
                    stream.cancel()
                    return
                for response in stream:
                    if self._stop_event.is_set():
                        return
                    got_response = True
                    self._process_change(response.change)
            except grpc.RpcError as e:
                if self._stop_event.is_set():
                    return
                code = e.code()
                if code not in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.INTERNAL):
                    logger.error("Subscription failed with non-retryable error: %s", e)
                    return
                reason = f"Subscription lost (code={code})"
            else:
                # Stream ended normally (server closed).
                if self._stop_event.is_set():
                    return
                reason = "Stream closed by server"
            self._stream = None

            # Subscribe() returns before the connection is established and
            # failures only surface while iterating, so "call created" is not
            # proof of a successful connect. Reset the backoff only once the
            # stream proved healthy: it delivered a response or stayed open for
            # at least the maximum backoff interval.
            if got_response or time.monotonic() - opened_at >= _RECONNECT_MAX:
                backoff = _RECONNECT_INITIAL
            delay = backoff * random.uniform(0.5, 1.5)
            logger.warning("%s, reconnecting in %.1fs", reason, delay)
            # Event.wait returns as soon as stop() sets the event, so shutdown
            # is not held up by the reconnect delay.
            self._stop_event.wait(delay)
            backoff = min(backoff * _RECONNECT_MULTIPLIER, _RECONNECT_MAX)

    def _process_change(self, change: Any) -> None:
        """Process a single ConfigChange from the stream.

        Changes for paths that were not registered via :meth:`field` are
        ignored. Unset ``old_value`` / ``new_value`` are represented as None.
        """
        field_path = change.field_path
        watched = self._fields.get(field_path)
        if watched is None:
            return
        old_raw = typed_value_to_string(change.old_value) if change.HasField("old_value") else None
        new_raw = typed_value_to_string(change.new_value) if change.HasField("new_value") else None
        sdk_change = Change(
            field_path=field_path,
            old_value=old_raw,
            new_value=new_raw,
            version=change.version,
            changed_by=change.changed_by,
        )
        watched._update(new_raw, sdk_change)