From 9f4099283d0652161f112328e8138747cbf17920 Mon Sep 17 00:00:00 2001 From: Martez Killens <5050479+tezizzm@users.noreply.github.com> Date: Tue, 9 Jun 2026 17:50:17 -0700 Subject: [PATCH] fix(ext-workflow): honor DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES on workflow channels Workflow activity payloads over gRPC's 4 MiB default raised RESOURCE_EXHAUSTED because the durabletask channel ignored the message size limit. #1024 plumbed DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES through the Dapr API channel only; the workflow worker and clients still fell back to the gRPC default. Add a get_grpc_channel_options helper that resolves the limit from an explicit max_grpc_message_length kwarg, then the env var, then None, and thread it through WorkflowRuntime, DaprWorkflowClient, and the async DaprWorkflowClient. Unlike #1024, set both send and receive limits symmetrically since workflow payloads cross the channel in both directions. Reuses the existing setting; no new env var. Signed-off-by: Martez Killens <5050479+tezizzm@users.noreply.github.com> --- .../ext/workflow/aio/dapr_workflow_client.py | 14 ++++- .../dapr/ext/workflow/dapr_workflow_client.py | 14 ++++- .../dapr/ext/workflow/util.py | 31 +++++++++- .../dapr/ext/workflow/workflow_runtime.py | 14 ++++- .../tests/test_workflow_client.py | 58 +++++++++++++++++ .../tests/test_workflow_client_aio.py | 58 +++++++++++++++++ .../tests/test_workflow_runtime.py | 62 +++++++++++++++++++ .../tests/test_workflow_util.py | 41 +++++++++++- 8 files changed, 287 insertions(+), 5 deletions(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py index b72be558d..97ff243f2 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py @@ -21,7 +21,7 @@ from dapr.ext.workflow._durabletask import client from dapr.ext.workflow._durabletask.aio import client as aioclient from dapr.ext.workflow.logger import Logger, LoggerOptions -from dapr.ext.workflow.util import getAddress +from dapr.ext.workflow.util import get_grpc_channel_options, getAddress from dapr.ext.workflow.workflow_context import Workflow from dapr.ext.workflow.workflow_state import WorkflowState from grpc.aio import AioRpcError @@ -49,7 +49,17 @@ def __init__( host: Optional[str] = None, port: Optional[str] = None, logger_options: Optional[LoggerOptions] = None, + max_grpc_message_length: Optional[int] = None, ): + """Initializes the async workflow client. + + Args: + max_grpc_message_length: Maximum gRPC message size in bytes for the + workflow channel, applied symmetrically to both send and receive + directions. Precedence: this kwarg, then the + ``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var, then the + gRPC default (4 MiB) when neither is set. + """ address = getAddress(host, port) try: @@ -63,6 +73,7 @@ def __init__( if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) options = self._logger.get_options() + channel_options = get_grpc_channel_options(max_grpc_message_length) self.__obj = aioclient.AsyncTaskHubGrpcClient( host_address=uri.endpoint, metadata=metadata, @@ -70,6 +81,7 @@ def __init__( log_handler=options.log_handler, log_formatter=options.log_formatter, interceptors=[DaprClientTimeoutInterceptorAsync()], + channel_options=channel_options, ) async def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index d732f7747..1135c094e 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -20,7 +20,7 @@ from dapr.ext.workflow._durabletask import client from dapr.ext.workflow.logger import Logger, LoggerOptions -from dapr.ext.workflow.util import getAddress +from dapr.ext.workflow.util import get_grpc_channel_options, getAddress from dapr.ext.workflow.workflow_context import Workflow from dapr.ext.workflow.workflow_state import WorkflowState from grpc import RpcError @@ -51,7 +51,17 @@ def __init__( host: Optional[str] = None, port: Optional[str] = None, logger_options: Optional[LoggerOptions] = None, + max_grpc_message_length: Optional[int] = None, ): + """Initializes the sync workflow client. + + Args: + max_grpc_message_length: Maximum gRPC message size in bytes for the + workflow channel, applied symmetrically to both send and receive + directions. Precedence: this kwarg, then the + ``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var, then the + gRPC default (4 MiB) when neither is set. + """ address = getAddress(host, port) try: @@ -65,6 +75,7 @@ def __init__( if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) options = self._logger.get_options() + channel_options = get_grpc_channel_options(max_grpc_message_length) self.__obj = client.TaskHubGrpcClient( host_address=uri.endpoint, metadata=metadata, @@ -72,6 +83,7 @@ def __init__( log_handler=options.log_handler, log_formatter=options.log_formatter, interceptors=[DaprClientTimeoutInterceptor()], + channel_options=channel_options, ) def schedule_new_workflow( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/util.py b/ext/dapr-ext-workflow/dapr/ext/workflow/util.py index 3199e2558..e4d65bc36 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/util.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/util.py @@ -13,11 +13,40 @@ limitations under the License. """ -from typing import Optional +from typing import Any, Optional, Sequence from dapr.conf import settings +def get_grpc_channel_options( + max_grpc_message_length: Optional[int] = None, +) -> Optional[Sequence[tuple[str, Any]]]: + """Resolves gRPC channel options for the workflow message-size limit. + + Resolution order: the explicit ``max_grpc_message_length`` kwarg, then + ``settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES``, else ``None``. + + Sets BOTH send and receive limits symmetrically because workflow activity + payloads cross the channel in both directions. ``None`` is returned when no + limit is configured, preserving the gRPC default behavior. + + Args: + max_grpc_message_length: Explicit max gRPC message size in bytes. Takes + precedence over the env-var setting when truthy. + + Returns: + A sequence of ``(option, value)`` tuples setting both send and receive + limits, or ``None`` when no limit is configured. + """ + size = max_grpc_message_length or settings.DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES or None + if size is None: + return None + return [ + ('grpc.max_send_message_length', size), + ('grpc.max_receive_message_length', size), + ] + + def getAddress(host: Optional[str] = None, port: Optional[str] = None) -> str: if not host and not port: address = settings.DAPR_GRPC_ENDPOINT or ( diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index f33622a15..d27b3c6a3 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -22,7 +22,7 @@ from dapr.ext.workflow._durabletask import task, worker from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from dapr.ext.workflow.logger import Logger, LoggerOptions -from dapr.ext.workflow.util import getAddress +from dapr.ext.workflow.util import get_grpc_channel_options, getAddress from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext from dapr.ext.workflow.workflow_context import Workflow @@ -59,7 +59,17 @@ def __init__( maximum_concurrent_orchestration_work_items: Optional[int] = None, maximum_thread_pool_workers: Optional[int] = None, worker_ready_timeout: Optional[float] = None, + max_grpc_message_length: Optional[int] = None, ): + """Initializes the workflow runtime. + + Args: + max_grpc_message_length: Maximum gRPC message size in bytes for the + workflow channel, applied symmetrically to both send and receive + directions. Precedence: this kwarg, then the + ``DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES`` env var, then the + gRPC default (4 MiB) when neither is set. + """ self._logger = Logger('WorkflowRuntime', logger_options) self._worker_ready_timeout = 30.0 if worker_ready_timeout is None else worker_ready_timeout @@ -78,6 +88,7 @@ def __init__( if interceptors: all_interceptors.extend(interceptors) all_interceptors.append(DaprClientTimeoutInterceptor()) + channel_options = get_grpc_channel_options(max_grpc_message_length) self.__worker = worker.TaskHubGrpcWorker( host_address=uri.endpoint, metadata=metadata, @@ -85,6 +96,7 @@ def __init__( log_handler=options.log_handler, log_formatter=options.log_formatter, interceptors=all_interceptors, + channel_options=channel_options, concurrency_options=worker.ConcurrencyOptions( maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items, maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items, diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 88e227373..15dae73c8 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -23,6 +23,8 @@ from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from grpc import RpcError +from dapr.conf import settings + mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' mock_terminate_result = 'terminate001' @@ -126,6 +128,62 @@ def test_timeout_interceptor_is_passed_to_client(self): self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor) +class WorkflowClientChannelOptionsTest(unittest.TestCase): + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_explicit_kwarg_sets_symmetric_channel_options(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024) + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_env_var_sets_symmetric_channel_options(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient() + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 16 * 1024 * 1024), + ('grpc.max_receive_message_length', 16 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_kwarg_takes_precedence_over_env_var(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024) + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_neither_set_passes_none(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient() + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertIsNone(channel_options) + + class WorkflowClientTest(unittest.TestCase): def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py index 729231e8d..3e25ef46b 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client_aio.py @@ -23,6 +23,8 @@ from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext from grpc.aio import AioRpcError +from dapr.conf import settings + mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' mock_terminate_result = 'terminate001' @@ -127,6 +129,62 @@ async def test_timeout_interceptor_is_passed_to_client(self): self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptorAsync) +class WorkflowClientAioChannelOptionsTest(unittest.TestCase): + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_explicit_kwarg_sets_symmetric_channel_options(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024) + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_env_var_sets_symmetric_channel_options(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient() + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 16 * 1024 * 1024), + ('grpc.max_receive_message_length', 16 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_kwarg_takes_precedence_over_env_var(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient(max_grpc_message_length=8 * 1024 * 1024) + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_neither_set_passes_none(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient' + ) as mock_client_cls: + DaprWorkflowClient() + channel_options = mock_client_cls.call_args[1]['channel_options'] + self.assertIsNone(channel_options) + + class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase): def mock_client_wf(ctx: DaprWorkflowContext, input): print(f'{input}') diff --git a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py index 32331a263..7384b0cff 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_runtime.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_runtime.py @@ -23,6 +23,8 @@ from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name from pydantic import BaseModel, ValidationError +from dapr.conf import settings + class Order(BaseModel): order_id: str @@ -829,3 +831,63 @@ def my_act(ctx, order: Optional[Order]): wrapper = self.fake_registry._activity_fns['optional_no_default_act'] self.assertIsNone(wrapper(mock.MagicMock(), None)) + + +class WorkflowRuntimeChannelOptionsTest(unittest.TestCase): + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_explicit_kwarg_sets_symmetric_channel_options(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker' + ) as mock_worker_cls: + WorkflowRuntime(max_grpc_message_length=8 * 1024 * 1024) + channel_options = mock_worker_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_env_var_sets_symmetric_channel_options(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker' + ) as mock_worker_cls: + WorkflowRuntime() + channel_options = mock_worker_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 16 * 1024 * 1024), + ('grpc.max_receive_message_length', 16 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_kwarg_takes_precedence_over_env_var(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker' + ) as mock_worker_cls: + WorkflowRuntime(max_grpc_message_length=8 * 1024 * 1024) + channel_options = mock_worker_cls.call_args[1]['channel_options'] + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + channel_options, + ) + + @mock.patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_neither_set_passes_none(self): + with mock.patch( + 'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker' + ) as mock_worker_cls: + WorkflowRuntime() + channel_options = mock_worker_cls.call_args[1]['channel_options'] + self.assertIsNone(channel_options) + + +if __name__ == '__main__': + unittest.main() diff --git a/ext/dapr-ext-workflow/tests/test_workflow_util.py b/ext/dapr-ext-workflow/tests/test_workflow_util.py index 28e92e6c5..6d570ac3e 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_util.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_util.py @@ -1,7 +1,7 @@ import unittest from unittest.mock import patch -from dapr.ext.workflow.util import getAddress +from dapr.ext.workflow.util import get_grpc_channel_options, getAddress from dapr.conf import settings @@ -28,3 +28,42 @@ def test_get_address_with_constructor_arguments_and_env_variable(self): @patch.object(settings, 'DAPR_GRPC_ENDPOINT', 'https://domain1.com:5000') def test_get_address_with_env_variable(self): self.assertEqual('https://domain1.com:5000', getAddress()) + + +class GetGrpcChannelOptionsTest(unittest.TestCase): + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_explicit_kwarg_sets_both_directions(self): + options = get_grpc_channel_options(8 * 1024 * 1024) + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + options, + ) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_env_var_sets_both_directions(self): + options = get_grpc_channel_options() + self.assertEqual( + [ + ('grpc.max_send_message_length', 16 * 1024 * 1024), + ('grpc.max_receive_message_length', 16 * 1024 * 1024), + ], + options, + ) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 16 * 1024 * 1024) + def test_kwarg_takes_precedence_over_env_var(self): + options = get_grpc_channel_options(8 * 1024 * 1024) + self.assertEqual( + [ + ('grpc.max_send_message_length', 8 * 1024 * 1024), + ('grpc.max_receive_message_length', 8 * 1024 * 1024), + ], + options, + ) + + @patch.object(settings, 'DAPR_GRPC_MAX_INBOUND_MESSAGE_SIZE_BYTES', 0) + def test_neither_set_returns_none(self): + self.assertIsNone(get_grpc_channel_options())