Skip to content

Commit cf1d1f4

Browse files
zeevdrclaude
andcommitted
feat(channel): expose message-size and keepalive options
Add optional max_send_message_length and max_recv_message_length parameters to create_channel and create_aio_channel. When set, the corresponding grpc.max_send_message_length / grpc.max_receive_message_length options are appended; when omitted the gRPC default (4 MB) applies unchanged. Keepalive tuning constants (time, timeout, permit_without_calls, reconnect backoffs) are also promoted from hard-coded internals to keyword arguments with the same defaults, so callers can tune them without forking the factory. Closes #67 Co-Authored-By: Claude <noreply@anthropic.com>
1 parent f3d3e23 commit cf1d1f4

2 files changed

Lines changed: 134 additions & 12 deletions

File tree

sdk/src/opendecree/_channel.py

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,34 @@
44

55
import grpc
66

7-
# Default channel options for keepalive and reconnection.
8-
_DEFAULT_OPTIONS: list[tuple[str, int]] = [
9-
("grpc.keepalive_time_ms", 30000),
10-
("grpc.keepalive_timeout_ms", 10000),
11-
("grpc.keepalive_permit_without_calls", 1),
12-
("grpc.initial_reconnect_backoff_ms", 1000),
13-
("grpc.max_reconnect_backoff_ms", 30000),
14-
]
7+
_DEFAULT_KEEPALIVE_TIME_MS = 30000
8+
_DEFAULT_KEEPALIVE_TIMEOUT_MS = 10000
9+
_DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS = 1
10+
_DEFAULT_RECONNECT_BACKOFF_INITIAL_MS = 1000
11+
_DEFAULT_RECONNECT_BACKOFF_MAX_MS = 30000
12+
13+
14+
def _build_options(
15+
max_send_message_length: int | None,
16+
max_recv_message_length: int | None,
17+
keepalive_time_ms: int,
18+
keepalive_timeout_ms: int,
19+
keepalive_permit_without_calls: int,
20+
reconnect_backoff_initial_ms: int,
21+
reconnect_backoff_max_ms: int,
22+
) -> list[tuple[str, int]]:
23+
opts: list[tuple[str, int]] = [
24+
("grpc.keepalive_time_ms", keepalive_time_ms),
25+
("grpc.keepalive_timeout_ms", keepalive_timeout_ms),
26+
("grpc.keepalive_permit_without_calls", keepalive_permit_without_calls),
27+
("grpc.initial_reconnect_backoff_ms", reconnect_backoff_initial_ms),
28+
("grpc.max_reconnect_backoff_ms", reconnect_backoff_max_ms),
29+
]
30+
if max_send_message_length is not None:
31+
opts.append(("grpc.max_send_message_length", max_send_message_length))
32+
if max_recv_message_length is not None:
33+
opts.append(("grpc.max_receive_message_length", max_recv_message_length))
34+
return opts
1535

1636

