Skip to content

Commit bdf038b

Browse files
[cross-repo from server#389] Conformance blocker: complete replay parity coverage beyond smoke (#153)
1 parent 1facc77 commit bdf038b

3 files changed

Lines changed: 89 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
`NamespaceDescription.deleted`.
1313

1414
### Fixed
15+
- Worker query tasks now replay from the bundled history export when the
16+
inline task history is empty or truncated, so cold query replay reconstructs
17+
activity-derived state instead of answering from a fresh workflow instance.
1518
- Python parent workflows now decode successful child workflow completions from
1619
the server's documented `ChildRunCompleted.output` history payload, while
1720
still accepting the older `result` alias. This prevents completed child

src/durable_workflow/worker.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,29 @@ def _query_history_with_export_signal_arguments(
383383
return enriched if changed else history
384384

385385

386+
def _query_history_events(
387+
history: Any,
388+
history_export: Any,
389+
*,
390+
default_codec: str | None,
391+
) -> Any:
392+
if isinstance(history, list):
393+
events = history
394+
else:
395+
events = []
396+
397+
if isinstance(history_export, Mapping):
398+
export_events = history_export.get("history_events")
399+
if isinstance(export_events, list) and len(export_events) > len(events):
400+
events = export_events
401+
402+
return _query_history_with_export_signal_arguments(
403+
events,
404+
history_export,
405+
default_codec=default_codec,
406+
)
407+
408+
386409
def _callable_fingerprint_payload(value: object) -> str:
387410
if isinstance(value, staticmethod | classmethod):
388411
value = value.__func__
@@ -1289,7 +1312,7 @@ async def _run_query_task_core(self, task: dict[str, Any], *, client: Client | N
12891312
return "failed"
12901313

12911314
result_codec = _command_payload_codec(codec)
1292-
history = _query_history_with_export_signal_arguments(
1315+
history = _query_history_events(
12931316
task.get("history_events", []),
12941317
task.get("history_export"),
12951318
default_codec=codec,

tests/test_worker.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ def run(self, ctx): # type: ignore[no-untyped-def]
108108
return self.status
109109

110110

111+
@workflow.defn(name="activity-query-wf")
112+
class ActivityQueryWorkflow:
113+
def __init__(self) -> None:
114+
self.activity_result: str | None = None
115+
116+
@workflow.query("state")
117+
def state(self) -> dict[str, str | None]:
118+
return {"activity_result": self.activity_result}
119+
120+
def run(self, ctx): # type: ignore[no-untyped-def]
121+
self.activity_result = yield ctx.schedule_activity("load", [])
122+
return self.activity_result
123+
124+
111125
@workflow.defn(name="query-state-unavailable-wf")
112126
class QueryStateUnavailableWorkflow:
113127
@workflow.query("status")
@@ -1207,6 +1221,54 @@ async def test_query_task_replays_signal_arguments_from_history_export(
12071221
)
12081222
mock_client.fail_query_task.assert_not_called()
12091223

1224+
@pytest.mark.asyncio
1225+
async def test_query_task_uses_export_history_when_inline_history_is_empty(
1226+
self, mock_client: AsyncMock
1227+
) -> None:
1228+
worker = Worker(mock_client, task_queue="q1", workflows=[ActivityQueryWorkflow], activities=[])
1229+
activity_result = serializer.envelope("loaded", codec="json")
1230+
task = {
1231+
"query_task_id": "qt-export-history",
1232+
"query_task_attempt": 1,
1233+
"workflow_type": "activity-query-wf",
1234+
"workflow_id": "wf-export-history",
1235+
"run_id": "run-export-history",
1236+
"query_name": "state",
1237+
"history_events": [],
1238+
"history_export": {
1239+
"payloads": {"codec": "json"},
1240+
"history_events": [
1241+
{
1242+
"type": "ActivityCompleted",
1243+
"payload": {
1244+
"sequence": 1,
1245+
"activity_type": "load",
1246+
"payload_codec": "json",
1247+
"result": activity_result,
1248+
},
1249+
}
1250+
],
1251+
},
1252+
"workflow_arguments": serializer.envelope([], codec="json"),
1253+
"query_arguments": serializer.envelope([], codec="json"),
1254+
"payload_codec": "json",
1255+
}
1256+
1257+
outcome = await worker._run_query_task(task)
1258+
1259+
assert outcome == "completed"
1260+
mock_client.complete_query_task.assert_awaited_once_with(
1261+
query_task_id="qt-export-history",
1262+
lease_owner=worker.worker_id,
1263+
query_task_attempt=1,
1264+
result={"activity_result": "loaded"},
1265+
codec="json",
1266+
workflow_id="wf-export-history",
1267+
run_id="run-export-history",
1268+
query_name="state",
1269+
)
1270+
mock_client.fail_query_task.assert_not_called()
1271+
12101272
@pytest.mark.asyncio
12111273
async def test_query_task_replays_repeated_condition_wait_signal_arguments(
12121274
self, mock_client: AsyncMock

0 commit comments

Comments
 (0)