Skip to content

Commit c9a7f2a

Browse files
[cross-repo from server#267] Conformance: replay approve signal does not complete after activity state query (#98)
1 parent 70b260d commit c9a7f2a

3 files changed

Lines changed: 119 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Fixed
10+
- Workflow-task completion now retries transient transport failures and server
11+
throttling/5xx rejections before preserving emitted commands or reporting a
12+
definite task failure, reducing stuck waiting runs when a signal-satisfied
13+
wait completes immediately after replay-driven query activity.
1014
- Ambiguous workflow-task completion failures no longer get reported back as
1115
durable task failures after commands have been produced. Definite server
1216
rejections are still treated as failed workflow tasks even when the

src/durable_workflow/worker.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@
9191
"task_not_leased",
9292
"workflow_task_attempt_mismatch",
9393
}
94+
_WORKFLOW_TASK_COMPLETION_MAX_ATTEMPTS = 3
95+
_WORKFLOW_TASK_COMPLETION_RETRY_DELAYS = (0.05, 0.2)
9496
_WORKER_WORKFLOW_FINGERPRINTS: dict[tuple[str, str], str] = {}
9597

9698

@@ -141,6 +143,13 @@ def _should_fail_workflow_task_after_completion_error(error: BaseException) -> b
141143
return isinstance(error, DurableWorkflowError)
142144

143145

