Skip to content

Commit 8c010b9

Browse files
[cross-repo from server#267] Conformance: replay approve signal does not complete after activity state query (#95)
1 parent a07bcd6 commit 8c010b9

3 files changed

Lines changed: 157 additions & 24 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Fixed
10-
- Workflow-task completion transport failures now cause the worker to report
11-
the task as failed instead of treating the local replay as completed. This
12-
lets the server re-dispatch quickly instead of leaving a workflow task leased
13-
until timeout when the completion request never records.
10+
- Ambiguous workflow-task completion failures no longer get reported back as
11+
durable task failures after commands have been produced. Definite server
12+
rejections are still treated as failed workflow tasks even when the
13+
best-effort failure report cannot be sent, but transport/ownership ambiguity
14+
preserves the emitted commands so replay-driven signal completion is not
15+
converted into a stuck failed task.
1416
- Repeated condition-wait openings for the same logical wait now replay through
1517
every matching signal before deciding whether the wait is still pending, so
1618
long-running signal/query workflows do not get stuck on the first signal.

src/durable_workflow/worker.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,15 @@
4545
Client,
4646
WorkflowExecution,
4747
)
48-
from .errors import ActivityCancelled, AvroNotInstalledError, NonRetryableError, QueryFailed, ServerError
48+
from .errors import (
49+
ActivityCancelled,
50+
AvroNotInstalledError,
51+
DurableWorkflowError,
52+
InvalidArgument,
53+
NonRetryableError,
54+
QueryFailed,
55+
ServerError,
56+
)
4957
from .external_storage import ExternalPayloadCache, ExternalStorageDriver
5058
from .interceptors import (
5159
ActivityInterceptorContext,
@@ -74,6 +82,15 @@
7482
"query_task_not_leased",
7583
"query_task_timed_out",
7684
}
85+
_WORKFLOW_TASK_COMPLETION_AMBIGUOUS_REJECTION_REASONS = {
86+
"lease_expired",
87+
"lease_owner_mismatch",
88+
"run_already_closed",
89+
"run_closed",
90+
"task_not_found",
91+
"task_not_leased",
92+
"workflow_task_attempt_mismatch",
93+
}
7794
_WORKER_WORKFLOW_FINGERPRINTS: dict[tuple[str, str], str] = {}
7895

7996

@@ -111,6 +128,19 @@ def _is_final_query_task_rejection(error: BaseException) -> bool:
111128
)
112129

113130

131+
def _should_fail_workflow_task_after_completion_error(error: BaseException) -> bool:
    """Return whether a workflow-task completion error is a definite rejection.

    Classification, in order:

    * ``InvalidArgument`` -> ``True``.
    * ``ServerError`` with status 429 or any status >= 500 -> ``False``.
    * Any other ``ServerError`` -> ``True`` unless its ``reason()`` is listed in
      ``_WORKFLOW_TASK_COMPLETION_AMBIGUOUS_REJECTION_REASONS``.
    * Any other ``DurableWorkflowError`` -> ``True``.
    * Everything else (e.g. ``TimeoutError``, ``RuntimeError``) -> ``False``.

    Args:
        error: The exception raised while sending the completion request.

    Returns:
        ``True`` when the caller should report the task as failed, ``False``
        when the outcome is ambiguous and the produced commands are kept.
    """
    # Checked before ServerError so an InvalidArgument is always definite.
    if isinstance(error, InvalidArgument):
        return True

    if not isinstance(error, ServerError):
        # Only the library's own typed errors count as definite rejections;
        # arbitrary exceptions are treated as ambiguous.
        return isinstance(error, DurableWorkflowError)

    status = error.status
    if status >= 500 or status == 429:
        return False

    is_ambiguous_reason = error.reason() in _WORKFLOW_TASK_COMPLETION_AMBIGUOUS_REJECTION_REASONS
    return not is_ambiguous_reason
142+
143+
114144
def _signal_arguments_envelope_from_export(
115145
signal: Mapping[str, Any],
116146
*,
@@ -775,8 +805,9 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
775805
)
776806
except Exception as e:
777807
log.warning("failed to complete workflow update task %s: %s", task_id, e)
778-
await self._fail_workflow_task_after_completion_error(task_id, attempt, e)
779-
return None
808+
if _should_fail_workflow_task_after_completion_error(e):
809+
await self._report_workflow_task_after_completion_error(task_id, attempt, e)
810+
return None
780811
return [command]
781812

