Skip to content

Commit 6f81474

Browse files
[cross-repo from server#289] Conformance finding: replay state query times out on current artifacts (#104)
1 parent f8ae02b commit 6f81474

4 files changed

Lines changed: 136 additions & 9 deletions

File tree

src/durable_workflow/errors.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ def __init__(self, schedule_id: str) -> None:
122122
class QueryFailed(DurableWorkflowError):
123123
"""A workflow query was rejected or the workflow raised while handling it."""
124124

125+
def __init__(self, message: str, *, reason: str | None = None, body: object | None = None) -> None:
126+
super().__init__(message)
127+
self.reason = reason
128+
self.body = body
129+
125130

126131
class WorkflowPayloadDecodeError(DurableWorkflowError):
127132
"""A committed workflow history payload could not be decoded during replay."""
@@ -281,9 +286,12 @@ def _raise_for_status(status: int, body: object, *, context: str = "") -> None:
281286
if status == 401:
282287
raise Unauthorized(message or "unauthorized")
283288

289+
def query_failed(default: str) -> QueryFailed:
290+
return QueryFailed(message or default, reason=reason if isinstance(reason, str) else None, body=body)
291+
284292
if status == 404:
285293
if reason in ("query_not_found", "rejected_unknown_query"):
286-
raise QueryFailed(message or "query not found")
294+
raise query_failed("query not found")
287295
if reason == "schedule_not_found":
288296
raise ScheduleNotFound(context)
289297
if reason in ("instance_not_found", "workflow_not_found") or "workflow" in context.lower():
@@ -297,15 +305,24 @@ def _raise_for_status(status: int, body: object, *, context: str = "") -> None:
297305
raise ScheduleAlreadyExists(context)
298306
if reason == "duplicate_not_allowed":
299307
raise WorkflowAlreadyStarted(context)
300-
if reason in ("query_rejected", "query_worker_unavailable"):
301-
raise QueryFailed(message or "query rejected")
308+
if reason in (
309+
"query_rejected",
310+
"query_worker_unavailable",
311+
"query_worker_incompatible",
312+
"query_workflow_state_unavailable",
313+
):
314+
raise query_failed("query rejected")
302315
if reason == "update_rejected":
303316
raise UpdateRejected(message or "update rejected")
304317
raise ServerError(status, body)
305318

306319
if status == 504:
307-
if reason == "query_worker_timeout":
308-
raise QueryFailed(message or "query worker timed out")
320+
if reason in (
321+
"query_worker_timeout",
322+
"query_worker_execution_timeout",
323+
"query_task_not_claimed",
324+
):
325+
raise query_failed("query worker timed out")
309326
raise ServerError(status, body)
310327

311328
if status == 422:

src/durable_workflow/worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,11 +1245,17 @@ async def _run_query_task_core(self, task: dict[str, Any], *, client: Client | N
12451245
)
12461246
return "failed"
12471247
except QueryFailed as e:
1248-
reason = "rejected_unknown_query" if "unknown query" in str(e) else "query_rejected"
1248+
message = str(e)
1249+
if "unknown query" in message:
1250+
reason = "rejected_unknown_query"
1251+
elif message.startswith("workflow replay failed before query:"):
1252+
reason = "query_workflow_state_unavailable"
1253+
else:
1254+
reason = "query_rejected"
12491255
await self._fail_query_task(
12501256
query_task_id,
12511257
attempt,
1252-
str(e),
1258+
message,
12531259
reason=reason,
12541260
failure_type=type(e).__name__,
12551261
stack_trace=traceback.format_exc(),

tests/test_client.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -842,19 +842,91 @@ async def test_worker_routed_query_unavailable_raises_query_failed(self, client:
842842
resp = _mock_response(409, {"reason": "query_worker_unavailable", "message": "no worker"})
843843
with (
844844
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
845-
pytest.raises(QueryFailed),
845+
pytest.raises(QueryFailed) as excinfo,
846+
):
847+
await client.query_workflow("wf-1", "status")
848+
849+
assert excinfo.value.reason == "query_worker_unavailable"
850+
851+
@pytest.mark.asyncio
852+
async def test_worker_routed_query_incompatible_raises_query_failed(self, client: Client) -> None:
853+
resp = _mock_response(
854+
409,
855+
{
856+
"reason": "query_worker_incompatible",
857+
"message": "no compatible worker supports this workflow type",
858+
},
859+
)
860+
with (
861+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
862+
pytest.raises(QueryFailed) as excinfo,
846863
):
847864
await client.query_workflow("wf-1", "status")
848865

866+
assert excinfo.value.reason == "query_worker_incompatible"
867+
868+
@pytest.mark.asyncio
869+
async def test_worker_routed_query_state_unavailable_raises_query_failed(self, client: Client) -> None:
870+
resp = _mock_response(
871+
409,
872+
{
873+
"reason": "query_workflow_state_unavailable",
874+
"message": "workflow state is not queryable yet",
875+
},
876+
)
877+
with (
878+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
879+
pytest.raises(QueryFailed) as excinfo,
880+
):
881+
await client.query_workflow("wf-1", "status")
882+
883+
assert excinfo.value.reason == "query_workflow_state_unavailable"
884+
849885
@pytest.mark.asyncio
850886
async def test_worker_routed_query_timeout_raises_query_failed(self, client: Client) -> None:
851887
resp = _mock_response(504, {"reason": "query_worker_timeout", "message": "timed out"})
852888
with (
853889
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
854-
pytest.raises(QueryFailed),
890+
pytest.raises(QueryFailed) as excinfo,
891+
):
892+
await client.query_workflow("wf-1", "status")
893+
894+
assert excinfo.value.reason == "query_worker_timeout"
895+
896+
@pytest.mark.asyncio
897+
async def test_worker_routed_query_not_claimed_timeout_raises_query_failed(self, client: Client) -> None:
898+
resp = _mock_response(
899+
504,
900+
{
901+
"reason": "query_task_not_claimed",
902+
"message": "timed out waiting for a compatible worker to claim the query",
903+
},
904+
)
905+
with (
906+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
907+
pytest.raises(QueryFailed) as excinfo,
855908
):
856909
await client.query_workflow("wf-1", "status")
857910

911+
assert excinfo.value.reason == "query_task_not_claimed"
912+
913+
@pytest.mark.asyncio
914+
async def test_worker_routed_query_execution_timeout_raises_query_failed(self, client: Client) -> None:
915+
resp = _mock_response(
916+
504,
917+
{
918+
"reason": "query_worker_execution_timeout",
919+
"message": "worker leased the query but did not complete it",
920+
},
921+
)
922+
with (
923+
patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp),
924+
pytest.raises(QueryFailed) as excinfo,
925+
):
926+
await client.query_workflow("wf-1", "status")
927+
928+
assert excinfo.value.reason == "query_worker_execution_timeout"
929+
858930

859931
class TestListWorkflows:
860932
@pytest.mark.asyncio

tests/test_worker.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,16 @@ def run(self, ctx): # type: ignore[no-untyped-def]
7979
return self.status
8080

8181

82+
@workflow.defn(name="query-state-unavailable-wf")
83+
class QueryStateUnavailableWorkflow:
84+
@workflow.query("status")
85+
def status_query(self) -> dict[str, str]:
86+
return {"status": "ready"}
87+
88+
def run(self, ctx): # type: ignore[no-untyped-def]
89+
raise RuntimeError("state not ready")
90+
91+
8292
@workflow.defn(name="counter-query-wf")
8393
class CounterQueryWorkflow:
8494
def __init__(self) -> None:
@@ -1130,6 +1140,28 @@ async def test_query_task_reports_unknown_query(self, mock_client: AsyncMock) ->
11301140
assert call_kwargs["query_task_attempt"] == 2
11311141
assert call_kwargs["reason"] == "rejected_unknown_query"
11321142

1143+
@pytest.mark.asyncio
1144+
async def test_query_task_reports_state_unavailable_when_replay_fails(self, mock_client: AsyncMock) -> None:
1145+
worker = Worker(mock_client, task_queue="q1", workflows=[QueryStateUnavailableWorkflow], activities=[])
1146+
task = {
1147+
"query_task_id": "qt-state-unavailable",
1148+
"query_task_attempt": 1,
1149+
"workflow_type": "query-state-unavailable-wf",
1150+
"query_name": "status",
1151+
"history_events": [],
1152+
"workflow_arguments": serializer.envelope([], codec="json"),
1153+
"query_arguments": serializer.envelope([], codec="json"),
1154+
"payload_codec": "json",
1155+
}
1156+
1157+
outcome = await worker._run_query_task(task)
1158+
1159+
assert outcome == "failed"
1160+
mock_client.fail_query_task.assert_awaited_once()
1161+
call_kwargs = mock_client.fail_query_task.call_args.kwargs
1162+
assert call_kwargs["query_task_id"] == "qt-state-unavailable"
1163+
assert call_kwargs["reason"] == "query_workflow_state_unavailable"
1164+
11331165
@pytest.mark.asyncio
11341166
@pytest.mark.parametrize(
11351167
"reason",

0 commit comments

Comments
 (0)