forked from opendecree/decree-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_watcher.py
More file actions
296 lines (250 loc) · 10.2 KB
/
async_watcher.py
File metadata and controls
296 lines (250 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
"""Asynchronous ConfigWatcher for live configuration subscriptions.
The async watcher subscribes to config changes via gRPC server-streaming
using asyncio. Registered fields are updated atomically and can be read
at any time via the .value property.
Usage::
async with AsyncConfigClient("localhost:9090", subject="myapp") as client:
async with client.watch("tenant-id") as watcher:
fee = watcher.field("payments.fee", float, default=0.01)
if fee:
print(f"Current fee: {fee.value}")
async for change in fee.changes():
print(f"{change.old_value} -> {change.new_value}")
"""
from __future__ import annotations
import asyncio
import logging
import random
import re
from collections.abc import AsyncIterator, Callable
from typing import Any, TypeVar
import grpc.aio
from opendecree._convert import typed_value_to_string
from opendecree._stubs import process_get_all_response
from opendecree._watcher_base import (
_DEFAULT_CHANGE_QUEUE_SIZE,
_RECONNECT_INITIAL,
_RECONNECT_MAX,
_RECONNECT_MULTIPLIER,
_WatchedFieldBase,
_validate_max_queue_size,
)
from opendecree.types import Change
logger = logging.getLogger("opendecree.async_watcher")
_CONTROL_CHARS_RE = re.compile(r"[^\x20-\x7E]")
T = TypeVar("T")
class AsyncWatchedField(_WatchedFieldBase[T]):
"""A live configuration field with a typed value (async variant).
Updated automatically by the watcher's asyncio task.
"""
def __init__(
self,
path: str,
type_: type[T],
default: T,
*,
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
on_callback_error: Callable[[Exception], None] | None = None,
) -> None:
max_queue_size = _validate_max_queue_size(max_queue_size)
super().__init__(path, type_, default, on_callback_error=on_callback_error)
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue(maxsize=max_queue_size)
@property
def value(self) -> T:
"""The current value — always fresh."""
return self._value
def __repr__(self) -> str:
return f"AsyncWatchedField({self._path!r}, value={self._value!r})"
async def changes(self) -> AsyncIterator[Change]:
"""Async iterator that yields Change events for this field.
Yields Change objects until the watcher is stopped.
"""
while True:
change = await self._change_queue.get()
if change is None: # sentinel
return
yield change
def _update(self, raw_value: str | None, change: Change) -> None:
"""Update the field value from a raw string. Called by the watcher task."""
old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._enqueue_change(change)
def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
self._apply_raw(raw_value)
def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
self._enqueue_stop()
def _enqueue_change(self, change: Change) -> None:
"""Queue a change, dropping the oldest queued change if the queue is full."""
while True:
try:
self._change_queue.put_nowait(change)
return
except asyncio.QueueFull:
try:
self._change_queue.get_nowait()
except asyncio.QueueEmpty:
continue
self._dropped_changes += 1
def _enqueue_stop(self) -> None:
"""Queue the stop sentinel without failing on a full queue."""
while True:
try:
self._change_queue.put_nowait(None)
return
except asyncio.QueueFull:
try:
self._change_queue.get_nowait()
except asyncio.QueueEmpty:
continue
class AsyncConfigWatcher:
"""Watches a tenant's configuration for live changes (async variant).
Created via async_client.watch(). Use as an async context manager —
auto-starts on enter, auto-stops on exit.
"""
def __init__(
self,
stub: Any,
pb2: Any,
tenant_id: str,
timeout: float,
metadata: list[tuple[str, str]] | None = None,
) -> None:
self._stub = stub
self._pb2 = pb2
self._tenant_id = tenant_id
self._timeout = timeout
self._metadata: list[tuple[str, str]] = metadata or []
self._fields: dict[str, AsyncWatchedField] = {} # type: ignore[type-arg]
self._task: asyncio.Task | None = None # type: ignore[type-arg]
self._stopped = False
def field(
self,
path: str,
type_: type[T],
*,
default: T,
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
on_callback_error: Callable[[Exception], None] | None = None,
) -> AsyncWatchedField[T]:
"""Register a field to watch.
Must be called before the watcher is started (before __aenter__).
Args:
path: Dot-separated field path (e.g., "payments.fee").
type_: Python type to convert values to (str, int, float, bool, timedelta).
default: Default value when the field is null or not set.
max_queue_size: Maximum number of unread changes buffered before
dropping the oldest queued change.
on_callback_error: Optional hook called with the exception when an
on_change callback raises. If not set, the exception is logged.
The hook may re-raise to terminate the watcher's background task.
Returns:
An AsyncWatchedField that tracks the live value.
"""
if self._task is not None:
raise RuntimeError("Cannot register fields after watcher has started")
watched = AsyncWatchedField(
path,
type_,
default,
max_queue_size=max_queue_size,
on_callback_error=on_callback_error,
)
self._fields[path] = watched
return watched
async def start(self) -> None:
"""Start watching — loads initial snapshot and subscribes to changes."""
if self._task is not None:
raise RuntimeError("Watcher already started")
await self._load_snapshot()
self._stopped = False
safe_id = _CONTROL_CHARS_RE.sub("", self._tenant_id)
self._task = asyncio.create_task(self._subscribe_loop(), name=f"decree-watcher-{safe_id}")
async def stop(self) -> None:
"""Stop watching and cancel the background task."""
self._stopped = True
if self._task is not None:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
for f in self._fields.values():
f._stop()
async def __aenter__(self) -> AsyncConfigWatcher:
await self.start()
return self
async def __aexit__(self, *exc: object) -> None:
await self.stop()
async def _load_snapshot(self) -> None:
"""Load the current config values as the initial snapshot."""
resp = await self._stub.GetConfig(
self._pb2.GetConfigRequest(tenant_id=self._tenant_id),
timeout=self._timeout,
metadata=self._metadata,
)
all_values = process_get_all_response(resp)
for path, watched in self._fields.items():
if path in all_values:
watched._load_initial(all_values[path])
async def _subscribe_loop(self) -> None:
"""Background task: subscribe to changes with auto-reconnect."""
backoff = _RECONNECT_INITIAL
field_paths = list(self._fields.keys())
while not self._stopped:
try:
stream = self._stub.Subscribe(
self._pb2.SubscribeRequest(
tenant_id=self._tenant_id,
field_paths=field_paths,
),
metadata=self._metadata,
)
backoff = _RECONNECT_INITIAL
async for response in stream:
if self._stopped:
return
self._process_change(response.change)
# Stream ended normally (server closed) — reconnect with backoff.
if not self._stopped:
jitter = random.uniform(0.5, 1.5)
await asyncio.sleep(backoff * jitter)
backoff = min(backoff * _RECONNECT_MULTIPLIER, _RECONNECT_MAX)
continue
except grpc.aio.AioRpcError as e:
if self._stopped:
return
code = e.code()
if code in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.INTERNAL):
jitter = random.uniform(0.5, 1.5)
sleep_time = backoff * jitter
logger.warning(
"Subscription lost (code=%s), reconnecting in %.1fs",
code,
sleep_time,
)
await asyncio.sleep(sleep_time)
backoff = min(backoff * _RECONNECT_MULTIPLIER, _RECONNECT_MAX)
else:
logger.error("Subscription failed with non-retryable error: %s", e)
return
except asyncio.CancelledError:
return
def _process_change(self, change: Any) -> None:
"""Process a single ConfigChange from the stream."""
field_path = change.field_path
watched = self._fields.get(field_path)
if watched is None:
return
old_raw = typed_value_to_string(change.old_value) if change.HasField("old_value") else None
new_raw = typed_value_to_string(change.new_value) if change.HasField("new_value") else None
sdk_change = Change(
field_path=field_path,
old_value=old_raw,
new_value=new_raw,
version=change.version,
changed_by=change.changed_by,
)
watched._update(new_raw, sdk_change)