782813
try:
@@ -849,11 +880,12 @@ async def _run_workflow_task_core(self, task: dict[str, Any]) -> list[dict[str,
849880
)
850881
except Exception as e:
851882
log.warning("failed to complete workflow task %s: %s", task_id, e)
852-
await self._fail_workflow_task_after_completion_error(task_id, attempt, e)
853-
return None
883+
if _should_fail_workflow_task_after_completion_error(e):
884+
await self._report_workflow_task_after_completion_error(task_id, attempt, e)
885+
return None
854886
return commands
855887

856-
async def _fail_workflow_task_after_completion_error(
888+
async def _report_workflow_task_after_completion_error(
857889
self,
858890
task_id: str,
859891
attempt: int,

tests/test_worker.py

Lines changed: 113 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
Client,
2323
WorkflowExecution,
2424
)
25-
from durable_workflow.errors import ServerError
25+
from durable_workflow.errors import InvalidArgument, ServerError, Unauthorized, WorkflowNotFound
2626
from durable_workflow.interceptors import (
2727
ActivityHandler,
2828
ActivityInterceptorContext,
@@ -32,7 +32,7 @@
3232
WorkflowTaskHandler,
3333
WorkflowTaskInterceptorContext,
3434
)
35-
from durable_workflow.worker import Worker
35+
from durable_workflow.worker import Worker, _should_fail_workflow_task_after_completion_error
3636

3737

3838
@workflow.defn(name="test-wf")
@@ -185,6 +185,28 @@ def compatible_cluster_info(**overrides: object) -> dict[str, object]:
185185
return info
186186

187187

188+
class TestWorkflowTaskCompletionErrorClassification:
    """Checks how completion errors are split into definite and ambiguous outcomes."""

    @pytest.mark.parametrize(
        ("error", "should_fail"),
        [
            # Expected to be classified as ambiguous: the task is not failed.
            (TimeoutError("completion timed out"), False),
            (RuntimeError("connection reset"), False),
            (ServerError(409, {"reason": "lease_expired"}), False),
            (ServerError(409, {"reason": "workflow_task_attempt_mismatch"}), False),
            (ServerError(429, {"reason": "rate_limited"}), False),
            (ServerError(503, {"reason": "server_busy"}), False),
            # Expected to be classified as definite: the task is failed.
            (ServerError(409, {"reason": "invalid_commands"}), True),
            (InvalidArgument("invalid command payload"), True),
            (Unauthorized("missing bearer token"), True),
            (WorkflowNotFound("wf-missing"), True),
        ],
    )
    def test_classifies_definite_and_ambiguous_completion_errors(
        self, error: BaseException, should_fail: bool
    ) -> None:
        """Each error maps to the expected fail / do-not-fail verdict."""
        verdict = _should_fail_workflow_task_after_completion_error(error)

        assert verdict is should_fail
208+
209+
188210
class TestWorkerRegistration:
189211
@pytest.mark.asyncio
190212
async def test_register(self, mock_client: AsyncMock) -> None:
@@ -502,7 +524,7 @@ async def test_schedule_activity_on_first_replay(self, mock_client: AsyncMock) -
502524
assert serializer.decode(commands[0]["arguments"]["blob"], codec="json") == ["hello"]
503525

504526
@pytest.mark.asyncio
505-
async def test_workflow_task_completion_error_fails_task_for_fast_redispatch(
527+
async def test_workflow_task_ambiguous_completion_error_preserves_commands(
506528
self, mock_client: AsyncMock
507529
) -> None:
508530
mock_client.complete_workflow_task.side_effect = TimeoutError("completion timed out")
@@ -518,15 +540,96 @@ async def test_workflow_task_completion_error_fails_task_for_fast_redispatch(
518540

519541
result = await worker._run_workflow_task(task)
520542

543+
assert result is not None
544+
assert result[0]["type"] == "schedule_activity"
545+
mock_client.complete_workflow_task.assert_awaited_once()
546+
mock_client.fail_workflow_task.assert_not_called()
547+
548+
@pytest.mark.asyncio
async def test_workflow_task_definite_completion_rejection_fails_task(
    self, mock_client: AsyncMock
) -> None:
    """A 409 ``invalid_commands`` completion rejection is reported as a failed task."""
    rejection = ServerError(409, {"reason": "invalid_commands"})
    mock_client.complete_workflow_task.side_effect = rejection
    worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])

    result = await worker._run_workflow_task(
        {
            "task_id": "t-complete-invalid",
            "workflow_type": "test-wf",
            "workflow_task_attempt": 2,
            "history_events": [],
            "arguments": '["hello"]',
            "payload_codec": "json",
        }
    )

    # No commands are handed back, and exactly one failure report is sent.
    assert result is None
    mock_client.fail_workflow_task.assert_awaited_once()

    reported = mock_client.fail_workflow_task.await_args.kwargs
    assert reported["task_id"] == "t-complete-invalid"
    assert reported["workflow_task_attempt"] == 2
    assert reported["lease_owner"] == worker.worker_id
    assert reported["failure_type"] == "ServerError"
    assert "invalid_commands" in reported["message"]
573+
574+
@pytest.mark.parametrize(
575+
("completion_error", "failure_type", "message_fragment"),
576+
[
577+
(Unauthorized("missing bearer token"), "Unauthorized", "missing bearer token"),
578+
(WorkflowNotFound("wf-typed-missing"), "WorkflowNotFound", "wf-typed-missing"),
579+
],
580+
)
581+
@pytest.mark.asyncio
582+
async def test_workflow_task_typed_completion_rejection_fails_task(
583+
self,
584+
mock_client: AsyncMock,
585+
completion_error: Exception,
586+
failure_type: str,
587+
message_fragment: str,
588+
) -> None:
589+
mock_client.complete_workflow_task.side_effect = completion_error
590+
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
591+
task = {
592+
"task_id": "t-complete-typed-rejection",
593+
"workflow_type": "test-wf",
594+
"workflow_task_attempt": 2,
595+
"history_events": [],
596+
"arguments": '["hello"]',
597+
"payload_codec": "json",
598+
}
599+
600+
result = await worker._run_workflow_task(task)
601+
521602
assert result is None
522603
mock_client.complete_workflow_task.assert_awaited_once()
523604
mock_client.fail_workflow_task.assert_awaited_once()
524605
call_kwargs = mock_client.fail_workflow_task.await_args.kwargs
525-
assert call_kwargs["task_id"] == "t-complete-timeout"
606+
assert call_kwargs["task_id"] == "t-complete-typed-rejection"
526607
assert call_kwargs["workflow_task_attempt"] == 2
527608
assert call_kwargs["lease_owner"] == worker.worker_id
528-
assert call_kwargs["failure_type"] == "TimeoutError"
529-
assert "completion timed out" in call_kwargs["message"]
609+
assert call_kwargs["failure_type"] == failure_type
610+
assert message_fragment in call_kwargs["message"]
611+
612+
@pytest.mark.asyncio
async def test_workflow_task_definite_completion_rejection_stays_failed_when_report_fails(
    self, mock_client: AsyncMock
) -> None:
    """A definite rejection still yields no commands when the failure report itself raises."""
    rejection = ServerError(409, {"reason": "invalid_commands"})
    report_outage = RuntimeError("failure report unavailable")
    mock_client.complete_workflow_task.side_effect = rejection
    mock_client.fail_workflow_task.side_effect = report_outage
    worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])

    result = await worker._run_workflow_task(
        {
            "task_id": "t-complete-invalid-report-fails",
            "workflow_type": "test-wf",
            "workflow_task_attempt": 2,
            "history_events": [],
            "arguments": '["hello"]',
            "payload_codec": "json",
        }
    )

    # The task stays failed: no commands are returned, and both the completion
    # and the failure report were each attempted exactly once.
    assert result is None
    mock_client.complete_workflow_task.assert_awaited_once()
    mock_client.fail_workflow_task.assert_awaited_once()
