-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathclient.py
More file actions
423 lines (353 loc) · 15.1 KB
/
client.py
File metadata and controls
423 lines (353 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
"""Synchronous ConfigClient for OpenDecree.
The ConfigClient wraps the gRPC ConfigService with a Pythonic API:
- Overloaded get() for typed reads (str by default, or int/float/bool/timedelta)
- Context manager for clean channel lifecycle
- watch() factory for live config subscriptions (Phase 4)
- Automatic retry with exponential backoff
All writes send string values — the server coerces to the schema-defined type.
"""
from __future__ import annotations
import warnings
from datetime import timedelta
from typing import TYPE_CHECKING, overload
if TYPE_CHECKING:
from opendecree.watcher import ConfigWatcher
import grpc
from opendecree._channel import create_channel
from opendecree._compat import check_version_compatible, fetch_server_version
from opendecree._interceptors import AuthInterceptor, _build_metadata
from opendecree._retry import RetryConfig, with_retry, write_safe_config
from opendecree._stubs import (
ensure_stubs,
make_string_typed_value,
process_get_all_response,
process_get_response,
)
from opendecree.errors import map_grpc_error
from opendecree.types import FieldUpdate, ServerVersion
class ConfigClient:
"""Synchronous client for reading and writing OpenDecree configuration values.
Use as a context manager for clean channel lifecycle::
with ConfigClient("localhost:9090", subject="myapp") as client:
val = client.get("tenant-id", "payments.fee")
retries = client.get("tenant-id", "payments.retries", int)
"""
def __init__(
self,
target: str,
*,
subject: str | None = None,
role: str = "superadmin",
tenant_id: str | None = None,
token: str | None = None,
insecure: bool = True,
credentials: grpc.ChannelCredentials | None = None,
timeout: float = 10.0,
retry: RetryConfig | None = None,
) -> None:
"""Create a new ConfigClient.
Args:
target: gRPC server address (e.g., ``"localhost:9090"``).
subject: Identity for ``x-subject`` metadata header.
role: Role for ``x-role`` metadata header. Defaults to ``"superadmin"``.
tenant_id: Default tenant for ``x-tenant-id`` metadata header.
token: Bearer token. When set, metadata headers are not sent.
On a TLS channel the token is embedded via
``composite_channel_credentials`` and protected by the TLS
layer. On an insecure channel it travels in cleartext and a
``UserWarning`` is raised — prefer TLS in production.
insecure: Use plaintext (no TLS). Defaults to True for local dev.
Do not combine with *token* in production.
credentials: TLS channel credentials. Overrides *insecure*.
timeout: Default per-RPC timeout in seconds. Defaults to 10.
retry: Retry configuration. Defaults to ``RetryConfig()``.
Pass ``None`` to disable retry.
"""
self._timeout = timeout
self._retry = retry if retry is not None else RetryConfig()
tls_active = credentials is not None or not insecure
if token and not tls_active:
warnings.warn(
"Bearer token sent over insecure channel without TLS — "
"the token will be transmitted in cleartext. "
"Set insecure=False or provide credentials in production.",
UserWarning,
stacklevel=2,
)
# On a TLS channel, embed the token via composite_channel_credentials
# (protected by TLS). On an insecure channel, fall back to raw header.
channel_token = token if tls_active else None
metadata_token = token if not tls_active else None
metadata = _build_metadata(
subject=subject, role=role, tenant_id=tenant_id, token=metadata_token
)
interceptors: list[grpc.UnaryUnaryClientInterceptor] = []
if metadata:
interceptors.append(AuthInterceptor(metadata))
channel = create_channel(
target, insecure=insecure, credentials=credentials, token=channel_token
)
if interceptors:
self._channel = grpc.intercept_channel(channel, *interceptors)
else:
self._channel = channel
self._raw_channel = channel # keep ref for close()
cs_pb2, cs_grpc = ensure_stubs()
self._stub = cs_grpc.ConfigServiceStub(self._channel)
self._pb2 = cs_pb2
from opendecree._generated.centralconfig.v1 import (
server_service_pb2,
server_service_pb2_grpc,
)
self._version_stub = server_service_pb2_grpc.ServerServiceStub(self._channel)
self._version_pb2 = server_service_pb2
self._server_version: ServerVersion | None = None
def close(self) -> None:
"""Close the underlying gRPC channel."""
self._raw_channel.close()
def __enter__(self) -> ConfigClient:
return self
def __exit__(self, *exc: object) -> None:
self.close()
def get_server_version(self) -> ServerVersion:
"""Fetch the server's version, cached after first call.
Returns:
ServerVersion with version and commit strings.
Raises:
UnavailableError: If the server is unreachable.
"""
if self._server_version is None:
self._server_version = fetch_server_version(
self._version_stub, self._version_pb2, self._timeout
)
return self._server_version
@property
def server_version(self) -> ServerVersion:
"""Deprecated. Use :meth:`get_server_version` instead."""
warnings.warn(
"server_version property is deprecated; use get_server_version() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.get_server_version()
def check_compatibility(self) -> None:
"""Check that the server version is compatible with this SDK.
Fetches the server version (cached) and compares it against
``opendecree.SUPPORTED_SERVER_VERSION``.
Raises:
IncompatibleServerError: If the server version is outside the
supported range.
UnavailableError: If the server is unreachable.
"""
check_version_compatible(self.get_server_version().version)
# --- get() with @overload for type safety ---
@overload
def get(self, tenant_id: str, field_path: str) -> str: ...
@overload
def get(self, tenant_id: str, field_path: str, value_type: type[bool]) -> bool: ...
@overload
def get(self, tenant_id: str, field_path: str, value_type: type[int]) -> int: ...
@overload
def get(self, tenant_id: str, field_path: str, value_type: type[float]) -> float: ...
@overload
def get(self, tenant_id: str, field_path: str, value_type: type[timedelta]) -> timedelta: ...
@overload
def get(
self,
tenant_id: str,
field_path: str,
value_type: type[str],
*,
nullable: bool,
) -> str | None: ...
def get(
self,
tenant_id: str,
field_path: str,
value_type: type | None = None,
*,
nullable: bool = False,
) -> object:
"""Get a config value, optionally converting to a specific type.
Without a type argument, returns the raw string value.
With a type argument, converts and returns the typed value.
Args:
tenant_id: Tenant UUID.
field_path: Dot-separated field path (e.g., "payments.fee").
value_type: Target type (str, int, float, bool, timedelta). Defaults to str.
nullable: If True, return None for null/unset values instead of raising.
Returns:
The config value, converted to the requested type.
Raises:
NotFoundError: If the field has no value (and nullable is False).
TypeMismatchError: If the value cannot be converted to the requested type.
"""
target_type = value_type or str
def _call() -> object:
resp = self._stub.GetField(
self._pb2.GetFieldRequest(tenant_id=tenant_id, field_path=field_path),
timeout=self._timeout,
)
return process_get_response(resp, target_type, field_path, tenant_id, nullable)
try:
return with_retry(self._retry, _call)
except grpc.RpcError as e:
raise map_grpc_error(e) from e
def get_all(self, tenant_id: str) -> dict[str, str]:
"""Get all config values for a tenant.
Args:
tenant_id: Tenant UUID.
Returns:
A dict mapping field paths to their string values.
Raises:
NotFoundError: If the tenant does not exist.
"""
def _call() -> dict[str, str]:
resp = self._stub.GetConfig(
self._pb2.GetConfigRequest(tenant_id=tenant_id),
timeout=self._timeout,
)
return process_get_all_response(resp)
try:
return with_retry(self._retry, _call)
except grpc.RpcError as e:
raise map_grpc_error(e) from e
def set(
self,
tenant_id: str,
field_path: str,
value: str,
*,
description: str | None = None,
value_description: str | None = None,
expected_checksum: str | None = None,
idempotency_key: str | None = None,
) -> None:
"""Set a config value.
The value is sent as a string — the server coerces it to the
schema-defined type (integer, bool, etc.).
Args:
tenant_id: Tenant UUID.
field_path: Dot-separated field path (e.g., ``"payments.fee"``).
value: The value as a string.
description: Optional version-level description for the audit log.
value_description: Optional description stored with this specific value.
expected_checksum: When set, the server rejects the write if the
current value's checksum does not match (optimistic concurrency).
idempotency_key: When provided, the request is retried on
``DEADLINE_EXCEEDED`` in addition to ``UNAVAILABLE``. Use only
when the write is safe to apply more than once (e.g., the value
is a known constant and a duplicate apply is harmless). Without
this key, writes are only retried on ``UNAVAILABLE`` to avoid
double-applying after a server-side timeout.
Raises:
NotFoundError: If the field does not exist in the schema.
LockedError: If the field is locked.
InvalidArgumentError: If the value fails validation.
ChecksumMismatchError: If ``expected_checksum`` is set and does not match.
"""
retry_cfg = self._retry if idempotency_key is not None else write_safe_config(self._retry)
def _call() -> None:
self._stub.SetField(
self._pb2.SetFieldRequest(
tenant_id=tenant_id,
field_path=field_path,
value=make_string_typed_value(value),
description=description,
value_description=value_description,
expected_checksum=expected_checksum,
),
timeout=self._timeout,
)
try:
with_retry(retry_cfg, _call)
except grpc.RpcError as e:
raise map_grpc_error(e) from e
def set_many(
self,
tenant_id: str,
updates: list[FieldUpdate],
*,
description: str | None = None,
idempotency_key: str | None = None,
) -> None:
"""Atomically set multiple config values.
Args:
tenant_id: Tenant UUID.
updates: List of :class:`FieldUpdate` objects, each carrying a
field path, value, and optional per-field metadata.
description: Optional version-level description for the audit log.
idempotency_key: When provided, the request is retried on
``DEADLINE_EXCEEDED`` in addition to ``UNAVAILABLE``. See
``set()`` for details on retry semantics.
Raises:
NotFoundError: If a field does not exist in the schema.
LockedError: If any field is locked.
InvalidArgumentError: If any value fails validation.
ChecksumMismatchError: If any ``expected_checksum`` does not match.
"""
retry_cfg = self._retry if idempotency_key is not None else write_safe_config(self._retry)
def _call() -> None:
proto_updates = [
self._pb2.FieldUpdate(
field_path=u.field_path,
value=make_string_typed_value(u.value),
expected_checksum=u.expected_checksum,
value_description=u.value_description,
)
for u in updates
]
self._stub.SetFields(
self._pb2.SetFieldsRequest(
tenant_id=tenant_id,
updates=proto_updates,
description=description,
),
timeout=self._timeout,
)
try:
with_retry(retry_cfg, _call)
except grpc.RpcError as e:
raise map_grpc_error(e) from e
def set_null(
self,
tenant_id: str,
field_path: str,
*,
idempotency_key: str | None = None,
) -> None:
"""Set a config field to null.
Args:
tenant_id: Tenant UUID.
field_path: Dot-separated field path.
idempotency_key: When provided, the request is retried on
``DEADLINE_EXCEEDED`` in addition to ``UNAVAILABLE``. See
``set()`` for details on retry semantics.
Raises:
NotFoundError: If the field does not exist in the schema.
LockedError: If the field is locked.
"""
retry_cfg = self._retry if idempotency_key is not None else write_safe_config(self._retry)
def _call() -> None:
self._stub.SetField(
self._pb2.SetFieldRequest(
tenant_id=tenant_id,
field_path=field_path,
# No value field → server interprets as null.
),
timeout=self._timeout,
)
try:
with_retry(retry_cfg, _call)
except grpc.RpcError as e:
raise map_grpc_error(e) from e
def watch(self, tenant_id: str) -> ConfigWatcher:
"""Create a config watcher for a tenant.
Use as a context manager — auto-starts on enter, auto-stops on exit::
with client.watch("tenant-id") as watcher:
fee = watcher.field("payments.fee", float, default=0.01)
print(fee.value)
The watcher uses the client's gRPC channel and auth settings.
"""
from opendecree.watcher import ConfigWatcher
return ConfigWatcher(self._stub, self._pb2, tenant_id, self._timeout)