Skip to content

Commit 19de0fa

Browse files
Conformance finding: Python-authored polyglot workflow stays leased under current sample smoke (#119)
1 parent 0ec19d5 commit 19de0fa

4 files changed

Lines changed: 186 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Fixed
10+
- Workflow workers now report unhandled workflow-task execution errors back to
11+
the server instead of leaving the leased task pending until the lease or CLI
12+
wait times out. This lets the server observe and retry or fail the task
13+
promptly when command serialization or interceptor code raises after a task
14+
has been claimed.
1015
- Workflow-task completion now retries transient transport failures and server
1116
throttling/5xx rejections before preserving emitted commands or reporting a
1217
definite task failure, reducing stuck waiting runs when a signal-satisfied

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "durable-workflow"
7-
version = "0.4.54"
7+
version = "0.4.55"
88
description = "Python SDK for the Durable Workflow server (language-neutral HTTP protocol)"
99
readme = "README.md"
1010
requires-python = ">=3.10"

src/durable_workflow/worker.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1442,13 +1442,49 @@ async def _dispatch_workflow_task(self, task: dict[str, Any]) -> None:
14421442
try:
14431443
commands = await self._run_workflow_task(task)
14441444
outcome = "completed" if commands is not None else "failed"
1445-
except Exception:
1445+
except Exception as exc:
14461446
log.exception("unhandled error in workflow task execution")
1447+
await self._report_unhandled_workflow_task_error(task, exc)
1448+
outcome = "failed"
14471449
finally:
14481450
self._workflow_inflight = max(0, self._workflow_inflight - 1)
14491451
self._record_task_metrics("workflow", outcome, time.perf_counter() - task_start)
14501452
self._wf_semaphore.release()
14511453

1454+
async def _report_unhandled_workflow_task_error(
1455+
self,
1456+
task: dict[str, Any],
1457+
error: Exception,
1458+
) -> None:
1459+
task_id = _string_or_none(task.get("task_id"))
1460+
if task_id is None:
1461+
return
1462+
1463+
attempt = task.get("workflow_task_attempt", 1)
1464+
if not isinstance(attempt, int) or attempt < 1:
1465+
attempt = 1
1466+
1467+
try:
1468+
await self.client.fail_workflow_task(
1469+
task_id=task_id,
1470+
lease_owner=self.worker_id,
1471+
workflow_task_attempt=attempt,
1472+
message=(
1473+
"unhandled workflow task execution error: "
1474+
f"{str(error) or type(error).__name__}"
1475+
),
1476+
failure_type=type(error).__name__,
1477+
stack_trace="".join(
1478+
traceback.format_exception(type(error), error, error.__traceback__)
1479+
),
1480+
)
1481+
except Exception as report_error:
1482+
log.warning(
1483+
"failed to report unhandled workflow task error for %s: %s",
1484+
task_id,
1485+
report_error,
1486+
)
1487+
14521488
async def _poll_activity_tasks(self) -> None:
14531489
while not self._stop.is_set():
14541490
await self._act_semaphore.acquire()

tests/test_worker.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,31 @@ def run(self, ctx): # type: ignore[no-untyped-def]
5555
]
5656

5757

58+
@workflow.defn(name="two-cross-queue-wf")
59+
class TwoCrossQueueWorkflow:
60+
def run(self, ctx, request): # type: ignore[no-untyped-def]
61+
marker = yield ctx.schedule_activity(
62+
"external.marker",
63+
[request],
64+
queue="external-queue",
65+
)
66+
description = yield ctx.schedule_activity(
67+
"external.describe",
68+
[marker],
69+
queue="external-queue",
70+
)
71+
return {
72+
"marker": marker,
73+
"description": description,
74+
}
75+
76+
77+
@workflow.defn(name="unserializable-result-wf")
78+
class UnserializableResultWorkflow:
79+
def run(self, ctx): # type: ignore[no-untyped-def]
80+
return object()
81+
82+
5883
@workflow.defn(name="update-wf")
5984
class UpdateWorkflow:
6085
def __init__(self) -> None:
@@ -780,6 +805,124 @@ async def test_complete_on_resolved_activity(self, mock_client: AsyncMock) -> No
780805
assert commands[0]["result"]["codec"] == "json"
781806
assert serializer.decode(commands[0]["result"]["blob"], codec="json") == "done"
782807

