Skip to content

Commit 6cf8eb0

Browse files
[cross-repo from server#389] Conformance blocker: complete replay parity coverage beyond smoke (#152)
1 parent cedf283 commit 6cf8eb0

2 files changed

Lines changed: 261 additions & 8 deletions

File tree

src/durable_workflow/worker.py

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -179,20 +179,75 @@ def _signal_arguments_envelope_from_export(
179179
return envelope
180180

181181

182+
def _history_events_from_query_export(history: Any, history_export: Any) -> Any:
    """Pick the event list to replay for a query task.

    A non-empty ``history`` list is always preferred and returned as-is.
    Otherwise the events stored under ``history_export["history_events"]``
    are used, normalised so that every mapping event carries an
    ``"event_type"`` key.

    Args:
        history: The history delivered with the query task. Anything other
            than a non-empty list triggers the export fallback.
        history_export: The export attached to the task. Ignored unless it
            is a mapping holding a non-empty list under ``"history_events"``.

    Returns:
        ``history`` itself when no usable export events exist; otherwise a
        new list. Mapping entries in that list are shallow copies (the
        export is never mutated); non-mapping entries are passed through
        unchanged.
    """
    if isinstance(history, list) and history:
        return history
    if not isinstance(history_export, Mapping):
        return history

    raw_events = history_export.get("history_events")
    # An empty export list carries no information, so the caller's original
    # value (which may be ``[]``, ``None`` or anything else) is handed back.
    if not isinstance(raw_events, list) or not raw_events:
        return history

    events: list[Any] = []
    for raw_event in raw_events:
        if not isinstance(raw_event, Mapping):
            events.append(raw_event)
            continue

        event = dict(raw_event)
        # Export rows may name the event under "type"; mirror it into
        # "event_type" only when that key is absent and "type" is a
        # non-empty string.
        if "event_type" not in event:
            export_type = event.get("type")
            if isinstance(export_type, str) and export_type:
                event["event_type"] = export_type
        events.append(event)

    return events
210+
211+
212+
def _activity_result_by_sequence_from_export(
    history_export: Mapping[str, Any],
) -> dict[int, Mapping[str, Any]]:
    """Index exported activity rows that carry a result by their sequence.

    Args:
        history_export: The export mapping; its ``"activities"`` entry is
            expected to be a list of activity mappings.

    Returns:
        A dict from integer sequence to the activity mapping. Rows are
        skipped when they are not mappings, when ``"sequence"`` is neither
        an int nor a decimal string, or when ``"result"`` is missing or
        ``None``. If several rows share a sequence, the first one wins.
        An empty dict is returned when ``"activities"`` is not a list.
    """
    raw_activities = history_export.get("activities")
    if not isinstance(raw_activities, list):
        return {}

    results: dict[int, Mapping[str, Any]] = {}
    for raw_activity in raw_activities:
        if not isinstance(raw_activity, Mapping):
            continue
        sequence = raw_activity.get("sequence")
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as "²" that int() rejects with ValueError, which
        # would abort the whole query instead of skipping one bad row.
        if isinstance(sequence, str) and sequence.isdecimal():
            sequence = int(sequence)
        if not isinstance(sequence, int):
            continue
        if raw_activity.get("result") is None:
            continue
        # setdefault keeps the first row seen for a given sequence.
        results.setdefault(sequence, raw_activity)

    return results
233+
234+
182235
def _query_history_with_export_signal_arguments(
183236
history: Any,
184237
history_export: Any,
185238
*,
186239
default_codec: str | None,
187240
) -> Any:
188-
# Query-task history can carry compact SignalReceived rows; the full
189-
# signal payload bytes are still present in the accompanying export.
241+
# Query-task history may be omitted or compact when the server routes a
242+
# completed/in-flight query to a fresh worker. The attached export remains
243+
# the durable source for replay payloads needed to rebuild workflow state.
244+
history = _history_events_from_query_export(history, history_export)
190245
if not isinstance(history, list) or not isinstance(history_export, Mapping):
191246
return history
192247

193248
raw_signals = history_export.get("signals")
194249
if not isinstance(raw_signals, list):
195-
return history
250+
raw_signals = []
196251

197252
export_payloads = history_export.get("payloads")
198253
export_codec = (
@@ -224,7 +279,11 @@ def _query_history_with_export_signal_arguments(
224279
signals_by_name.setdefault(name, []).append(raw_signal)
225280

226281
if not signals_by_id and not signals_by_command_id and not signals_by_name:
227-
return history
282+
signals_available = False
283+
else:
284+
signals_available = True
285+
286+
activity_results_by_sequence = _activity_result_by_sequence_from_export(history_export)
228287

229288
name_offsets: dict[str, int] = {}
230289
enriched: list[Any] = []
@@ -234,12 +293,47 @@ def _query_history_with_export_signal_arguments(
234293
enriched.append(raw_event)
235294
continue
236295
event_type = raw_event.get("event_type") or raw_event.get("type")
296+
base_event = dict(raw_event)
297+
if "event_type" not in base_event and isinstance(event_type, str) and event_type:
298+
base_event["event_type"] = event_type
299+
changed = True
300+
raw_event = base_event
301+
raw_payload = raw_event.get("payload")
302+
if not isinstance(raw_payload, Mapping):
303+
enriched.append(raw_event)
304+
continue
305+
306+
if event_type == "ActivityCompleted":
307+
payload = dict(raw_payload)
308+
sequence = payload.get("sequence") or payload.get("workflow_sequence")
309+
if isinstance(sequence, str) and sequence.isdigit():
310+
sequence = int(sequence)
311+
activity = activity_results_by_sequence.get(sequence) if isinstance(sequence, int) else None
312+
if activity is not None:
313+
payload_changed = False
314+
if "result" not in payload and activity.get("result") is not None:
315+
payload["result"] = activity["result"]
316+
payload_changed = True
317+
if "payload_codec" not in payload and isinstance(activity.get("payload_codec"), str):
318+
payload["payload_codec"] = activity["payload_codec"]
319+
payload_changed = True
320+
if "activity_type" not in payload and isinstance(activity.get("activity_type"), str):
321+
payload["activity_type"] = activity["activity_type"]
322+
payload_changed = True
323+
if payload_changed:
324+
event = dict(raw_event)
325+
event["payload"] = payload
326+
enriched.append(event)
327+
changed = True
328+
continue
329+
enriched.append(raw_event)
330+
continue
331+
237332
if event_type != "SignalReceived":
238333
enriched.append(raw_event)
239334
continue
240-
raw_payload = raw_event.get("payload")
241-
if not isinstance(raw_payload, Mapping):
242-
enriched.append(dict(raw_event))
335+
if not signals_available:
336+
enriched.append(raw_event)
243337
continue
244338
signal: Mapping[str, Any] | None = None
245339
signal_id = raw_payload.get("signal_id")
@@ -278,7 +372,7 @@ def _query_history_with_export_signal_arguments(
278372
payload_changed = True
279373

280374
if not payload_changed:
281-
enriched.append(dict(raw_event))
375+
enriched.append(raw_event)
282376
continue
283377

284378
event = dict(raw_event)

tests/test_worker.py

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,32 @@ def run(self, ctx): # type: ignore[no-untyped-def]
135135
yield ctx.wait_condition(lambda: False, key="done")
136136

137137

138+
@workflow.defn(name="replay-query-snapshot-wf")
class ReplayQuerySnapshotWorkflow:
    """Test workflow whose query exposes state rebuilt by replay.

    It runs one activity, waits for an "approve" signal, then runs a second
    activity. The "state" query reports how far the run has progressed.
    """

    def __init__(self) -> None:
        self.finished = False
        self.approved_by: str | None = None
        self.activity_result: str | None = None

    @workflow.signal("approve")
    def approve(self, approved_by: str) -> None:
        """Record who approved; this satisfies the "approval" wait in run()."""
        self.approved_by = approved_by

    @workflow.query("state")
    def state(self) -> dict[str, object]:
        """Return a snapshot of the three tracked attributes."""
        snapshot: dict[str, object] = {}
        snapshot["activity_result"] = self.activity_result
        snapshot["approved_by"] = self.approved_by
        snapshot["finished"] = self.finished
        return snapshot

    def run(self, ctx):  # type: ignore[no-untyped-def]
        """Generator body: load state, wait for approval, run the follow-up."""

        def approval_received() -> bool:
            return self.approved_by is not None

        loaded = yield ctx.schedule_activity("load-state", [])
        self.activity_result = loaded
        yield ctx.wait_condition(approval_received, key="approval")
        self.finished = True
        yield ctx.schedule_activity("after-signal", [self.approved_by])
162+
163+
138164
@workflow.defn(name="async-query-wf")
139165
class AsyncQueryWorkflow:
140166
@workflow.query("current")
@@ -1268,6 +1294,139 @@ async def test_query_task_replays_repeated_condition_wait_signal_arguments(
12681294
)
12691295
mock_client.fail_query_task.assert_not_called()
12701296

1297+
@pytest.mark.asyncio
async def test_query_task_replays_history_from_export_after_worker_restart(
    self, mock_client: AsyncMock
) -> None:
    """An empty task history must be rebuilt from the attached export.

    The task carries no ``history_events`` of its own; the export supplies
    "type"-keyed events plus the full signal arguments, and the query is
    expected to observe the fully replayed state.
    """
    worker = Worker(mock_client, task_queue="q1", workflows=[ReplayQuerySnapshotWorkflow], activities=[])
    approval_arguments = serializer.encode(["alice"], codec="json")

    # Export rows use "type" rather than "event_type".
    exported_events = [
        {
            "type": "ActivityCompleted",
            "payload": {
                "sequence": 1,
                "activity_type": "load-state",
                "payload_codec": "json",
                "result": serializer.encode("loaded", codec="json"),
            },
        },
        {
            "type": "ConditionWaitOpened",
            "payload": {
                "sequence": 2,
                "condition_wait_id": "wait-approval",
                "condition_key": "approval",
            },
        },
        {
            "type": "SignalReceived",
            "payload": {
                "signal_id": "sig-approve",
                "workflow_command_id": "cmd-approve",
                "signal_name": "approve",
            },
        },
    ]
    exported_signals = [
        {
            "id": "sig-approve",
            "command_id": "cmd-approve",
            "name": "approve",
            "payload_codec": "json",
            "arguments": approval_arguments,
        },
    ]
    task = {
        "query_task_id": "qt-export-history",
        "query_task_attempt": 1,
        "workflow_type": "replay-query-snapshot-wf",
        "workflow_id": "wf-replay-query",
        "run_id": "run-replay-query",
        "query_name": "state",
        "history_events": [],
        "history_export": {
            "payloads": {"codec": "json"},
            "history_events": exported_events,
            "signals": exported_signals,
        },
        "workflow_arguments": serializer.envelope([], codec="json"),
        "query_arguments": serializer.envelope([], codec="json"),
        "payload_codec": "json",
    }
    expected_state = {
        "activity_result": "loaded",
        "approved_by": "alice",
        "finished": True,
    }

    outcome = await worker._run_query_task(task)

    assert outcome == "completed"
    mock_client.complete_query_task.assert_awaited_once_with(
        query_task_id="qt-export-history",
        lease_owner=worker.worker_id,
        query_task_attempt=1,
        result=expected_state,
        codec="json",
        workflow_id="wf-replay-query",
        run_id="run-replay-query",
        query_name="state",
    )
    mock_client.fail_query_task.assert_not_called()
1373+
1374+
@pytest.mark.asyncio
async def test_query_task_enriches_compact_activity_completion_from_export(
    self, mock_client: AsyncMock
) -> None:
    """A compact ActivityCompleted row gets its result from the export.

    The task history holds the completion without ``result`` or
    ``payload_codec``; the matching row under the export's "activities"
    supplies both, so the query should see the activity result while the
    approval wait is still pending.
    """
    worker = Worker(mock_client, task_queue="q1", workflows=[ReplayQuerySnapshotWorkflow], activities=[])

    compact_completion = {
        "event_type": "ActivityCompleted",
        "payload": {
            "sequence": 1,
            "activity_type": "load-state",
        },
    }
    exported_activity = {
        "sequence": 1,
        "activity_type": "load-state",
        "payload_codec": "json",
        "result": serializer.encode("loaded", codec="json"),
    }
    task = {
        "query_task_id": "qt-compact-activity",
        "query_task_attempt": 1,
        "workflow_type": "replay-query-snapshot-wf",
        "workflow_id": "wf-compact-activity",
        "run_id": "run-compact-activity",
        "query_name": "state",
        "history_events": [compact_completion],
        "history_export": {
            "payloads": {"codec": "json"},
            "activities": [exported_activity],
        },
        "workflow_arguments": serializer.envelope([], codec="json"),
        "query_arguments": serializer.envelope([], codec="json"),
        "payload_codec": "json",
    }
    expected_state = {
        "activity_result": "loaded",
        "approved_by": None,
        "finished": False,
    }

    outcome = await worker._run_query_task(task)

    assert outcome == "completed"
    mock_client.complete_query_task.assert_awaited_once_with(
        query_task_id="qt-compact-activity",
        lease_owner=worker.worker_id,
        query_task_attempt=1,
        result=expected_state,
        codec="json",
        workflow_id="wf-compact-activity",
        run_id="run-compact-activity",
        query_name="state",
    )
    mock_client.fail_query_task.assert_not_called()
1429+
12711430
@pytest.mark.asyncio
12721431
async def test_query_task_awaits_async_query_result(self, mock_client: AsyncMock) -> None:
12731432
worker = Worker(mock_client, task_queue="q1", workflows=[AsyncQueryWorkflow], activities=[])

0 commit comments

Comments
 (0)