Skip to content

Commit 56775eb

Browse files
committed
fix: address copilot feedback
Signed-off-by: Samantha Coyle <sam@diagrid.io>
1 parent 3179c7d commit 56775eb

3 files changed

Lines changed: 72 additions & 8 deletions

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/aio/mcp.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,29 @@ async def connect(self, mcpserver_name: str) -> None:
102102
)
103103
break
104104
except Exception as exc: # noqa: BLE001 — classified by helper
105-
if not _is_transient_schedule_error(exc) or time.monotonic() >= deadline:
105+
if not _is_transient_schedule_error(exc):
106+
raise
107+
sleep_for = min(
108+
_SCHEDULE_RETRY_INTERVAL_SECONDS, deadline - time.monotonic()
109+
)
110+
if sleep_for <= 0:
106111
raise
107112
logger.debug(
108113
'schedule_new_workflow returned transient error %s; retrying', exc
109114
)
110-
await asyncio.sleep(_SCHEDULE_RETRY_INTERVAL_SECONDS)
115+
await asyncio.sleep(sleep_for)
111116

112-
remaining = max(deadline - time.monotonic(), 1.0)
117+
remaining = deadline - time.monotonic()
118+
if remaining <= 0:
119+
raise RuntimeError(
120+
f"ListTools workflow for MCPServer '{mcpserver_name}' "
121+
f'timed out after {self._timeout}s'
122+
)
123+
# wait_for_workflow_completion treats timeout=0 as "wait forever",
124+
# so floor the gRPC timeout at 1s when sub-second remaining survives.
113125
state = await self._wf_client.wait_for_workflow_completion(
114126
instance_id=instance_id,
115-
timeout_in_seconds=int(remaining),
127+
timeout_in_seconds=max(int(remaining), 1),
116128
fetch_payloads=True,
117129
)
118130

ext/dapr-ext-workflow/dapr/ext/workflow/mcp.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,17 +245,29 @@ def connect(self, mcpserver_name: str) -> None:
245245
)
246246
break
247247
except Exception as exc: # noqa: BLE001 — classified by helper
248-
if not _is_transient_schedule_error(exc) or time.monotonic() >= deadline:
248+
if not _is_transient_schedule_error(exc):
249+
raise
250+
sleep_for = min(
251+
_SCHEDULE_RETRY_INTERVAL_SECONDS, deadline - time.monotonic()
252+
)
253+
if sleep_for <= 0:
249254
raise
250255
logger.debug(
251256
'schedule_new_workflow returned transient error %s; retrying', exc
252257
)
253-
time.sleep(_SCHEDULE_RETRY_INTERVAL_SECONDS)
258+
time.sleep(sleep_for)
254259

255-
remaining = max(deadline - time.monotonic(), 1.0)
260+
remaining = deadline - time.monotonic()
261+
if remaining <= 0:
262+
raise RuntimeError(
263+
f"ListTools workflow for MCPServer '{mcpserver_name}' "
264+
f'timed out after {self._timeout}s'
265+
)
266+
# wait_for_workflow_completion treats timeout=0 as "wait forever",
267+
# so floor the gRPC timeout at 1s when sub-second remaining survives.
256268
state = self._wf_client.wait_for_workflow_completion(
257269
instance_id=instance_id,
258-
timeout_in_seconds=int(remaining),
270+
timeout_in_seconds=max(int(remaining), 1),
259271
fetch_payloads=True,
260272
)
261273

ext/dapr-ext-workflow/tests/test_mcp_client.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,28 @@ def test_deadline_exhausted_raises_last_error(self):
469469
with self.assertRaises(grpc.RpcError):
470470
mcp_client.connect('weather')
471471

472+
def test_budget_exhausted_after_schedule_succeeds(self):
473+
"""If retries burn the budget but schedule eventually succeeds, raise
474+
without calling wait_for_workflow_completion (timeout=0 means
475+
'wait forever' in the underlying client)."""
476+
mock_wf = MagicMock()
477+
mock_wf.schedule_new_workflow.side_effect = [
478+
_StubRpcError(grpc.StatusCode.CANCELLED),
479+
'inst-1',
480+
]
481+
482+
mcp_client = DaprMCPClient(timeout_in_seconds=1, wf_client=mock_wf)
483+
# monotonic: 0.0 → deadline = 1.0; 0.4 → sleep_for = 0.5 (still in budget);
484+
# 2.0 → post-loop remaining = -1.0 → raise.
485+
with patch('dapr.ext.workflow.mcp.time.sleep'), patch(
486+
'dapr.ext.workflow.mcp.time.monotonic',
487+
side_effect=[0.0, 0.4, 2.0],
488+
):
489+
with self.assertRaises(RuntimeError) as ctx:
490+
mcp_client.connect('weather')
491+
self.assertIn('timed out', str(ctx.exception))
492+
mock_wf.wait_for_workflow_completion.assert_not_called()
493+
472494

473495
class TestAioDaprMCPClientConnectRetry(unittest.IsolatedAsyncioTestCase):
474496
"""Async counterpart of TestDaprMCPClientConnectRetry."""
@@ -504,6 +526,24 @@ async def test_deadline_exhausted_raises(self):
504526
with self.assertRaises(grpc.RpcError):
505527
await mcp_client.connect('weather')
506528

529+
async def test_budget_exhausted_after_schedule_succeeds(self):
530+
"""Async mirror of the fail-fast-after-schedule-success guard."""
531+
mock_wf = AsyncMock()
532+
mock_wf.schedule_new_workflow.side_effect = [
533+
_StubRpcError(grpc.StatusCode.CANCELLED),
534+
'inst-1',
535+
]
536+
537+
mcp_client = AioDaprMCPClient(timeout_in_seconds=1, wf_client=mock_wf)
538+
with patch('dapr.ext.workflow.aio.mcp.asyncio.sleep', new=AsyncMock()), patch(
539+
'dapr.ext.workflow.aio.mcp.time.monotonic',
540+
side_effect=[0.0, 0.4, 2.0],
541+
):
542+
with self.assertRaises(RuntimeError) as ctx:
543+
await mcp_client.connect('weather')
544+
self.assertIn('timed out', str(ctx.exception))
545+
mock_wf.wait_for_workflow_completion.assert_not_awaited()
546+
507547

508548
class TestMCPWorkflowPrefix(unittest.TestCase):
509549
"""Tests for the workflow naming constant."""

0 commit comments

Comments
 (0)