808+
@pytest.mark.asyncio
809+
async def test_cross_queue_second_activity_uses_completed_first_result(
810+
self, mock_client: AsyncMock
811+
) -> None:
812+
worker = Worker(mock_client, task_queue="workflow-queue", workflows=[TwoCrossQueueWorkflow], activities=[])
813+
marker = {"runtime": "external", "name": "Grace", "message": "hello"}
814+
task = {
815+
"task_id": "t-cross-queue-second",
816+
"workflow_type": "two-cross-queue-wf",
817+
"workflow_task_attempt": 2,
818+
"history_events": [
819+
{
820+
"event_type": "ActivityCompleted",
821+
"payload": {
822+
"sequence": 1,
823+
"activity_type": "external.marker",
824+
"payload_codec": "json",
825+
"result": serializer.envelope(marker, codec="json"),
826+
},
827+
},
828+
],
829+
"arguments": serializer.encode([{"name": "Grace"}], codec="json"),
830+
"payload_codec": "json",
831+
}
832+
833+
await worker._run_workflow_task(task)
834+
835+
mock_client.complete_workflow_task.assert_called_once()
836+
commands = mock_client.complete_workflow_task.call_args.kwargs["commands"]
837+
assert commands == [
838+
{
839+
"type": "schedule_activity",
840+
"activity_type": "external.describe",
841+
"queue": "external-queue",
842+
"arguments": commands[0]["arguments"],
843+
}
844+
]
845+
assert commands[0]["arguments"]["codec"] == "json"
846+
assert serializer.decode(commands[0]["arguments"]["blob"], codec="json") == [marker]
847+
848+
@pytest.mark.asyncio
849+
async def test_cross_queue_workflow_completes_after_second_activity(
850+
self, mock_client: AsyncMock
851+
) -> None:
852+
worker = Worker(mock_client, task_queue="workflow-queue", workflows=[TwoCrossQueueWorkflow], activities=[])
853+
marker = {"runtime": "external", "name": "Grace", "message": "hello"}
854+
description = {"runtime": "external", "description": "Grace handled by external activity"}
855+
task = {
856+
"task_id": "t-cross-queue-complete",
857+
"workflow_type": "two-cross-queue-wf",
858+
"workflow_task_attempt": 3,
859+
"history_events": [
860+
{
861+
"event_type": "ActivityCompleted",
862+
"payload": {
863+
"sequence": 1,
864+
"activity_type": "external.marker",
865+
"payload_codec": "json",
866+
"result": serializer.envelope(marker, codec="json"),
867+
},
868+
},
869+
{
870+
"event_type": "ActivityCompleted",
871+
"payload": {
872+
"sequence": 2,
873+
"activity_type": "external.describe",
874+
"payload_codec": "json",
875+
"result": serializer.envelope(description, codec="json"),
876+
},
877+
},
878+
],
879+
"arguments": serializer.encode([{"name": "Grace"}], codec="json"),
880+
"payload_codec": "json",
881+
}
882+
883+
await worker._run_workflow_task(task)
884+
885+
mock_client.complete_workflow_task.assert_called_once()
886+
commands = mock_client.complete_workflow_task.call_args.kwargs["commands"]
887+
assert commands[0]["type"] == "complete_workflow"
888+
assert commands[0]["result"]["codec"] == "json"
889+
assert serializer.decode(commands[0]["result"]["blob"], codec="json") == {
890+
"marker": marker,
891+
"description": description,
892+
}
893+
894+
@pytest.mark.asyncio
895+
async def test_dispatch_reports_unhandled_workflow_task_error(
896+
self, mock_client: AsyncMock
897+
) -> None:
898+
worker = Worker(
899+
mock_client,
900+
task_queue="q1",
901+
workflows=[UnserializableResultWorkflow],
902+
activities=[],
903+
worker_id="w-unserializable",
904+
)
905+
task = {
906+
"task_id": "t-unserializable",
907+
"workflow_type": "unserializable-result-wf",
908+
"workflow_task_attempt": 4,
909+
"history_events": [],
910+
"arguments": "[]",
911+
"payload_codec": "json",
912+
}
913+
914+
await worker._dispatch_workflow_task(task)
915+
916+
mock_client.complete_workflow_task.assert_not_called()
917+
mock_client.fail_workflow_task.assert_awaited_once()
918+
call_kwargs = mock_client.fail_workflow_task.await_args.kwargs
919+
assert call_kwargs["task_id"] == "t-unserializable"
920+
assert call_kwargs["workflow_task_attempt"] == 4
921+
assert call_kwargs["lease_owner"] == worker.worker_id
922+
assert call_kwargs["failure_type"] == "TypeError"
923+
assert "unhandled workflow task execution error" in call_kwargs["message"]
924+
assert "Object of type object" in call_kwargs["message"]
925+
783926
@pytest.mark.asyncio
784927
async def test_unknown_workflow_type_fails_task(self, mock_client: AsyncMock) -> None:
785928
worker = Worker(mock_client, task_queue="q1", workflows=[], activities=[])

0 commit comments

Comments
 (0)