Skip to content

Commit b5cb777

Browse files
authored
Merge branch 'main' into feat/mcp-crd
2 parents feb86a5 + 7750593 commit b5cb777

6 files changed

Lines changed: 108 additions & 1 deletion

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/aio/dapr_workflow_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from dapr.ext.workflow.workflow_state import WorkflowState
2727
from grpc.aio import AioRpcError
2828

29+
from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync
2930
from dapr.clients import DaprInternalError
3031
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
3132
from dapr.conf import settings
@@ -68,6 +69,7 @@ def __init__(
6869
secure_channel=uri.tls,
6970
log_handler=options.log_handler,
7071
log_formatter=options.log_formatter,
72+
interceptors=[DaprClientTimeoutInterceptorAsync()],
7173
)
7274

7375
async def schedule_new_workflow(

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from grpc import RpcError
2727

2828
from dapr.clients import DaprInternalError
29+
from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor
2930
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
3031
from dapr.conf import settings
3132
from dapr.conf.helpers import GrpcEndpoint
@@ -70,6 +71,7 @@ def __init__(
7071
secure_channel=uri.tls,
7172
log_handler=options.log_handler,
7273
log_formatter=options.log_formatter,
74+
interceptors=[DaprClientTimeoutInterceptor()],
7375
)
7476

7577
def schedule_new_workflow(

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from dapr.ext.workflow.workflow_context import Workflow
2828

2929
from dapr.clients import DaprInternalError
30+
from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor
3031
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
3132
from dapr.conf import settings
3233
from dapr.conf.helpers import GrpcEndpoint
@@ -73,13 +74,17 @@ def __init__(
7374
raise DaprInternalError(f'{error}') from error
7475

7576
options = self._logger.get_options()
77+
all_interceptors = []
78+
if interceptors:
79+
all_interceptors.extend(interceptors)
80+
all_interceptors.append(DaprClientTimeoutInterceptor())
7681
self.__worker = worker.TaskHubGrpcWorker(
7782
host_address=uri.endpoint,
7883
metadata=metadata,
7984
secure_channel=uri.tls,
8085
log_handler=options.log_handler,
8186
log_formatter=options.log_formatter,
82-
interceptors=interceptors,
87+
interceptors=all_interceptors,
8388
concurrency_options=worker.ConcurrencyOptions(
8489
maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items,
8590
maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items,

ext/dapr-ext-workflow/tests/test_workflow_client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,21 @@ def _inner_get_orchestration_state(self, instance_id, state: client.Orchestratio
111111
)
112112

113113

114+
class WorkflowClientTimeoutInterceptorTest(unittest.TestCase):
115+
def test_timeout_interceptor_is_passed_to_client(self):
116+
with mock.patch(
117+
'dapr.ext.workflow._durabletask.client.TaskHubGrpcClient'
118+
) as mock_client_cls:
119+
DaprWorkflowClient()
120+
mock_client_cls.assert_called_once()
121+
call_kwargs = mock_client_cls.call_args[1]
122+
interceptors = call_kwargs['interceptors']
123+
self.assertEqual(len(interceptors), 1)
124+
from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor
125+
126+
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor)
127+
128+
114129
class WorkflowClientTest(unittest.TestCase):
115130
def mock_client_wf(ctx: DaprWorkflowContext, input):
116131
print(f'{input}')
@@ -186,3 +201,5 @@ def test_client_functions(self):
186201

187202
actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id)
188203
assert actual_purge_result == mock_purge_result
204+
actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id)
205+
assert actual_purge_result == mock_purge_result

ext/dapr-ext-workflow/tests/test_workflow_client_aio.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,21 @@ def _inner_get_orchestration_state(self, instance_id, state: client.Orchestratio
112112
)
113113

114114

115+
class WorkflowClientAioTimeoutInterceptorTest(unittest.IsolatedAsyncioTestCase):
116+
async def test_timeout_interceptor_is_passed_to_client(self):
117+
with mock.patch(
118+
'dapr.ext.workflow._durabletask.aio.client.AsyncTaskHubGrpcClient'
119+
) as mock_client_cls:
120+
DaprWorkflowClient()
121+
mock_client_cls.assert_called_once()
122+
call_kwargs = mock_client_cls.call_args[1]
123+
interceptors = call_kwargs['interceptors']
124+
self.assertEqual(len(interceptors), 1)
125+
from dapr.aio.clients.grpc.interceptors import DaprClientTimeoutInterceptorAsync
126+
127+
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptorAsync)
128+
129+
115130
class WorkflowClientAioTest(unittest.IsolatedAsyncioTestCase):
116131
def mock_client_wf(ctx: DaprWorkflowContext, input):
117132
print(f'{input}')
@@ -190,3 +205,5 @@ async def test_client_functions(self):
190205

191206
actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id)
192207
assert actual_purge_result == mock_purge_result
208+
actual_purge_result = await wfClient.purge_workflow(instance_id=mock_instance_id)
209+
assert actual_purge_result == mock_purge_result