146+
def _should_retry_workflow_task_completion_error(error: BaseException) -> bool:
147+
if isinstance(error, ServerError):
148+
return error.status >= 500 or error.status == 429
149+
150+
return not isinstance(error, DurableWorkflowError)
151+
152+
144153
def _signal_arguments_envelope_from_export(
145154
signal: Mapping[str, Any],
146155
*,
@@ -797,10 +806,9 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
797806
command["type"],
798807
)
799808
try:
800-
await self.client.complete_workflow_task(
809+
await self._complete_workflow_task_with_retry(
801810
task_id=task_id,
802-
lease_owner=self.worker_id,
803-
workflow_task_attempt=attempt,
811+
attempt=attempt,
804812
commands=[command],
805813
)
806814
except Exception as e:
@@ -872,10 +880,9 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
872880
[c["type"] for c in commands],
873881
)
874882
try:
875-
await self.client.complete_workflow_task(
883+
await self._complete_workflow_task_with_retry(
876884
task_id=task_id,
877-
lease_owner=self.worker_id,
878-
workflow_task_attempt=attempt,
885+
attempt=attempt,
879886
commands=commands,
880887
)
881888
except Exception as e:
@@ -885,6 +892,42 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
885892
return None
886893
return commands
887894

895+
async def _complete_workflow_task_with_retry(
896+
self,
897+
*,
898+
task_id: str,
899+
attempt: int,
900+
commands: list[dict[str, Any]],
901+
) -> None:
902+
for completion_attempt in range(1, _WORKFLOW_TASK_COMPLETION_MAX_ATTEMPTS + 1):
903+
try:
904+
await self.client.complete_workflow_task(
905+
task_id=task_id,
906+
lease_owner=self.worker_id,
907+
workflow_task_attempt=attempt,
908+
commands=commands,
909+
)
910+
return
911+
except Exception as error:
912+
if (
913+
completion_attempt >= _WORKFLOW_TASK_COMPLETION_MAX_ATTEMPTS
914+
or not _should_retry_workflow_task_completion_error(error)
915+
):
916+
raise
917+
918+
delay_index = min(
919+
completion_attempt - 1,
920+
len(_WORKFLOW_TASK_COMPLETION_RETRY_DELAYS) - 1,
921+
)
922+
log.warning(
923+
"retrying workflow task %s completion after attempt %d/%d failed: %s",
924+
task_id,
925+
completion_attempt,
926+
_WORKFLOW_TASK_COMPLETION_MAX_ATTEMPTS,
927+
error,
928+
)
929+
await asyncio.sleep(_WORKFLOW_TASK_COMPLETION_RETRY_DELAYS[delay_index])
930+
888931
async def _report_workflow_task_after_completion_error(
889932
self,
890933
task_id: str,

tests/test_worker.py

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -527,10 +527,10 @@ async def test_schedule_activity_on_first_replay(self, mock_client: AsyncMock) -
527527
async def test_workflow_task_ambiguous_completion_error_preserves_commands(
528528
self, mock_client: AsyncMock
529529
) -> None:
530-
mock_client.complete_workflow_task.side_effect = TimeoutError("completion timed out")
530+
mock_client.complete_workflow_task.side_effect = ServerError(409, {"reason": "task_not_leased"})
531531
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
532532
task = {
533-
"task_id": "t-complete-timeout",
533+
"task_id": "t-complete-not-leased",
534534
"workflow_type": "test-wf",
535535
"workflow_task_attempt": 2,
536536
"history_events": [],
@@ -545,6 +545,31 @@ async def test_workflow_task_ambiguous_completion_error_preserves_commands(
545545
mock_client.complete_workflow_task.assert_awaited_once()
546546
mock_client.fail_workflow_task.assert_not_called()
547547

548+
@pytest.mark.asyncio
549+
async def test_workflow_task_retries_transient_completion_error(
550+
self, mock_client: AsyncMock
551+
) -> None:
552+
mock_client.complete_workflow_task.side_effect = [
553+
TimeoutError("completion timed out"),
554+
{"outcome": "completed"},
555+
]
556+
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
557+
task = {
558+
"task_id": "t-complete-retry",
559+
"workflow_type": "test-wf",
560+
"workflow_task_attempt": 2,
561+
"history_events": [],
562+
"arguments": '["hello"]',
563+
"payload_codec": "json",
564+
}
565+
566+
result = await worker._run_workflow_task(task)
567+
568+
assert result is not None
569+
assert result[0]["type"] == "schedule_activity"
570+
assert mock_client.complete_workflow_task.await_count == 2
571+
mock_client.fail_workflow_task.assert_not_called()
572+
548573
@pytest.mark.asyncio
549574
async def test_workflow_task_definite_completion_rejection_fails_task(
550575
self, mock_client: AsyncMock
@@ -807,10 +832,10 @@ async def test_update_backed_workflow_task_completes_update_command(
807832
async def test_update_task_ambiguous_completion_error_preserves_command(
808833
self, mock_client: AsyncMock
809834
) -> None:
810-
mock_client.complete_workflow_task.side_effect = TimeoutError("update completion timed out")
835+
mock_client.complete_workflow_task.side_effect = ServerError(409, {"reason": "task_not_leased"})
811836
worker = Worker(mock_client, task_queue="q1", workflows=[UpdateWorkflow], activities=[])
812837
task = {
813-
"task_id": "t-update-timeout",
838+
"task_id": "t-update-not-leased",
814839
"workflow_type": "update-wf",
815840
"workflow_task_attempt": 3,
816841
"workflow_update_id": "upd-worker-1",
@@ -837,6 +862,43 @@ async def test_update_task_ambiguous_completion_error_preserves_command(
837862
mock_client.complete_workflow_task.assert_awaited_once()
838863
mock_client.fail_workflow_task.assert_not_called()
839864

865+
@pytest.mark.asyncio
866+
async def test_update_task_retries_transient_completion_error(
867+
self, mock_client: AsyncMock
868+
) -> None:
869+
mock_client.complete_workflow_task.side_effect = [
870+
ServerError(503, {"reason": "server_busy"}),
871+
{"outcome": "completed"},
872+
]
873+
worker = Worker(mock_client, task_queue="q1", workflows=[UpdateWorkflow], activities=[])
874+
task = {
875+
"task_id": "t-update-retry",
876+
"workflow_type": "update-wf",
877+
"workflow_task_attempt": 3,
878+
"workflow_update_id": "upd-worker-1",
879+
"workflow_wait_kind": "update",
880+
"history_events": [
881+
{
882+
"event_type": "UpdateAccepted",
883+
"payload": {
884+
"update_id": "upd-worker-1",
885+
"update_name": "increment",
886+
"arguments": serializer.encode([6], codec="json"),
887+
"payload_codec": "json",
888+
},
889+
},
890+
],
891+
"arguments": "[]",
892+
"payload_codec": "json",
893+
}
894+
895+
result = await worker._run_workflow_task(task)
896+
897+
assert result is not None
898+
assert result[0]["type"] == "complete_update"
899+
assert mock_client.complete_workflow_task.await_count == 2
900+
mock_client.fail_workflow_task.assert_not_called()
901+
840902
@pytest.mark.asyncio
841903
async def test_query_task_executes_registered_query(self, mock_client: AsyncMock) -> None:
842904
worker = Worker(mock_client, task_queue="q1", workflows=[QueryWorkflow], activities=[])

0 commit comments

Comments
 (0)