Skip to content

Commit 81257eb

Browse files
tezizzmCasperGN
andauthored
fix(ext-workflow): honor DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES on workflow channels (dapr#1085)
* fix(ext-workflow): honor DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES on workflow channels Workflow activity payloads over gRPC's 4 MiB default raised RESOURCE_EXHAUSTED because the durabletask channel ignored the message size limit. dapr#1024 plumbed DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES through the Dapr API channel only; the workflow worker and clients still fell back to the gRPC default. Add a get_grpc_channel_options helper that resolves the limit from an explicit max_grpc_message_length kwarg, then the env var, then None, and thread it through WorkflowRuntime, DaprWorkflowClient, and the async DaprWorkflowClient. Unlike dapr#1024, set both send and receive limits symmetrically since workflow payloads cross the channel in both directions. Reuses the existing setting; no new env var. Signed-off-by: Martez Killens <5050479+tezizzm@users.noreply.github.com> * Update ext/dapr-ext-workflow/dapr/ext/workflow/util.py Co-authored-by: Casper Nielsen <whopsec@protonmail.com> Signed-off-by: Martez Killens <tezizzm@users.noreply.github.com> * docs(ext-workflow): clarify max_grpc_message_length precedence Make explicit that a `0` value in either the kwarg or the DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES env var is treated as "no opinion" and falls through to the next source, consistent with the documented `global_settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES = 0` sentinel. Behavior is unchanged; this addresses PR review feedback asking for the precedence rule to be stated precisely. Signed-off-by: Martez Killens <tezizzm@users.noreply.github.com> --------- Signed-off-by: Martez Killens <5050479+tezizzm@users.noreply.github.com> Signed-off-by: Martez Killens <tezizzm@users.noreply.github.com> Co-authored-by: Martez Killens <5050479+tezizzm@users.noreply.github.com> Co-authored-by: Casper Nielsen <whopsec@protonmail.com>
1 parent c3b6bd7 commit 81257eb

8 files changed

Lines changed: 318 additions & 5 deletions

File tree

dapr/ext/workflow/aio/dapr_workflow_client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from dapr.ext.workflow._durabletask import client
2929
from dapr.ext.workflow._durabletask.aio import client as aioclient
3030
from dapr.ext.workflow.logger import Logger, LoggerOptions
31-
from dapr.ext.workflow.util import getAddress
31+
from dapr.ext.workflow.util import get_grpc_channel_options, getAddress
3232
from dapr.ext.workflow.workflow_context import Workflow
3333
from dapr.ext.workflow.workflow_state import WorkflowState
3434

@@ -49,7 +49,22 @@ def __init__(
4949
host: Optional[str] = None,
5050
port: Optional[str] = None,
5151
logger_options: Optional[LoggerOptions] = None,
52+
max_grpc_message_length: Optional[int] = None,
5253
):
54+
"""Initializes the async workflow client.
55+
56+
Args:
57+
host: Dapr sidecar gRPC hostname. Defaults to
58+
``settings.DAPR_RUNTIME_HOST`` (or ``DAPR_GRPC_ENDPOINT`` when set).
59+
port: Dapr sidecar gRPC port. Defaults to ``settings.DAPR_GRPC_PORT``.
60+
logger_options: Configuration for the client's internal logger.
61+
max_grpc_message_length: Maximum gRPC message size in bytes for the
62+
workflow channel, applied symmetrically to both send and receive
63+
directions. Precedence: this kwarg (if non-zero), then the
64+
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var (if non-zero),
65+
then the gRPC default (4 MiB). ``0`` in either source means
66+
"no opinion" and falls through to the next source.
67+
"""
5368
address = getAddress(host, port)
5469

5570
try:
@@ -63,13 +78,15 @@ def __init__(
6378
if settings.DAPR_API_TOKEN:
6479
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
6580
options = self._logger.get_options()
81+
channel_options = get_grpc_channel_options(max_grpc_message_length)
6682
self.__obj = aioclient.AsyncTaskHubGrpcClient(
6783
host_address=uri.endpoint,
6884
metadata=metadata,
6985
secure_channel=uri.tls,
7086
log_handler=options.log_handler,
7187
log_formatter=options.log_formatter,
7288
interceptors=[DaprClientTimeoutInterceptorAsync()],
89+
channel_options=channel_options,
7390
)
7491

7592
async def schedule_new_workflow(

dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from dapr.conf.helpers import GrpcEndpoint
2828
from dapr.ext.workflow._durabletask import client
2929
from dapr.ext.workflow.logger import Logger, LoggerOptions
30-
from dapr.ext.workflow.util import getAddress
30+
from dapr.ext.workflow.util import get_grpc_channel_options, getAddress
3131
from dapr.ext.workflow.workflow_context import Workflow
3232
from dapr.ext.workflow.workflow_state import WorkflowState
3333

@@ -51,7 +51,22 @@ def __init__(
5151
host: Optional[str] = None,
5252
port: Optional[str] = None,
5353
logger_options: Optional[LoggerOptions] = None,
54+
max_grpc_message_length: Optional[int] = None,
5455
):
56+
"""Initializes the sync workflow client.
57+
58+
Args:
59+
host: Dapr sidecar gRPC hostname. Defaults to
60+
``settings.DAPR_RUNTIME_HOST`` (or ``DAPR_GRPC_ENDPOINT`` when set).
61+
port: Dapr sidecar gRPC port. Defaults to ``settings.DAPR_GRPC_PORT``.
62+
logger_options: Configuration for the client's internal logger.
63+
max_grpc_message_length: Maximum gRPC message size in bytes for the
64+
workflow channel, applied symmetrically to both send and receive
65+
directions. Precedence: this kwarg (if non-zero), then the
66+
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var (if non-zero),
67+
then the gRPC default (4 MiB). ``0`` in either source means
68+
"no opinion" and falls through to the next source.
69+
"""
5570
address = getAddress(host, port)
5671

5772
try:
@@ -65,13 +80,15 @@ def __init__(
6580
if settings.DAPR_API_TOKEN:
6681
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
6782
options = self._logger.get_options()
83+
channel_options = get_grpc_channel_options(max_grpc_message_length)
6884
self.__obj = client.TaskHubGrpcClient(
6985
host_address=uri.endpoint,
7086
metadata=metadata,
7187
secure_channel=uri.tls,
7288
log_handler=options.log_handler,
7389
log_formatter=options.log_formatter,
7490
interceptors=[DaprClientTimeoutInterceptor()],
91+
channel_options=channel_options,
7592
)
7693

7794
def schedule_new_workflow(

dapr/ext/workflow/util.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,46 @@
1313
limitations under the License.
1414
"""
1515

16-
from typing import Optional
16+
from typing import Any, Optional, Sequence
1717

1818
from dapr.conf import settings
1919

2020

21+
def get_grpc_channel_options(
22+
max_grpc_message_length: Optional[int] = None,
23+
) -> Optional[Sequence[tuple[str, Any]]]:
24+
"""Resolves gRPC channel options for the workflow message-size limit.
25+
26+
Precedence: explicit ``max_grpc_message_length`` kwarg (if non-zero),
27+
else ``settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` (if non-zero),
28+
else the gRPC default. A ``0`` in either source is interpreted as
29+
"no opinion / use default" and falls through to the next source — this
30+
matches ``global_settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES = 0``
31+
being the documented "unset" sentinel.
32+
33+
Sets BOTH send and receive limits symmetrically because workflow activity
34+
payloads cross the channel in both directions. Returns ``None`` when no
35+
explicit limit is configured so callers leave the channel unconfigured and
36+
the gRPC default applies.
37+
38+
Args:
39+
max_grpc_message_length: Explicit max gRPC message size in bytes.
40+
``0`` or ``None`` means "no opinion" and falls through to the
41+
env var.
42+
43+
Returns:
44+
A sequence of ``(option, value)`` tuples setting both send and receive
45+
limits, or ``None`` when no limit is configured.
46+
"""
47+
size = max_grpc_message_length or settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES
48+
if not size:
49+
return None
50+
return [
51+
('grpc.max_send_message_length', size),
52+
('grpc.max_receive_message_length', size),
53+
]
54+
55+
2156
def getAddress(host: Optional[str] = None, port: Optional[str] = None) -> str:
2257
if not host and not port:
2358
address = settings.DAPR_GRPC_ENDPOINT or (

dapr/ext/workflow/workflow_runtime.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from dapr.ext.workflow._durabletask.internal.shared import is_async_callable as _is_async_callable
3030
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
3131
from dapr.ext.workflow.logger import Logger, LoggerOptions
32-
from dapr.ext.workflow.util import getAddress
32+
from dapr.ext.workflow.util import get_grpc_channel_options, getAddress
3333
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
3434
from dapr.ext.workflow.workflow_context import Workflow
3535

@@ -118,7 +118,35 @@ def __init__(
118118
maximum_concurrent_orchestration_work_items: Optional[int] = None,
119119
maximum_thread_pool_workers: Optional[int] = None,
120120
worker_ready_timeout: Optional[float] = None,
121+
max_grpc_message_length: Optional[int] = None,
121122
):
123+
"""Initializes the workflow runtime.
124+
125+
Args:
126+
host: Dapr sidecar gRPC hostname. Defaults to
127+
``settings.DAPR_RUNTIME_HOST`` (or ``DAPR_GRPC_ENDPOINT`` when set).
128+
port: Dapr sidecar gRPC port. Defaults to ``settings.DAPR_GRPC_PORT``.
129+
logger_options: Configuration for the runtime's internal logger.
130+
interceptors: Additional client gRPC interceptors. The built-in
131+
``DaprClientTimeoutInterceptor`` is always appended after these.
132+
maximum_concurrent_activity_work_items: Maximum number of activity
133+
work items the worker dispatches concurrently. ``None`` lets the
134+
durabletask worker pick a default.
135+
maximum_concurrent_orchestration_work_items: Maximum number of
136+
orchestration work items the worker dispatches concurrently.
137+
``None`` lets the durabletask worker pick a default.
138+
maximum_thread_pool_workers: Size of the worker's thread pool for
139+
executing sync activities. ``None`` lets the durabletask worker
140+
pick a default.
141+
worker_ready_timeout: Seconds to wait in :meth:`start` for the
142+
worker's gRPC stream to be ready. Defaults to 30s.
143+
max_grpc_message_length: Maximum gRPC message size in bytes for the
144+
workflow channel, applied symmetrically to both send and receive
145+
directions. Precedence: this kwarg (if non-zero), then the
146+
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var (if non-zero),
147+
then the gRPC default (4 MiB). ``0`` in either source means
148+
"no opinion" and falls through to the next source.
149+
"""
122150
self._logger = Logger('WorkflowRuntime', logger_options)
123151
self._worker_ready_timeout = 30.0 if worker_ready_timeout is None else worker_ready_timeout
124152

@@ -137,13 +165,15 @@ def __init__(
137165
if interceptors:
138166
all_interceptors.extend(interceptors)
139167
all_interceptors.append(DaprClientTimeoutInterceptor())
168+
channel_options = get_grpc_channel_options(max_grpc_message_length)
140169
self.__worker = worker.TaskHubGrpcWorker(
141170
host_address=uri.endpoint,
142171
metadata=metadata,
143172
secure_channel=uri.tls,
144173
log_handler=options.log_handler,
145174
log_formatter=options.log_formatter,
146175
interceptors=all_interceptors,
176+
channel_options=channel_options,
147177
concurrency_options=worker.ConcurrencyOptions(
148178
maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items,
149179
maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items,

tests/ext/workflow/test_workflow_client.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from grpc import RpcError
2222

23+
from dapr.conf import settings
2324
from dapr.ext.workflow._durabletask import client
2425
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
2526
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
@@ -127,6 +128,62 @@ def test_timeout_interceptor_is_passed_to_client(self):
127128
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor)
128129

129130

131+
class WorkflowClientChannelOptionsTest(unittest.TestCase):
132+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
133+
def test_explicit_kwarg_sets_symmetric_channel_options(self):
134+
with mock.patch(
135+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
136+
) as mock_client_cls:
137+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
138+
channel_options = mock_client_cls.call_args[1]['channel_options']
139+
self.assertEqual(
140+
[
141+
('grpc.max_send_message_length', 8 * 1024 * 1024),
142+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
143+
],
144+
channel_options,
145+
)
146+
147+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
148+
def test_env_var_sets_symmetric_channel_options(self):
149+
with mock.patch(
150+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
151+
) as mock_client_cls:
152+
DaprWorkflowClient()
153+
channel_options = mock_client_cls.call_args[1]['channel_options']
154+
self.assertEqual(
155+
[
156+
('grpc.max_send_message_length', 16 * 1024 * 1024),
157+
('grpc.max_receive_message_length', 16 * 1024 * 1024),
158+
],
159+
channel_options,
160+
)
161+
162+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
163+
def test_kwarg_takes_precedence_over_env_var(self):
164+
with mock.patch(
165+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
166+
) as mock_client_cls:
167+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
168+
channel_options = mock_client_cls.call_args[1]['channel_options']
169+
self.assertEqual(
170+
[
171+
('grpc.max_send_message_length', 8 * 1024 * 1024),
172+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
173+
],
174+
channel_options,
175+
)
176+
177+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
178+
def test_neither_set_passes_none(self):
179+
with mock.patch(
180+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
181+
) as mock_client_cls:
182+
DaprWorkflowClient()
183+
channel_options = mock_client_cls.call_args[1]['channel_options']
184+
self.assertIsNone(channel_options)
185+
186+
130187
class WorkflowClientTest(unittest.TestCase):
131188
def mock_client_wf(ctx: DaprWorkflowContext, input):
132189
print(f'{input}')

tests/ext/workflow/test_workflow_client_aio.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from grpc.aio import AioRpcError
2222

23+
from dapr.conf import settings
2324
from dapr.ext.workflow._durabletask import client
2425
from dapr.ext.workflow.aio import DaprWorkflowClient
2526
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
@@ -128,6 +129,62 @@ async def test_timeout_interceptor_is_passed_to_client(self):
128129
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptorAsync)
129130

130131

132+
class WorkflowClientAioChannelOptionsTest(unittest.TestCase):
133+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
134+
def test_explicit_kwarg_sets_symmetric_channel_options(self):
135+
with mock.patch(
136+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
137+
) as mock_client_cls:
138+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
139+
channel_options = mock_client_cls.call_args[1]['channel_options']
140+
self.assertEqual(
141+
[
142+
('grpc.max_send_message_length', 8 * 1024 * 1024),
143+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
144+
],
145+
channel_options,
146+
)
147+
148+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
149+
def test_env_var_sets_symmetric_channel_options(self):
150+
with mock.patch(
151+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
152+
) as mock_client_cls:
153+
DaprWorkflowClient()
154+
channel_options = mock_client_cls.call_args[1]['channel_options']
155+
self.assertEqual(
156+
[
157+
('grpc.max_send_message_length', 16 * 1024 * 1024),
158+
('grpc.max_receive_message_length', 16 * 1024 * 1024),
159+
],
160+
channel_options,
161+
)
162+
163+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
164+
def test_kwarg_takes_precedence_over_env_var(self):
165+
with mock.patch(
166+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
167+
) as mock_client_cls:
168+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
169+
channel_options = mock_client_cls.call_args[1]['channel_options']
170+
self.assertEqual(
171+
[
172+
('grpc.max_send_message_length', 8 * 1024 * 1024),
173+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
174+
],
175+
channel_options,
176+
)
177+
178+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
179+
def test_neither_set_passes_none(self):
180+
with mock.patch(
181+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
182+
) as mock_client_cls:
183+
DaprWorkflowClient()
184+
channel_options = mock_client_cls.call_args[1]['channel_options']
185+
self.assertIsNone(channel_options)
186+
187+
131188
class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase):
132189
def mock_client_wf(ctx: DaprWorkflowContext, input):
133190
print(f'{input}')

0 commit comments

Comments
 (0)