Skip to content

Commit 2ab4ecc

Browse files
authored
Add typed gRPC exceptions and capture trailing metadata
Add TimeoutError, ResourceExhaustedError, CancelledError, and UnimplementedError to the gRPC status code map so callers can catch by type instead of inspecting .code. DecreeError now captures trailing_metadata from the RpcError and exposes a retry_after timedelta parsed from google.rpc.RetryInfo packed in the grpc-status-details-bin trailer. Closes #54
1 parent 4f712fe commit 2ab4ecc

5 files changed

Lines changed: 141 additions & 10 deletions

File tree

sdk/pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ ignore_errors = true
7575
module = "grpc.*"
7676
ignore_missing_imports = true
7777

78+
[[tool.mypy.overrides]]
79+
module = "google.rpc.*"
80+
ignore_missing_imports = true
81+
7882
[tool.pytest.ini_options]
7983
testpaths = ["tests"]
8084
asyncio_mode = "auto"

sdk/src/opendecree/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@
1414
from opendecree.client import ConfigClient
1515
from opendecree.errors import (
1616
AlreadyExistsError,
17+
CancelledError,
1718
ChecksumMismatchError,
1819
DecreeError,
1920
IncompatibleServerError,
2021
InvalidArgumentError,
2122
LockedError,
2223
NotFoundError,
2324
PermissionDeniedError,
25+
ResourceExhaustedError,
26+
TimeoutError,
2427
TypeMismatchError,
2528
UnavailableError,
29+
UnimplementedError,
2630
)
2731
from opendecree.types import Change, ConfigValue, FieldUpdate, ServerVersion
2832
from opendecree.watcher import ConfigWatcher, WatchedField
@@ -35,6 +39,7 @@
3539
"AsyncConfigClient",
3640
"AsyncConfigWatcher",
3741
"AsyncWatchedField",
42+
"CancelledError",
3843
"Change",
3944
"ChecksumMismatchError",
4045
"ConfigClient",
@@ -47,10 +52,13 @@
4752
"LockedError",
4853
"NotFoundError",
4954
"PermissionDeniedError",
55+
"ResourceExhaustedError",
5056
"RetryConfig",
5157
"ServerVersion",
58+
"TimeoutError",
5259
"TypeMismatchError",
5360
"UnavailableError",
61+
"UnimplementedError",
5462
"WatchedField",
5563
"__version__",
5664
]

sdk/src/opendecree/errors.py

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,28 @@
55

66
from __future__ import annotations
77

8+
import datetime
9+
810
import grpc
11+
import grpc.aio
12+
from google.rpc import error_details_pb2, status_pb2
913

1014

1115
class DecreeError(Exception):
1216
"""Base exception for all OpenDecree SDK errors."""
1317

14-
def __init__(self, message: str, code: grpc.StatusCode | None = None) -> None:
15-
"""Create a DecreeError.
16-
17-
Args:
18-
message: Human-readable error description.
19-
code: The gRPC status code that caused this error, if any.
20-
"""
18+
def __init__(
19+
self,
20+
message: str,
21+
code: grpc.StatusCode | None = None,
22+
*,
23+
trailing_metadata: grpc.aio.Metadata | None = None,
24+
retry_after: datetime.timedelta | None = None,
25+
) -> None:
2126
super().__init__(message)
2227
self.code = code
28+
self.trailing_metadata = trailing_metadata
29+
self.retry_after = retry_after
2330

2431

2532
class NotFoundError(DecreeError):
@@ -58,6 +65,22 @@ class TypeMismatchError(DecreeError):
5865
"""Raised when a typed getter receives a value of the wrong type."""
5966

6067

68+
class TimeoutError(DecreeError):
69+
"""Raised when the operation deadline was exceeded."""
70+
71+
72+
class ResourceExhaustedError(DecreeError):
73+
"""Raised when a resource quota or rate limit is exceeded."""
74+
75+
76+
class CancelledError(DecreeError):
77+
"""Raised when the operation was cancelled."""
78+
79+
80+
class UnimplementedError(DecreeError):
81+
"""Raised when the server has not implemented the operation."""
82+
83+
6184
_STATUS_MAP: dict[grpc.StatusCode, type[DecreeError]] = {
6285
grpc.StatusCode.NOT_FOUND: NotFoundError,
6386
grpc.StatusCode.ALREADY_EXISTS: AlreadyExistsError,
@@ -67,9 +90,27 @@ class TypeMismatchError(DecreeError):
6790
grpc.StatusCode.PERMISSION_DENIED: PermissionDeniedError,
6891
grpc.StatusCode.UNAUTHENTICATED: PermissionDeniedError,
6992
grpc.StatusCode.UNAVAILABLE: UnavailableError,
93+
grpc.StatusCode.DEADLINE_EXCEEDED: TimeoutError,
94+
grpc.StatusCode.RESOURCE_EXHAUSTED: ResourceExhaustedError,
95+
grpc.StatusCode.CANCELLED: CancelledError,
96+
grpc.StatusCode.UNIMPLEMENTED: UnimplementedError,
7097
}
7198

7299

