Skip to content

Commit 242e92f

Browse files
[cross-repo from server#313] Conformance blocker: expand signals and queries coverage beyond current smoke (#115)
1 parent 4e6b8e1 commit 242e92f

4 files changed

Lines changed: 122 additions & 16 deletions

File tree

src/durable_workflow/worker.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,6 @@ def _query_history_with_export_signal_arguments(
241241
if not isinstance(raw_payload, Mapping):
242242
enriched.append(dict(raw_event))
243243
continue
244-
if any(raw_payload.get(key) is not None for key in ("value", "input", "arguments")):
245-
enriched.append(dict(raw_event))
246-
continue
247-
248244
signal: Mapping[str, Any] | None = None
249245
signal_id = raw_payload.get("signal_id")
250246
if isinstance(signal_id, str) and signal_id:
@@ -265,14 +261,26 @@ def _query_history_with_export_signal_arguments(
265261
enriched.append(dict(raw_event))
266262
continue
267263

264+
payload = dict(raw_payload)
265+
payload_changed = False
266+
workflow_sequence = signal.get("workflow_sequence")
267+
if isinstance(workflow_sequence, int):
268+
payload.setdefault("workflow_sequence", workflow_sequence)
269+
payload_changed = True
270+
elif isinstance(workflow_sequence, str) and workflow_sequence.isdigit():
271+
payload.setdefault("workflow_sequence", int(workflow_sequence))
272+
payload_changed = True
273+
268274
envelope = _signal_arguments_envelope_from_export(signal, default_codec=signal_default_codec)
269-
if envelope is None:
275+
if envelope is not None:
276+
payload.setdefault("arguments", envelope)
277+
payload.setdefault("payload_codec", envelope.get("codec"))
278+
payload_changed = True
279+
280+
if not payload_changed:
270281
enriched.append(dict(raw_event))
271282
continue
272283

273-
payload = dict(raw_payload)
274-
payload["arguments"] = envelope
275-
payload.setdefault("payload_codec", envelope.get("codec"))
276284
event = dict(raw_event)
277285
event["payload"] = payload
278286
enriched.append(event)

src/durable_workflow/workflow.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2055,6 +2055,7 @@ def _replay_state(
20552055
event_types_by_sequence: dict[int, list[str]] = {}
20562056
details_by_sequence: dict[int, dict[str, Any]] = {}
20572057
resolved_sequences: set[int] = set()
2058+
condition_wait_ids_by_sequence: dict[int, str] = {}
20582059

20592060
for event in events:
20602061
event_type = _history_event_type(event)
@@ -2069,6 +2070,10 @@ def _replay_state(
20692070
details_by_sequence.setdefault(sequence, {}).update(_recorded_step_details(payload))
20702071
if _is_resolved_step_event(event_type, payload):
20712072
resolved_sequences.add(sequence)
2073+
if event_type == "ConditionWaitOpened":
2074+
wait_id = payload.get("condition_wait_id")
2075+
if isinstance(wait_id, str) and wait_id:
2076+
condition_wait_ids_by_sequence[sequence] = wait_id
20722077

20732078
workflow_start_time: datetime | None = None
20742079
for ev in events:
@@ -2307,6 +2312,12 @@ def _close_condition_wait(wait_id: Any) -> None:
23072312
elif etype == "SignalReceived":
23082313
signal_name = payload.get("signal_name")
23092314
if isinstance(signal_name, str) and signal_name:
2315+
workflow_sequence = _workflow_sequence(payload)
2316+
condition_wait_id = (
2317+
condition_wait_ids_by_sequence.get(workflow_sequence)
2318+
if workflow_sequence is not None
2319+
else None
2320+
)
23102321
pending_receivers.append(_PendingReceiver(
23112322
result_index=len(resolved_results),
23122323
kind="signal",
@@ -2321,7 +2332,7 @@ def _close_condition_wait(wait_id: Any) -> None:
23212332
external_storage=external_storage,
23222333
external_storage_cache=external_storage_cache,
23232334
),
2324-
condition_wait_id=_current_condition_wait_id(),
2335+
condition_wait_id=condition_wait_id or _current_condition_wait_id(),
23252336
))
23262337
elif etype == "UpdateApplied":
23272338
update_name = payload.get("update_name")

tests/test_wait_condition.py

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,18 @@
1111
)
1212

1313

14-
def _signal_received_event(name: str, args: list) -> dict:
14+
def _signal_received_event(name: str, args: list, workflow_sequence: int | None = None) -> dict:
15+
payload = {
16+
"signal_name": name,
17+
"value": serializer.envelope(args),
18+
"payload_codec": serializer.AVRO_CODEC,
19+
}
20+
if workflow_sequence is not None:
21+
payload["workflow_sequence"] = workflow_sequence
22+
1523
return {
1624
"event_type": "SignalReceived",
17-
"payload": {
18-
"signal_name": name,
19-
"value": serializer.envelope(args),
20-
"payload_codec": serializer.AVRO_CODEC,
21-
},
25+
"payload": payload,
2226
}
2327

2428

@@ -414,6 +418,52 @@ def test_repeated_physical_waits_keep_query_state_current_before_finish(self) ->
414418
],
415419
}
416420

421+
def test_signal_received_before_next_wait_uses_declared_workflow_sequence(self) -> None:
422+
history = [
423+
{
424+
"event_type": "ConditionWaitOpened",
425+
"payload": {
426+
"condition_wait_id": "wait-count-3",
427+
"condition_key": "done",
428+
"sequence": 1,
429+
},
430+
},
431+
_signal_received_event("increment", [3], workflow_sequence=1),
432+
_signal_received_event("finish", [], workflow_sequence=2),
433+
{
434+
"event_type": "ConditionWaitOpened",
435+
"payload": {
436+
"condition_wait_id": "wait-finish",
437+
"condition_key": "done",
438+
"sequence": 2,
439+
},
440+
},
441+
{
442+
"event_type": "ConditionWaitSatisfied",
443+
"payload": {
444+
"condition_wait_id": "wait-finish",
445+
"condition_key": "done",
446+
"sequence": 2,
447+
},
448+
},
449+
]
450+
451+
expected = {
452+
"count": 3,
453+
"done": True,
454+
"events": [
455+
{"signal": "increment", "amount": 3, "count": 3},
456+
{"signal": "finish", "count": 3},
457+
],
458+
}
459+
460+
outcome = replay(SignalCounterUntilFinished, history, [])
461+
462+
assert len(outcome.commands) == 1
463+
assert isinstance(outcome.commands[0], CompleteWorkflow)
464+
assert outcome.commands[0].result == expected
465+
assert query_state(SignalCounterUntilFinished, history, [], "current") == expected
466+
417467
def test_repeated_wait_after_activity_can_be_satisfied_by_later_signal(self) -> None:
418468
history = [
419469
{

tests/test_worker.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
WorkflowTaskHandler,
3333
WorkflowTaskInterceptorContext,
3434
)
35-
from durable_workflow.worker import Worker, _should_fail_workflow_task_after_completion_error
35+
from durable_workflow.worker import (
36+
Worker,
37+
_query_history_with_export_signal_arguments,
38+
_should_fail_workflow_task_after_completion_error,
39+
)
3640

3741

3842
@workflow.defn(name="test-wf")
@@ -916,6 +920,39 @@ async def test_update_task_retries_transient_completion_error(
916920
assert mock_client.complete_workflow_task.await_count == 2
917921
mock_client.fail_workflow_task.assert_not_called()
918922

923+
def test_query_history_enrichment_copies_signal_workflow_sequence_from_export(self) -> None:
924+
history = [
925+
{
926+
"event_type": "SignalReceived",
927+
"workflow_command_id": "cmd-finish",
928+
"payload": {
929+
"signal_id": "sig-finish",
930+
"workflow_command_id": "cmd-finish",
931+
"signal_name": "finish",
932+
},
933+
},
934+
]
935+
export = {
936+
"payloads": {"codec": "json"},
937+
"signals": [
938+
{
939+
"id": "sig-finish",
940+
"command_id": "cmd-finish",
941+
"name": "finish",
942+
"workflow_sequence": 2,
943+
"payload_codec": "json",
944+
"arguments": serializer.encode([], codec="json"),
945+
},
946+
],
947+
}
948+
949+
enriched = _query_history_with_export_signal_arguments(history, export, default_codec="json")
950+
951+
assert isinstance(enriched, list)
952+
payload = enriched[0]["payload"]
953+
assert payload["workflow_sequence"] == 2
954+
assert payload["arguments"]["codec"] == "json"
955+
919956
@pytest.mark.asyncio
920957
async def test_query_task_executes_registered_query(self, mock_client: AsyncMock) -> None:
921958
worker = Worker(mock_client, task_queue="q1", workflows=[QueryWorkflow], activities=[])

0 commit comments

Comments
 (0)