Skip to content

Commit bd73b7f

Browse files
committed
fix(ext-workflow): honor DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES on workflow channels
Workflow activity payloads over gRPC's 4 MiB default raised RESOURCE_EXHAUSTED because the durabletask channel ignored the message size limit. dapr#1024 plumbed DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES through the Dapr API channel only; the workflow worker and clients still fell back to the gRPC default. Add a get_grpc_channel_options helper that resolves the limit from an explicit max_grpc_message_length kwarg, then the env var, then None, and thread it through WorkflowRuntime, DaprWorkflowClient, and the async DaprWorkflowClient. Unlike dapr#1024, set both send and receive limits symmetrically since workflow payloads cross the channel in both directions. Reuses the existing setting; no new env var. Signed-off-by: Martez Killens <5050479+tezizzm@users.noreply.github.com>
1 parent c3b6bd7 commit bd73b7f

8 files changed

Lines changed: 284 additions & 5 deletions

File tree

dapr/ext/workflow/aio/dapr_workflow_client.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from dapr.ext.workflow._durabletask import client
2929
from dapr.ext.workflow._durabletask.aio import client as aioclient
3030
from dapr.ext.workflow.logger import Logger, LoggerOptions
31-
from dapr.ext.workflow.util import getAddress
31+
from dapr.ext.workflow.util import get_grpc_channel_options, getAddress
3232
from dapr.ext.workflow.workflow_context import Workflow
3333
from dapr.ext.workflow.workflow_state import WorkflowState
3434

@@ -49,7 +49,17 @@ def __init__(
4949
host: Optional[str] = None,
5050
port: Optional[str] = None,
5151
logger_options: Optional[LoggerOptions] = None,
52+
max_grpc_message_length: Optional[int] = None,
5253
):
54+
"""Initializes the async workflow client.
55+
56+
Args:
57+
max_grpc_message_length: Maximum gRPC message size in bytes for the
58+
workflow channel, applied symmetrically to both send and receive
59+
directions. Precedence: this kwarg, then the
60+
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var, then the
61+
gRPC default (4 MiB) when neither is set.
62+
"""
5363
address = getAddress(host, port)
5464