100+
def _parse_retry_after(metadata: grpc.aio.Metadata) -> datetime.timedelta | None:
101+
for key, value in metadata:
102+
if key != "grpc-status-details-bin" or not isinstance(value, bytes):
103+
continue
104+
rpc_status = status_pb2.Status()
105+
rpc_status.ParseFromString(value)
106+
for detail in rpc_status.details:
107+
retry_info = error_details_pb2.RetryInfo()
108+
if detail.Unpack(retry_info):
109+
d = retry_info.retry_delay
110+
return datetime.timedelta(seconds=d.seconds, microseconds=d.nanos // 1000)
111+
return None
112+
113+
73114
def map_grpc_error(err: grpc.RpcError) -> DecreeError:
74115
"""Convert a gRPC ``RpcError`` to a typed ``DecreeError``.
75116
@@ -78,5 +119,7 @@ def map_grpc_error(err: grpc.RpcError) -> DecreeError:
78119
"""
79120
code = err.code()
80121
details = err.details()
122+
trailing: grpc.aio.Metadata | None = getattr(err, "trailing_metadata", lambda: None)()
123+
retry_after = _parse_retry_after(trailing) if trailing else None
81124
exc_class = _STATUS_MAP.get(code, DecreeError)
82-
return exc_class(details or str(err), code)
125+
return exc_class(details or str(err), code, trailing_metadata=trailing, retry_after=retry_after)

sdk/tests/conftest.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@ class FakeRpcError(grpc.aio.AioRpcError):
1313
sync and async error handling.
1414
"""
1515

16-
def __init__(self, code: grpc.StatusCode, details: str = "test") -> None:
16+
def __init__(
17+
self,
18+
code: grpc.StatusCode,
19+
details: str = "test",
20+
trailing_metadata: grpc.aio.Metadata | None = None,
21+
) -> None:
1722
# AioRpcError.__init__ expects (code, initial_metadata, trailing_metadata,
1823
# details, debug_error_string).
19-
super().__init__(code, None, None, details, None) # type: ignore[arg-type]
24+
super().__init__(code, None, trailing_metadata, details, None) # type: ignore[arg-type]

sdk/tests/test_errors.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
11
"""Tests for error mapping."""
22

3+
import datetime
4+
35
import grpc
6+
import grpc.aio
7+
from google.protobuf import any_pb2, duration_pb2
8+
from google.rpc import error_details_pb2, status_pb2
49

510
from opendecree.errors import (
611
AlreadyExistsError,
12+
CancelledError,
713
ChecksumMismatchError,
814
DecreeError,
915
LockedError,
1016
NotFoundError,
1117
PermissionDeniedError,
18+
ResourceExhaustedError,
19+
TimeoutError,
1220
UnavailableError,
21+
UnimplementedError,
1322
map_grpc_error,
1423
)
1524
from tests.conftest import FakeRpcError
1625

1726

27+
def _make_retry_metadata(seconds: int, nanos: int = 0) -> grpc.aio.Metadata:
28+
retry_info = error_details_pb2.RetryInfo(
29+
retry_delay=duration_pb2.Duration(seconds=seconds, nanos=nanos)
30+
)
31+
detail = any_pb2.Any()
32+
detail.Pack(retry_info)
33+
rpc_status = status_pb2.Status(details=[detail])
34+
return grpc.aio.Metadata(("grpc-status-details-bin", rpc_status.SerializeToString()))
35+
36+
1837
def test_not_found():
1938
err = map_grpc_error(FakeRpcError(grpc.StatusCode.NOT_FOUND, "gone"))
2039
assert isinstance(err, NotFoundError)
@@ -61,3 +80,55 @@ def test_unknown_code_falls_back():
6180
def test_empty_details():
6281
err = map_grpc_error(FakeRpcError(grpc.StatusCode.NOT_FOUND, ""))
6382
assert isinstance(err, NotFoundError)
83+
84+
85+
def test_deadline_exceeded_maps_to_timeout():
86+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.DEADLINE_EXCEEDED))
87+
assert isinstance(err, TimeoutError)
88+
assert err.code == grpc.StatusCode.DEADLINE_EXCEEDED
89+
90+
91+
def test_resource_exhausted():
92+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.RESOURCE_EXHAUSTED))
93+
assert isinstance(err, ResourceExhaustedError)
94+
95+
96+
def test_cancelled():
97+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.CANCELLED))
98+
assert isinstance(err, CancelledError)
99+
100+
101+
def test_unimplemented():
102+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.UNIMPLEMENTED))
103+
assert isinstance(err, UnimplementedError)
104+
105+
106+
def test_trailing_metadata_captured():
107+
meta = grpc.aio.Metadata(("x-custom", "value"))
108+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.UNAVAILABLE, trailing_metadata=meta))
109+
assert err.trailing_metadata is not None
110+
assert dict(err.trailing_metadata).get("x-custom") == "value"
111+
112+
113+
def test_no_trailing_metadata():
114+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.NOT_FOUND))
115+
assert err.trailing_metadata is None
116+
assert err.retry_after is None
117+
118+
119+
def test_retry_info_parsed():
120+
meta = _make_retry_metadata(seconds=5, nanos=500_000_000)
121+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.RESOURCE_EXHAUSTED, trailing_metadata=meta))
122+
assert err.retry_after == datetime.timedelta(seconds=5, microseconds=500_000)
123+
124+
125+
def test_retry_info_nanos_precision():
126+
meta = _make_retry_metadata(seconds=0, nanos=250_000_000)
127+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.DEADLINE_EXCEEDED, trailing_metadata=meta))
128+
assert err.retry_after == datetime.timedelta(microseconds=250_000)
129+
130+
131+
def test_no_retry_info_in_metadata():
132+
meta = grpc.aio.Metadata(("x-other", "val"))
133+
err = map_grpc_error(FakeRpcError(grpc.StatusCode.UNAVAILABLE, trailing_metadata=meta))
134+
assert err.retry_after is None

0 commit comments

Comments
 (0)