Skip to content

Commit 073a514

Browse files
authored
Accept custom interceptors in ConfigClient and AsyncConfigClient
- Adds `interceptors: list[Any] | None = None` to both `ConfigClient` and `AsyncConfigClient` for injecting logging, tracing, or metrics interceptors without monkeypatching - For `ConfigClient`, user interceptors are placed outermost in the `grpc.intercept_channel` chain (before the internal auth interceptor) - For `AsyncConfigClient`, interceptors are forwarded to the `grpc.aio` channel constructor Closes #68
1 parent 7f877d1 commit 073a514

5 files changed

Lines changed: 92 additions & 9 deletions

File tree

sdk/src/opendecree/_channel.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from __future__ import annotations
44

5+
from typing import Any
6+
57
import grpc
68

79
_DEFAULT_KEEPALIVE_TIME_MS = 30000
@@ -99,6 +101,7 @@ def create_aio_channel(
99101
insecure: bool = True,
100102
credentials: grpc.ChannelCredentials | None = None,
101103
token: str | None = None,
104+
interceptors: list[Any] | None = None,
102105
max_send_message_length: int | None = None,
103106
max_recv_message_length: int | None = None,
104107
keepalive_time_ms: int = _DEFAULT_KEEPALIVE_TIME_MS,
@@ -127,6 +130,7 @@ def create_aio_channel(
127130
reconnect_backoff_initial_ms,
128131
reconnect_backoff_max_ms,
129132
)
133+
aio_interceptors = tuple(interceptors) if interceptors else ()
130134

131135
channel_creds: grpc.ChannelCredentials | None = credentials
132136
if channel_creds is None and not insecure:
@@ -137,6 +141,8 @@ def create_aio_channel(
137141
channel_creds = grpc.composite_channel_credentials(
138142
channel_creds, _token_call_credentials(token)
139143
)
140-
return grpc.aio.secure_channel(target, channel_creds, options=options)
144+
return grpc.aio.secure_channel(
145+
target, channel_creds, options=options, interceptors=aio_interceptors
146+
)
141147

142-
return grpc.aio.insecure_channel(target, options=options)
148+
return grpc.aio.insecure_channel(target, options=options, interceptors=aio_interceptors)

sdk/src/opendecree/async_client.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import warnings
1111
from datetime import timedelta
12-
from typing import TYPE_CHECKING, overload
12+
from typing import TYPE_CHECKING, Any, overload
1313

1414
if TYPE_CHECKING:
1515
from opendecree.async_watcher import AsyncConfigWatcher
@@ -54,6 +54,7 @@ def __init__(
5454
timeout: float = 10.0,
5555
retry: RetryConfig | None = None,
5656
check_version: bool = False,
57+
interceptors: list[Any] | None = None,
5758
) -> None:
5859
"""Create a new AsyncConfigClient.
5960
@@ -76,6 +77,9 @@ def __init__(
7677
check_version: When True, run :meth:`check_compatibility` lazily
7778
on the first RPC call. Raises :exc:`IncompatibleServerError`
7879
if the server version is outside the supported range.
80+
interceptors: Optional list of :class:`grpc.aio.ClientInterceptor`
81+
instances to inject (e.g., for logging, tracing, or metrics).
82+
Passed directly to the ``grpc.aio`` channel.
7983
"""
8084
self._timeout = timeout
8185
self._retry = retry if retry is not None else RetryConfig()
@@ -102,7 +106,11 @@ def __init__(
102106
subject=subject, role=role, tenant_id=tenant_id, token=metadata_token
103107
)
104108
self._channel = create_aio_channel(
105-
target, insecure=insecure, credentials=credentials, token=channel_token
109+
target,
110+
insecure=insecure,
111+
credentials=credentials,
112+
token=channel_token,
113+
interceptors=interceptors,
106114
)
107115

108116
cs_pb2, cs_grpc = ensure_stubs()

sdk/src/opendecree/client.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import warnings
1515
from datetime import timedelta
16-
from typing import TYPE_CHECKING, overload
16+
from typing import TYPE_CHECKING, Any, overload
1717

1818
if TYPE_CHECKING:
1919
from opendecree.watcher import ConfigWatcher
@@ -57,6 +57,7 @@ def __init__(
5757
timeout: float = 10.0,
5858
retry: RetryConfig | None = None,
5959
check_version: bool = False,
60+
interceptors: list[Any] | None = None,
6061
) -> None:
6162
"""Create a new ConfigClient.
6263
@@ -79,6 +80,12 @@ def __init__(
7980
check_version: When True, run :meth:`check_compatibility` lazily
8081
on the first RPC call. Raises :exc:`IncompatibleServerError`
8182
if the server version is outside the supported range.
83+
interceptors: Optional list of
84+
:class:`grpc.UnaryUnaryClientInterceptor` /
85+
:class:`grpc.UnaryStreamClientInterceptor` instances to inject
86+
(e.g., for logging, tracing, or metrics). User-supplied
87+
interceptors are applied outermost (before the SDK's internal
88+
auth interceptor).
8289
"""
8390
self._timeout = timeout
8491
self._retry = retry if retry is not None else RetryConfig()
@@ -102,15 +109,16 @@ def __init__(
102109
metadata = _build_metadata(
103110
subject=subject, role=role, tenant_id=tenant_id, token=metadata_token
104111
)
105-
interceptors: list[grpc.UnaryUnaryClientInterceptor] = []
112+
# User interceptors are outermost; auth interceptor runs inside them.
113+
all_interceptors: list[Any] = list(interceptors) if interceptors else []
106114
if metadata:
107-
interceptors.append(AuthInterceptor(metadata))
115+
all_interceptors.append(AuthInterceptor(metadata))
108116

109117
channel = create_channel(
110118
target, insecure=insecure, credentials=credentials, token=channel_token
111119
)
112-
if interceptors:
113-
self._channel = grpc.intercept_channel(channel, *interceptors)
120+
if all_interceptors:
121+
self._channel = grpc.intercept_channel(channel, *all_interceptors)
114122
else:
115123
self._channel = channel
116124
self._raw_channel = channel # keep ref for close()

sdk/tests/test_async_client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,3 +296,24 @@ async def _empty_stream():
296296
ctx = client.watch("t1")
297297
async with ctx as watcher:
298298
assert watcher is not None
299+
300+
def test_custom_interceptors_passed_to_channel(self):
301+
custom = MagicMock()
302+
with patch("opendecree.async_client.create_aio_channel") as mock_ch:
303+
mock_ch.return_value = MagicMock()
304+
AsyncConfigClient("localhost:9090", interceptors=[custom])
305+
assert mock_ch.call_args.kwargs["interceptors"] == [custom]
306+
307+
def test_no_interceptors_by_default(self):
308+
with patch("opendecree.async_client.create_aio_channel") as mock_ch:
309+
mock_ch.return_value = MagicMock()
310+
AsyncConfigClient("localhost:9090")
311+
assert mock_ch.call_args.kwargs.get("interceptors") is None
312+
313+
def test_multiple_custom_interceptors_preserved(self):
314+
a = MagicMock()
315+
b = MagicMock()
316+
with patch("opendecree.async_client.create_aio_channel") as mock_ch:
317+
mock_ch.return_value = MagicMock()
318+
AsyncConfigClient("localhost:9090", interceptors=[a, b])
319+
assert mock_ch.call_args.kwargs["interceptors"] == [a, b]

sdk/tests/test_client.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,3 +338,43 @@ def test_watch_returns_context(self):
338338
assert ctx is not None
339339
with ctx as watcher:
340340
assert watcher is not None
341+
342+
def test_custom_interceptors_passed_to_intercept_channel(self):
343+
custom = MagicMock(spec=grpc.UnaryUnaryClientInterceptor)
344+
with patch("opendecree.client.create_channel") as mock_ch:
345+
mock_channel = MagicMock()
346+
mock_ch.return_value = mock_channel
347+
with patch("opendecree.client.grpc.intercept_channel") as mock_intercept:
348+
mock_intercept.return_value = mock_channel
349+
opendecree.ConfigClient("localhost:9090", interceptors=[custom])
350+
args = mock_intercept.call_args[0]
351+
assert custom in args
352+
353+
def test_custom_interceptors_outermost(self):
354+
"""User interceptors must come before AuthInterceptor."""
355+
from opendecree._interceptors import AuthInterceptor
356+
357+
custom = MagicMock(spec=grpc.UnaryUnaryClientInterceptor)
358+
with patch("opendecree.client.create_channel") as mock_ch:
359+
mock_channel = MagicMock()
360+
mock_ch.return_value = mock_channel
361+
with patch("opendecree.client.grpc.intercept_channel") as mock_intercept:
362+
mock_intercept.return_value = mock_channel
363+
opendecree.ConfigClient("localhost:9090", subject="s", interceptors=[custom])
364+
args = mock_intercept.call_args[0]
365+
# args[0] is the channel; args[1:] are interceptors in order
366+
interceptors_in_order = args[1:]
367+
assert interceptors_in_order[0] is custom
368+
assert isinstance(interceptors_in_order[1], AuthInterceptor)
369+
370+
def test_custom_interceptors_no_auth(self):
371+
"""Custom interceptors work even when no auth metadata is set."""
372+
custom = MagicMock(spec=grpc.UnaryUnaryClientInterceptor)
373+
with patch("opendecree.client.create_channel") as mock_ch:
374+
mock_channel = MagicMock()
375+
mock_ch.return_value = mock_channel
376+
with patch("opendecree.client.grpc.intercept_channel") as mock_intercept:
377+
mock_intercept.return_value = mock_channel
378+
opendecree.ConfigClient("localhost:9090", role="", interceptors=[custom])
379+
args = mock_intercept.call_args[0]
380+
assert args[1] is custom

0 commit comments

Comments
 (0)