ext/dapr-ext-workflow/tests/test_workflow_runtime.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from typing import List, Optional
1818
from unittest import mock
1919

20+
import grpc
2021
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
2122
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
2223
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
@@ -46,6 +47,63 @@ def add_named_activity(self, name: str, fn):
4647
self._activity_fns[name] = fn
4748

4849

50+
class WorkflowRuntimeTimeoutInterceptorTest(unittest.TestCase):
51+
def setUp(self):
52+
listActivities.clear()
53+
listOrchestrators.clear()
54+
self._registry_patch = mock.patch(
55+
'dapr.ext.workflow._durabletask.worker._Registry',
56+
return_value=FakeTaskHubGrpcWorker(),
57+
)
58+
self._registry_patch.start()
59+
60+
def tearDown(self):
61+
mock.patch.stopall()
62+
63+
def test_timeout_interceptor_is_added(self):
64+
with mock.patch(
65+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
66+
) as mock_worker_cls:
67+
WorkflowRuntime()
68+
mock_worker_cls.assert_called_once()
69+
call_kwargs = mock_worker_cls.call_args[1]
70+
interceptors = call_kwargs['interceptors']
71+
self.assertEqual(len(interceptors), 1)
72+
from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor
73+
74+
self.assertIsInstance(interceptors[0], DaprClientTimeoutInterceptor)
75+
76+
def test_timeout_interceptor_with_custom_interceptors(self):
77+
custom_interceptor = mock.MagicMock(spec=grpc.UnaryUnaryClientInterceptor)
78+
with mock.patch(
79+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
80+
) as mock_worker_cls:
81+
WorkflowRuntime(interceptors=[custom_interceptor])
82+
call_kwargs = mock_worker_cls.call_args[1]
83+
interceptors = call_kwargs['interceptors']
84+
self.assertEqual(len(interceptors), 2)
85+
from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor
86+
87+
self.assertIs(interceptors[0], custom_interceptor)
88+
self.assertIsInstance(interceptors[1], DaprClientTimeoutInterceptor)
89+
90+
def test_timeout_interceptor_preserves_custom_interceptor_order(self):
91+
custom1 = mock.MagicMock(spec=grpc.UnaryUnaryClientInterceptor)
92+
custom2 = mock.MagicMock(spec=grpc.UnaryStreamClientInterceptor)
93+
with mock.patch(
94+
'dapr.ext.workflow._durabletask.worker.TaskHubGrpcWorker'
95+
) as mock_worker_cls:
96+
WorkflowRuntime(interceptors=[custom1, custom2])
97+
call_kwargs = mock_worker_cls.call_args[1]
98+
interceptors = call_kwargs['interceptors']
99+
self.assertEqual(len(interceptors), 3)
100+
from dapr.clients.grpc.interceptors import DaprClientTimeoutInterceptor
101+
102+
self.assertIs(interceptors[0], custom1)
103+
self.assertIs(interceptors[1], custom2)
104+
self.assertIsInstance(interceptors[2], DaprClientTimeoutInterceptor)
105+
106+
49107
class WorkflowRuntimeTest(unittest.TestCase):
50108
def setUp(self):
51109
listActivities.clear()
@@ -765,3 +823,9 @@ def my_act(ctx, order: Optional[Order]):
765823
wrapper = self.fake_registry._activity_fns['optional_no_default_act']
766824

767825
self.assertIsNone(wrapper(mock.MagicMock(), None))
826+
wrapper = self.fake_registry._activity_fns['optional_no_default_act']
827+
828+
self.assertIsNone(wrapper(mock.MagicMock(), None))
829+
wrapper = self.fake_registry._activity_fns['optional_no_default_act']
830+
831+
self.assertIsNone(wrapper(mock.MagicMock(), None))

0 commit comments

Comments
 (0)