Skip to content

Commit c7e5510

Browse files
Conformance: Python signal/query workflow stays leased before durable wait (#89)
1 parent e5cd07a commit c7e5510

3 files changed

Lines changed: 95 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Fixed
10+
- Workflow-task completion transport failures now cause the worker to report
11+
the task as failed instead of treating the local replay as completed. This
12+
lets the server re-dispatch quickly instead of leaving a workflow task leased
13+
until timeout when the completion request never records.
1014
- Repeated condition-wait openings for the same logical wait now replay through
1115
every matching signal before deciding whether the wait is still pending, so
1216
long-running signal/query workflows do not get stuck on the first signal.

src/durable_workflow/worker.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,8 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
775775
)
776776
except Exception as e:
777777
log.warning("failed to complete workflow update task %s: %s", task_id, e)
778+
await self._fail_workflow_task_after_completion_error(task_id, attempt, e)
779+
return None
778780
return [command]
779781

780782
try:
@@ -847,8 +849,32 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
847849
)
848850
except Exception as e:
849851
log.warning("failed to complete workflow task %s: %s", task_id, e)
852+
await self._fail_workflow_task_after_completion_error(task_id, attempt, e)
853+
return None
850854
return commands
851855

856+
async def _fail_workflow_task_after_completion_error(
857+
self,
858+
task_id: str,
859+
attempt: int,
860+
error: Exception,
861+
) -> None:
862+
try:
863+
await self.client.fail_workflow_task(
864+
task_id=task_id,
865+
lease_owner=self.worker_id,
866+
workflow_task_attempt=attempt,
867+
message=f"workflow task completion failed after commands were produced: {error}",
868+
failure_type=type(error).__name__,
869+
stack_trace=traceback.format_exc(),
870+
)
871+
except Exception as fail_error:
872+
log.warning(
873+
"failed to report workflow task %s completion failure: %s",
874+
task_id,
875+
fail_error,
876+
)
877+
852878
async def _run_activity_task(self, task: dict[str, Any]) -> str:
853879
task_id: str = task["task_id"]
854880
attempt_id: str = task.get("activity_attempt_id") or task.get("attempt_id", "")

tests/test_worker.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,33 @@ async def test_schedule_activity_on_first_replay(self, mock_client: AsyncMock) -
501501
assert commands[0]["arguments"]["codec"] == "json"
502502
assert serializer.decode(commands[0]["arguments"]["blob"], codec="json") == ["hello"]
503503

504+
@pytest.mark.asyncio
505+
async def test_workflow_task_completion_error_fails_task_for_fast_redispatch(
506+
self, mock_client: AsyncMock
507+
) -> None:
508+
mock_client.complete_workflow_task.side_effect = TimeoutError("completion timed out")
509+
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
510+
task = {
511+
"task_id": "t-complete-timeout",
512+
"workflow_type": "test-wf",
513+
"workflow_task_attempt": 2,
514+
"history_events": [],
515+
"arguments": '["hello"]',
516+
"payload_codec": "json",
517+
}
518+
519+
result = await worker._run_workflow_task(task)
520+
521+
assert result is None
522+
mock_client.complete_workflow_task.assert_awaited_once()
523+
mock_client.fail_workflow_task.assert_awaited_once()
524+
call_kwargs = mock_client.fail_workflow_task.await_args.kwargs
525+
assert call_kwargs["task_id"] == "t-complete-timeout"
526+
assert call_kwargs["workflow_task_attempt"] == 2
527+
assert call_kwargs["lease_owner"] == worker.worker_id
528+
assert call_kwargs["failure_type"] == "TimeoutError"
529+
assert "completion timed out" in call_kwargs["message"]
530+
504531
@pytest.mark.asyncio
505532
async def test_workflow_command_payload_warning_uses_client_policy(
506533
self, mock_client: AsyncMock, caplog: pytest.LogCaptureFixture
@@ -673,6 +700,44 @@ async def test_update_backed_workflow_task_completes_update_command(
673700
]
674701
mock_client.fail_workflow_task.assert_not_called()
675702

703+
@pytest.mark.asyncio
704+
async def test_update_task_completion_error_fails_task_for_fast_redispatch(
705+
self, mock_client: AsyncMock
706+
) -> None:
707+
mock_client.complete_workflow_task.side_effect = TimeoutError("update completion timed out")
708+
worker = Worker(mock_client, task_queue="q1", workflows=[UpdateWorkflow], activities=[])
709+
task = {
710+
"task_id": "t-update-timeout",
711+
"workflow_type": "update-wf",
712+
"workflow_task_attempt": 3,
713+
"workflow_update_id": "upd-worker-1",
714+
"workflow_wait_kind": "update",
715+
"history_events": [
716+
{
717+
"event_type": "UpdateAccepted",
718+
"payload": {
719+
"update_id": "upd-worker-1",
720+
"update_name": "increment",
721+
"arguments": serializer.encode([6], codec="json"),
722+
"payload_codec": "json",
723+
},
724+
},
725+
],
726+
"arguments": "[]",
727+
"payload_codec": "json",
728+
}
729+
730+
result = await worker._run_workflow_task(task)
731+
732+
assert result is None
733+
mock_client.complete_workflow_task.assert_awaited_once()
734+
mock_client.fail_workflow_task.assert_awaited_once()
735+
call_kwargs = mock_client.fail_workflow_task.await_args.kwargs
736+
assert call_kwargs["task_id"] == "t-update-timeout"
737+
assert call_kwargs["workflow_task_attempt"] == 3
738+
assert call_kwargs["failure_type"] == "TimeoutError"
739+
assert "update completion timed out" in call_kwargs["message"]
740+
676741
@pytest.mark.asyncio
677742
async def test_query_task_executes_registered_query(self, mock_client: AsyncMock) -> None:
678743
worker = Worker(mock_client, task_queue="q1", workflows=[QueryWorkflow], activities=[])

0 commit comments

Comments
 (0)