530633

531634
@pytest.mark.asyncio
532635
async def test_workflow_command_payload_warning_uses_client_policy(
@@ -701,7 +804,7 @@ async def test_update_backed_workflow_task_completes_update_command(
701804
mock_client.fail_workflow_task.assert_not_called()
702805

703806
@pytest.mark.asyncio
704-
async def test_update_task_completion_error_fails_task_for_fast_redispatch(
807+
async def test_update_task_ambiguous_completion_error_preserves_command(
705808
self, mock_client: AsyncMock
706809
) -> None:
707810
mock_client.complete_workflow_task.side_effect = TimeoutError("update completion timed out")
@@ -729,14 +832,10 @@ async def test_update_task_completion_error_fails_task_for_fast_redispatch(
729832

730833
result = await worker._run_workflow_task(task)
731834

732-
assert result is None
835+
assert result is not None
836+
assert result[0]["type"] == "complete_update"
733837
mock_client.complete_workflow_task.assert_awaited_once()
734-
mock_client.fail_workflow_task.assert_awaited_once()
735-
call_kwargs = mock_client.fail_workflow_task.await_args.kwargs
736-
assert call_kwargs["task_id"] == "t-update-timeout"
737-
assert call_kwargs["workflow_task_attempt"] == 3
738-
assert call_kwargs["failure_type"] == "TimeoutError"
739-
assert "update completion timed out" in call_kwargs["message"]
838+
mock_client.fail_workflow_task.assert_not_called()
740839

741840
@pytest.mark.asyncio
742841
async def test_query_task_executes_registered_query(self, mock_client: AsyncMock) -> None:

0 commit comments

Comments
 (0)