1737
def _token_call_credentials(token: str) -> grpc.CallCredentials:
@@ -30,6 +50,13 @@ def create_channel(
3050
insecure: bool = True,
3151
credentials: grpc.ChannelCredentials | None = None,
3252
token: str | None = None,
53+
max_send_message_length: int | None = None,
54+
max_recv_message_length: int | None = None,
55+
keepalive_time_ms: int = _DEFAULT_KEEPALIVE_TIME_MS,
56+
keepalive_timeout_ms: int = _DEFAULT_KEEPALIVE_TIMEOUT_MS,
57+
keepalive_permit_without_calls: int = _DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS,
58+
reconnect_backoff_initial_ms: int = _DEFAULT_RECONNECT_BACKOFF_INITIAL_MS,
59+
reconnect_backoff_max_ms: int = _DEFAULT_RECONNECT_BACKOFF_MAX_MS,
3360
) -> grpc.Channel:
3461
"""Create a gRPC channel with sensible defaults.
3562
@@ -38,7 +65,20 @@ def create_channel(
3865
``composite_channel_credentials`` so it is protected by the TLS layer.
3966
On an insecure channel the token is sent as a raw header — callers should
4067
warn the user before allowing this.
68+
69+
Pass *max_send_message_length* or *max_recv_message_length* (bytes) to
70+
override gRPC's 4 MB default, which can be too small for large JSON values.
4171
"""
72+
options = _build_options(
73+
max_send_message_length,
74+
max_recv_message_length,
75+
keepalive_time_ms,
76+
keepalive_timeout_ms,
77+
keepalive_permit_without_calls,
78+
reconnect_backoff_initial_ms,
79+
reconnect_backoff_max_ms,
80+
)
81+
4282
channel_creds: grpc.ChannelCredentials | None = credentials
4383
if channel_creds is None and not insecure:
4484
channel_creds = grpc.ssl_channel_credentials()
@@ -48,9 +88,9 @@ def create_channel(
4888
channel_creds = grpc.composite_channel_credentials(
4989
channel_creds, _token_call_credentials(token)
5090
)
51-
return grpc.secure_channel(target, channel_creds, options=_DEFAULT_OPTIONS)
91+
return grpc.secure_channel(target, channel_creds, options=options)
5292

53-
return grpc.insecure_channel(target, options=_DEFAULT_OPTIONS)
93+
return grpc.insecure_channel(target, options=options)
5494

5595

5696
def create_aio_channel(
@@ -59,6 +99,13 @@ def create_aio_channel(
5999
insecure: bool = True,
60100
credentials: grpc.ChannelCredentials | None = None,
61101
token: str | None = None,
102+
max_send_message_length: int | None = None,
103+
max_recv_message_length: int | None = None,
104+
keepalive_time_ms: int = _DEFAULT_KEEPALIVE_TIME_MS,
105+
keepalive_timeout_ms: int = _DEFAULT_KEEPALIVE_TIMEOUT_MS,
106+
keepalive_permit_without_calls: int = _DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS,
107+
reconnect_backoff_initial_ms: int = _DEFAULT_RECONNECT_BACKOFF_INITIAL_MS,
108+
reconnect_backoff_max_ms: int = _DEFAULT_RECONNECT_BACKOFF_MAX_MS,
62109
) -> grpc.aio.Channel:
63110
"""Create an async gRPC channel with sensible defaults.
64111
@@ -67,7 +114,20 @@ def create_aio_channel(
67114
``composite_channel_credentials`` so it is protected by the TLS layer.
68115
On an insecure channel the token is sent as a raw header — callers should
69116
warn the user before allowing this.
117+
118+
Pass *max_send_message_length* or *max_recv_message_length* (bytes) to
119+
override gRPC's 4 MB default, which can be too small for large JSON values.
70120
"""
121+
options = _build_options(
122+
max_send_message_length,
123+
max_recv_message_length,
124+
keepalive_time_ms,
125+
keepalive_timeout_ms,
126+
keepalive_permit_without_calls,
127+
reconnect_backoff_initial_ms,
128+
reconnect_backoff_max_ms,
129+
)
130+
71131
channel_creds: grpc.ChannelCredentials | None = credentials
72132
if channel_creds is None and not insecure:
73133
channel_creds = grpc.ssl_channel_credentials()
@@ -77,6 +137,6 @@ def create_aio_channel(
77137
channel_creds = grpc.composite_channel_credentials(
78138
channel_creds, _token_call_credentials(token)
79139
)
80-
return grpc.aio.secure_channel(target, channel_creds, options=_DEFAULT_OPTIONS)
140+
return grpc.aio.secure_channel(target, channel_creds, options=options)
81141

82-
return grpc.aio.insecure_channel(target, options=_DEFAULT_OPTIONS)
142+
return grpc.aio.insecure_channel(target, options=options)

sdk/tests/test_channel.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,37 @@ def test_insecure_with_token_does_not_use_composite(self):
6363
mock_insecure.assert_called_once()
6464
mock_comp.assert_not_called()
6565

66+
def test_message_size_options_included_when_set(self):
67+
with patch("opendecree._channel.grpc.insecure_channel") as mock:
68+
mock.return_value = MagicMock()
69+
create_channel(
70+
"localhost:9090",
71+
max_send_message_length=16 * 1024 * 1024,
72+
max_recv_message_length=32 * 1024 * 1024,
73+
)
74+
_, kwargs = mock.call_args
75+
opts = dict(kwargs["options"])
76+
assert opts["grpc.max_send_message_length"] == 16 * 1024 * 1024
77+
assert opts["grpc.max_receive_message_length"] == 32 * 1024 * 1024
78+
79+
def test_message_size_options_absent_by_default(self):
80+
with patch("opendecree._channel.grpc.insecure_channel") as mock:
81+
mock.return_value = MagicMock()
82+
create_channel("localhost:9090")
83+
_, kwargs = mock.call_args
84+
keys = [k for k, _ in kwargs["options"]]
85+
assert "grpc.max_send_message_length" not in keys
86+
assert "grpc.max_receive_message_length" not in keys
87+
88+
def test_keepalive_override(self):
89+
with patch("opendecree._channel.grpc.insecure_channel") as mock:
90+
mock.return_value = MagicMock()
91+
create_channel("localhost:9090", keepalive_time_ms=60000, keepalive_timeout_ms=5000)
92+
_, kwargs = mock.call_args
93+
opts = dict(kwargs["options"])
94+
assert opts["grpc.keepalive_time_ms"] == 60000
95+
assert opts["grpc.keepalive_timeout_ms"] == 5000
96+
6697

6798
class TestCreateAioChannel:
6899
def test_insecure(self):
@@ -119,3 +150,34 @@ def test_insecure_with_token_does_not_use_composite(self):
119150
create_aio_channel("localhost:9090", insecure=True, token="tok")
120151
mock_insecure.assert_called_once()
121152
mock_comp.assert_not_called()
153+
154+
def test_message_size_options_included_when_set(self):
155+
with patch("opendecree._channel.grpc.aio.insecure_channel") as mock:
156+
mock.return_value = MagicMock()
157+
create_aio_channel(
158+
"localhost:9090",
159+
max_send_message_length=16 * 1024 * 1024,
160+
max_recv_message_length=32 * 1024 * 1024,
161+
)
162+
_, kwargs = mock.call_args
163+
opts = dict(kwargs["options"])
164+
assert opts["grpc.max_send_message_length"] == 16 * 1024 * 1024
165+
assert opts["grpc.max_receive_message_length"] == 32 * 1024 * 1024
166+
167+
def test_message_size_options_absent_by_default(self):
168+
with patch("opendecree._channel.grpc.aio.insecure_channel") as mock:
169+
mock.return_value = MagicMock()
170+
create_aio_channel("localhost:9090")
171+
_, kwargs = mock.call_args
172+
keys = [k for k, _ in kwargs["options"]]
173+
assert "grpc.max_send_message_length" not in keys
174+
assert "grpc.max_receive_message_length" not in keys
175+
176+
def test_keepalive_override(self):
177+
with patch("opendecree._channel.grpc.aio.insecure_channel") as mock:
178+
mock.return_value = MagicMock()
179+
create_aio_channel("localhost:9090", keepalive_time_ms=60000, keepalive_timeout_ms=5000)
180+
_, kwargs = mock.call_args
181+
opts = dict(kwargs["options"])
182+
assert opts["grpc.keepalive_time_ms"] == 60000
183+
assert opts["grpc.keepalive_timeout_ms"] == 5000

0 commit comments

Comments
 (0)