5565
try:
@@ -63,13 +73,15 @@ def __init__(
6373
if settings.DAPR_API_TOKEN:
6474
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
6575
options = self._logger.get_options()
76+
channel_options = get_grpc_channel_options(max_grpc_message_length)
6677
self.__obj = aioclient.AsyncTaskHubGrpcClient(
6778
host_address=uri.endpoint,
6879
metadata=metadata,
6980
secure_channel=uri.tls,
7081
log_handler=options.log_handler,
7182
log_formatter=options.log_formatter,
7283
interceptors=[DaprClientTimeoutInterceptorAsync()],
84+
channel_options=channel_options,
7385
)
7486

7587
async def schedule_new_workflow(

dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from dapr.conf.helpers import GrpcEndpoint
2828
from dapr.ext.workflow._durabletask import client
2929
from dapr.ext.workflow.logger import Logger, LoggerOptions
30-
from dapr.ext.workflow.util import getAddress
30+
from dapr.ext.workflow.util import get_grpc_channel_options, getAddress
3131
from dapr.ext.workflow.workflow_context import Workflow
3232
from dapr.ext.workflow.workflow_state import WorkflowState
3333

@@ -51,7 +51,17 @@ def __init__(
5151
host: Optional[str] = None,
5252
port: Optional[str] = None,
5353
logger_options: Optional[LoggerOptions] = None,
54+
max_grpc_message_length: Optional[int] = None,
5455
):
56+
"""Initializes the sync workflow client.
57+
58+
Args:
59+
max_grpc_message_length: Maximum gRPC message size in bytes for the
60+
workflow channel, applied symmetrically to both send and receive
61+
directions. Precedence: this kwarg, then the
62+
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var, then the
63+
gRPC default (4 MiB) when neither is set.
64+
"""
5565
address = getAddress(host, port)
5666

5767
try:
@@ -65,13 +75,15 @@ def __init__(
6575
if settings.DAPR_API_TOKEN:
6676
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
6777
options = self._logger.get_options()
78+
channel_options = get_grpc_channel_options(max_grpc_message_length)
6879
self.__obj = client.TaskHubGrpcClient(
6980
host_address=uri.endpoint,
7081
metadata=metadata,
7182
secure_channel=uri.tls,
7283
log_handler=options.log_handler,
7384
log_formatter=options.log_formatter,
7485
interceptors=[DaprClientTimeoutInterceptor()],
86+
channel_options=channel_options,
7587
)
7688

7789
def schedule_new_workflow(

dapr/ext/workflow/util.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,40 @@
1313
limitations under the License.
1414
"""
1515

16-
from typing import Optional
16+
from typing import Any, Optional, Sequence
1717

1818
from dapr.conf import settings
1919

2020

21+
def get_grpc_channel_options(
22+
max_grpc_message_length: Optional[int] = None,
23+
) -> Optional[Sequence[tuple[str, Any]]]:
24+
"""Resolves gRPC channel options for the workflow message-size limit.
25+
26+
Resolution order: the explicit ``max_grpc_message_length`` kwarg, then
27+
``settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES``, else ``None``.
28+
29+
Sets BOTH send and receive limits symmetrically because workflow activity
30+
payloads cross the channel in both directions. ``None`` is returned when no
31+
limit is configured, preserving the gRPC default behavior.
32+
33+
Args:
34+
max_grpc_message_length: Explicit max gRPC message size in bytes. Takes
35+
precedence over the env-var setting when truthy.
36+
37+
Returns:
38+
A sequence of ``(option, value)`` tuples setting both send and receive
39+
limits, or ``None`` when no limit is configured.
40+
"""
41+
size = max_grpc_message_length or settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES or None
42+
if size is None:
43+
return None
44+
return [
45+
('grpc.max_send_message_length', size),
46+
('grpc.max_receive_message_length', size),
47+
]
48+
49+
2150
def getAddress(host: Optional[str] = None, port: Optional[str] = None) -> str:
2251
if not host and not port:
2352
address = settings.DAPR_GRPC_ENDPOINT or (

dapr/ext/workflow/workflow_runtime.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from dapr.ext.workflow._durabletask.internal.shared import is_async_callable as _is_async_callable
3030
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
3131
from dapr.ext.workflow.logger import Logger, LoggerOptions
32-
from dapr.ext.workflow.util import getAddress
32+
from dapr.ext.workflow.util import get_grpc_channel_options, getAddress
3333
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
3434
from dapr.ext.workflow.workflow_context import Workflow
3535

@@ -118,7 +118,17 @@ def __init__(
118118
maximum_concurrent_orchestration_work_items: Optional[int] = None,
119119
maximum_thread_pool_workers: Optional[int] = None,
120120
worker_ready_timeout: Optional[float] = None,
121+
max_grpc_message_length: Optional[int] = None,
121122
):
123+
"""Initializes the workflow runtime.
124+
125+
Args:
126+
max_grpc_message_length: Maximum gRPC message size in bytes for the
127+
workflow channel, applied symmetrically to both send and receive
128+
directions. Precedence: this kwarg, then the
129+
``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var, then the
130+
gRPC default (4 MiB) when neither is set.
131+
"""
122132
self._logger = Logger('WorkflowRuntime', logger_options)
123133
self._worker_ready_timeout = 30.0 if worker_ready_timeout is None else worker_ready_timeout
124134

@@ -137,13 +147,15 @@ def __init__(
137147
if interceptors:
138148
all_interceptors.extend(interceptors)
139149
all_interceptors.append(DaprClientTimeoutInterceptor())
150+
channel_options = get_grpc_channel_options(max_grpc_message_length)
140151
self.__worker = worker.TaskHubGrpcWorker(
141152
host_address=uri.endpoint,
142153
metadata=metadata,
143154
secure_channel=uri.tls,
144155
log_handler=options.log_handler,
145156
log_formatter=options.log_formatter,
146157
interceptors=all_interceptors,
158+
channel_options=channel_options,
147159
concurrency_options=worker.ConcurrencyOptions(
148160
maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items,
149161
maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items,

tests/ext/workflow/test_workflow_client.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from grpc import RpcError
2222

23+
from dapr.conf import settings
2324
from dapr.ext.workflow._durabletask import client
2425
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
2526
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
@@ -127,6 +128,62 @@ def test_timeout_interceptor_is_passed_to_client(self):
127128
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor)
128129

129130

131+
class WorkflowClientChannelOptionsTest(unittest.TestCase):
132+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
133+
def test_explicit_kwarg_sets_symmetric_channel_options(self):
134+
with mock.patch(
135+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
136+
) as mock_client_cls:
137+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
138+
channel_options = mock_client_cls.call_args[1]['channel_options']
139+
self.assertEqual(
140+
[
141+
('grpc.max_send_message_length', 8 * 1024 * 1024),
142+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
143+
],
144+
channel_options,
145+
)
146+
147+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
148+
def test_env_var_sets_symmetric_channel_options(self):
149+
with mock.patch(
150+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
151+
) as mock_client_cls:
152+
DaprWorkflowClient()
153+
channel_options = mock_client_cls.call_args[1]['channel_options']
154+
self.assertEqual(
155+
[
156+
('grpc.max_send_message_length', 16 * 1024 * 1024),
157+
('grpc.max_receive_message_length', 16 * 1024 * 1024),
158+
],
159+
channel_options,
160+
)
161+
162+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
163+
def test_kwarg_takes_precedence_over_env_var(self):
164+
with mock.patch(
165+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
166+
) as mock_client_cls:
167+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
168+
channel_options = mock_client_cls.call_args[1]['channel_options']
169+
self.assertEqual(
170+
[
171+
('grpc.max_send_message_length', 8 * 1024 * 1024),
172+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
173+
],
174+
channel_options,
175+
)
176+
177+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
178+
def test_neither_set_passes_none(self):
179+
with mock.patch(
180+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
181+
) as mock_client_cls:
182+
DaprWorkflowClient()
183+
channel_options = mock_client_cls.call_args[1]['channel_options']
184+
self.assertIsNone(channel_options)
185+
186+
130187
class WorkflowClientTest(unittest.TestCase):
131188
def mock_client_wf(ctx: DaprWorkflowContext, input):
132189
print(f'{input}')

tests/ext/workflow/test_workflow_client_aio.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from grpc.aio import AioRpcError
2222

23+
from dapr.conf import settings
2324
from dapr.ext.workflow._durabletask import client
2425
from dapr.ext.workflow.aio import DaprWorkflowClient
2526
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
@@ -128,6 +129,62 @@ async def test_timeout_interceptor_is_passed_to_client(self):
128129
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptorAsync)
129130

130131

132+
class WorkflowClientAioChannelOptionsTest(unittest.TestCase):
133+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
134+
def test_explicit_kwarg_sets_symmetric_channel_options(self):
135+
with mock.patch(
136+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
137+
) as mock_client_cls:
138+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
139+
channel_options = mock_client_cls.call_args[1]['channel_options']
140+
self.assertEqual(
141+
[
142+
('grpc.max_send_message_length', 8 * 1024 * 1024),
143+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
144+
],
145+
channel_options,
146+
)
147+
148+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
149+
def test_env_var_sets_symmetric_channel_options(self):
150+
with mock.patch(
151+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
152+
) as mock_client_cls:
153+
DaprWorkflowClient()
154+
channel_options = mock_client_cls.call_args[1]['channel_options']
155+
self.assertEqual(
156+
[
157+
('grpc.max_send_message_length', 16 * 1024 * 1024),
158+
('grpc.max_receive_message_length', 16 * 1024 * 1024),
159+
],
160+
channel_options,
161+
)
162+
163+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
164+
def test_kwarg_takes_precedence_over_env_var(self):
165+
with mock.patch(
166+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
167+
) as mock_client_cls:
168+
DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024)
169+
channel_options = mock_client_cls.call_args[1]['channel_options']
170+
self.assertEqual(
171+
[
172+
('grpc.max_send_message_length', 8 * 1024 * 1024),
173+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
174+
],
175+
channel_options,
176+
)
177+
178+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
179+
def test_neither_set_passes_none(self):
180+
with mock.patch(
181+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
182+
) as mock_client_cls:
183+
DaprWorkflowClient()
184+
channel_options = mock_client_cls.call_args[1]['channel_options']
185+
self.assertIsNone(channel_options)
186+
187+
131188
class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase):
132189
def mock_client_wf(ctx: DaprWorkflowContext, input):
133190
print(f'{input}')

tests/ext/workflow/test_workflow_runtime.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import grpc
2121
from pydantic import BaseModel, ValidationError
2222

23+
from dapr.conf import settings
2324
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
2425
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
2526
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
@@ -830,3 +831,63 @@ def my_act(ctx, order: Optional[Order]):
830831
wrapper = self.fake_registry._activity_fns['optional_no_default_act']
831832

832833
self.assertIsNone(wrapper(mock.MagicMock(), None))
834+
835+
836+
class WorkflowRuntimeChannelOptionsTest(unittest.TestCase):
837+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
838+
def test_explicit_kwarg_sets_symmetric_channel_options(self):
839+
with mock.patch(
840+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
841+
) as mock_worker_cls:
842+
WorkflowRuntime(max_grpc_message_length=8 * 1024 * 1024)
843+
channel_options = mock_worker_cls.call_args[1]['channel_options']
844+
self.assertEqual(
845+
[
846+
('grpc.max_send_message_length', 8 * 1024 * 1024),
847+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
848+
],
849+
channel_options,
850+
)
851+
852+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
853+
def test_env_var_sets_symmetric_channel_options(self):
854+
with mock.patch(
855+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
856+
) as mock_worker_cls:
857+
WorkflowRuntime()
858+
channel_options = mock_worker_cls.call_args[1]['channel_options']
859+
self.assertEqual(
860+
[
861+
('grpc.max_send_message_length', 16 * 1024 * 1024),
862+
('grpc.max_receive_message_length', 16 * 1024 * 1024),
863+
],
864+
channel_options,
865+
)
866+
867+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024)
868+
def test_kwarg_takes_precedence_over_env_var(self):
869+
with mock.patch(
870+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
871+
) as mock_worker_cls:
872+
WorkflowRuntime(max_grpc_message_length=8 * 1024 * 1024)
873+
channel_options = mock_worker_cls.call_args[1]['channel_options']
874+
self.assertEqual(
875+
[
876+
('grpc.max_send_message_length', 8 * 1024 * 1024),
877+
('grpc.max_receive_message_length', 8 * 1024 * 1024),
878+
],
879+
channel_options,
880+
)
881+
882+
@mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0)
883+
def test_neither_set_passes_none(self):
884+
with mock.patch(
885+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
886+
) as mock_worker_cls:
887+
WorkflowRuntime()
888+
channel_options = mock_worker_cls.call_args[1]['channel_options']
889+
self.assertIsNone(channel_options)
890+
891+
892+
if __name__ == '__main__':
893+
unittest.main()

0 commit comments

Comments
 (0)