Skip to content

Commit ef5c6b4

Browse files
[cross-repo from server#244] Conformance: run signals/queries against current published artifacts (#77)
1 parent 0eebd9c commit ef5c6b4

2 files changed

Lines changed: 112 additions & 1 deletion

File tree

src/durable_workflow/worker.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,8 @@ async def _run_query_task_core(self, task: dict[str, Any]) -> str:
979979
external_storage=self.external_storage,
980980
external_storage_cache=self.external_storage_cache,
981981
)
982+
if inspect.isawaitable(result):
983+
result = await result
982984
except AvroNotInstalledError as e:
983985
await self._fail_query_task(
984986
query_task_id,
@@ -1034,7 +1036,49 @@ async def _run_query_task_core(self, task: dict[str, Any]) -> str:
10341036
e.reason(),
10351037
)
10361038
return "expired"
1037-
raise
1039+
server_reason = e.reason()
1040+
await self._fail_query_task(
1041+
query_task_id,
1042+
attempt,
1043+
str(e) or "query result completion was rejected by the server",
1044+
reason=server_reason if server_reason else "query_result_completion_failed",
1045+
failure_type=type(e).__name__,
1046+
stack_trace=traceback.format_exc(),
1047+
)
1048+
return "failed"
1049+
except AvroNotInstalledError as e:
1050+
await self._fail_query_task(
1051+
query_task_id,
1052+
attempt,
1053+
(
1054+
"cannot encode query result with codec 'avro': "
1055+
f"{e}. Reinstall durable-workflow with its runtime dependencies."
1056+
),
1057+
reason="query_result_encode_failed",
1058+
failure_type=type(e).__name__,
1059+
stack_trace=traceback.format_exc(),
1060+
)
1061+
return "failed"
1062+
except (TypeError, ValueError) as e:
1063+
await self._fail_query_task(
1064+
query_task_id,
1065+
attempt,
1066+
f"cannot encode query result with codec {result_codec!r}: {e}",
1067+
reason="query_result_encode_failed",
1068+
failure_type=type(e).__name__,
1069+
stack_trace=traceback.format_exc(),
1070+
)
1071+
return "failed"
1072+
except Exception as e:
1073+
await self._fail_query_task(
1074+
query_task_id,
1075+
attempt,
1076+
str(e) or "query result completion failed",
1077+
reason="query_result_completion_failed",
1078+
failure_type=type(e).__name__,
1079+
stack_trace=traceback.format_exc(),
1080+
)
1081+
return "failed"
10381082

10391083
return "completed"
10401084

tests/test_worker.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,17 @@ def run(self, ctx): # type: ignore[no-untyped-def]
7878
return self.status
7979

8080

81+
@workflow.defn(name="async-query-wf")
82+
class AsyncQueryWorkflow:
83+
@workflow.query("current")
84+
async def current(self) -> int:
85+
await asyncio.sleep(0)
86+
return 0
87+
88+
def run(self, ctx): # type: ignore[no-untyped-def]
89+
yield ctx.wait_condition(lambda: False)
90+
91+
8192
@activity.defn(name="test-act")
8293
def echo_activity(val: str) -> str:
8394
return f"result-{val}"
@@ -673,6 +684,35 @@ async def test_query_task_executes_registered_query(self, mock_client: AsyncMock
673684
)
674685
mock_client.fail_query_task.assert_not_called()
675686

687+
@pytest.mark.asyncio
688+
async def test_query_task_awaits_async_query_result(self, mock_client: AsyncMock) -> None:
689+
worker = Worker(mock_client, task_queue="q1", workflows=[AsyncQueryWorkflow], activities=[])
690+
task = {
691+
"query_task_id": "qt-async",
692+
"query_task_attempt": 1,
693+
"workflow_type": "async-query-wf",
694+
"query_name": "current",
695+
"history_events": [],
696+
"workflow_arguments": serializer.envelope([], codec="json"),
697+
"query_arguments": serializer.envelope([], codec="json"),
698+
"payload_codec": "json",
699+
}
700+
701+
outcome = await worker._run_query_task(task)
702+
703+
assert outcome == "completed"
704+
mock_client.complete_query_task.assert_awaited_once_with(
705+
query_task_id="qt-async",
706+
lease_owner=worker.worker_id,
707+
query_task_attempt=1,
708+
result=0,
709+
codec="json",
710+
workflow_id=None,
711+
run_id=None,
712+
query_name="current",
713+
)
714+
mock_client.fail_query_task.assert_not_called()
715+
676716
@pytest.mark.asyncio
677717
async def test_query_task_reports_unknown_query(self, mock_client: AsyncMock) -> None:
678718
worker = Worker(mock_client, task_queue="q1", workflows=[QueryWorkflow], activities=[])
@@ -723,6 +763,33 @@ async def test_query_task_completion_rejection_after_server_timeout_is_handled(
723763
mock_client.complete_query_task.assert_awaited_once()
724764
mock_client.fail_query_task.assert_not_called()
725765

766+
@pytest.mark.asyncio
767+
async def test_query_task_reports_query_result_completion_failure(
768+
self, mock_client: AsyncMock
769+
) -> None:
770+
worker = Worker(mock_client, task_queue="q1", workflows=[QueryWorkflow], activities=[])
771+
mock_client.complete_query_task.side_effect = TypeError("Object is not payload safe")
772+
task = {
773+
"query_task_id": "qt-result-encode-failure",
774+
"query_task_attempt": 1,
775+
"workflow_type": "query-wf",
776+
"query_name": "status",
777+
"history_events": [],
778+
"workflow_arguments": serializer.envelope([], codec="json"),
779+
"query_arguments": serializer.envelope([], codec="json"),
780+
"payload_codec": "json",
781+
}
782+
783+
outcome = await worker._run_query_task(task)
784+
785+
assert outcome == "failed"
786+
mock_client.fail_query_task.assert_awaited_once()
787+
call_kwargs = mock_client.fail_query_task.call_args.kwargs
788+
assert call_kwargs["query_task_id"] == "qt-result-encode-failure"
789+
assert call_kwargs["query_task_attempt"] == 1
790+
assert call_kwargs["reason"] == "query_result_encode_failed"
791+
assert call_kwargs["failure_type"] == "TypeError"
792+
726793

727794
class TestActivityTaskExecution:
728795
@pytest.mark.asyncio

0 commit comments

Comments
 (0)