Skip to content

Commit b45253c

Browse files
[cross-repo from workflow#631] Conformance blocker: expand replay coverage beyond current smoke (#113)
1 parent b74bcb1 commit b45253c

4 files changed

Lines changed: 80 additions & 18 deletions

File tree

CONFORMANCE.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The Python SDK claims two targets from the suite's matrix:
2626
| `control_plane_request_response` | `tests/fixtures/control-plane/` | stable, parity-shared with `cli` |
2727
| `signal_query_runtime_contract` | `tests/test_signals.py`, `tests/test_queries.py`, `tests/test_worker.py` | stable, parity-shared with PHP worker, CLI, and server routes |
2828
| `worker_task_lifecycle` | `tests/fixtures/external-task-input/`, `tests/fixtures/external-task-result/` | stable |
29-
| `history_replay_bundles` | `tests/fixtures/golden_history/` | stable, parity-shared with `workflow` golden bundles |
29+
| `history_replay_bundles` | `tests/fixtures/golden_history/` and the public replay scenario manifest at <https://durable-workflow.github.io/platform-conformance/replay-runtime-scenarios.json> | stable, parity-shared with `workflow` golden bundles and the full runtime replay scenario matrix |
3030

3131
The fixtures in this repo are exercised today by:
3232

@@ -51,7 +51,7 @@ result document before tag, with the conformance level at `full` or
5151
| Field | Value |
5252
| --- | --- |
5353
| Required claimed targets | `official_sdk`, `worker_protocol_implementation` |
54-
| Required suite version | `PlatformConformanceSuite::VERSION` (currently `2`, mirrored at `/platform-conformance-contract.json`) |
54+
| Required suite version | `PlatformConformanceSuite::VERSION` (currently `3`, mirrored at `/platform-conformance-contract.json`) |
5555
| CI job | `platform-conformance` (lands when the harness reference implementation publishes; until then `cli-parity` and `test_history_event_contract.py` cover the same ground) |
5656
| Block on `nonconforming` | yes |
5757
| Artifact attached to release | harness result document, schema `durable-workflow.v2.platform-conformance.result` |

src/durable_workflow/workflow.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2190,30 +2190,27 @@ def _assert_next_step_matches(command: Any, offset: int = 0) -> None:
21902190
return
21912191
_assert_step_matches(command, recorded_steps[step_index])
21922192

2193-
def _next_unconsumed_recorded_step() -> _RecordedStep | None:
2193+
def _unconsumed_recorded_steps() -> list[_RecordedStep]:
21942194
candidates: list[_RecordedStep] = []
21952195
if result_cursor < len(recorded_steps):
2196-
candidates.append(recorded_steps[result_cursor])
2196+
candidates.extend(recorded_steps[result_cursor:])
21972197
if wait_yield_count < len(recorded_wait_steps):
2198-
candidates.append(recorded_wait_steps[wait_yield_count])
2198+
candidates.extend(recorded_wait_steps[wait_yield_count:])
21992199
if pending_step_cursor < len(recorded_pending_steps):
2200-
candidates.append(recorded_pending_steps[pending_step_cursor])
2200+
candidates.extend(recorded_pending_steps[pending_step_cursor:])
2201+
return sorted(candidates, key=lambda step: step.workflow_sequence)
2202+
2203+
def _next_unconsumed_recorded_step() -> _RecordedStep | None:
2204+
candidates = _unconsumed_recorded_steps()
22012205
if not candidates:
22022206
return None
2203-
return min(candidates, key=lambda step: step.workflow_sequence)
2207+
return candidates[0]
22042208

22052209
def _assert_pending_step_matches(command: Any, offset: int = 0) -> None:
2206-
if offset == 0:
2207-
step = _next_unconsumed_recorded_step()
2208-
if step is None:
2209-
return
2210-
_assert_step_matches(command, step)
2211-
return
2212-
2213-
step_index = pending_step_cursor + offset
2214-
if step_index >= len(recorded_pending_steps):
2210+
candidates = _unconsumed_recorded_steps()
2211+
if offset >= len(candidates):
22152212
return
2216-
_assert_step_matches(command, recorded_pending_steps[step_index])
2213+
_assert_step_matches(command, candidates[offset])
22172214

22182215
def _assert_no_unconsumed_history(terminal_shape: str) -> None:
22192216
step = _next_unconsumed_recorded_step()

tests/test_replay.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import pytest
77

88
from durable_workflow import Replayer, ReplayOutcome, serializer, workflow
9-
from durable_workflow.errors import ActivityFailed, ChildWorkflowFailed
9+
from durable_workflow.errors import ActivityFailed, ChildWorkflowFailed, NonDeterministicReplayError
1010
from durable_workflow.workflow import (
1111
ActivityRetryPolicy,
1212
ChildWorkflowRetryPolicy,
@@ -165,6 +165,21 @@ def test_first_replay_schedules(self) -> None:
165165
assert cmd.activity_type == "greet"
166166
assert cmd.arguments == ["world"]
167167

168+
def test_pending_timer_history_rejects_single_activity_drift(self) -> None:
169+
history = [
170+
{
171+
"event_type": "TimerScheduled",
172+
"payload": {"sequence": 1, "timer_kind": "durable_timer"},
173+
},
174+
]
175+
176+
with pytest.raises(NonDeterministicReplayError) as exc_info:
177+
replay(OneActivity, history, ["world"])
178+
179+
assert exc_info.value.workflow_sequence == 1
180+
assert exc_info.value.expected_shape == "activity"
181+
assert exc_info.value.recorded_event_types == ["TimerScheduled"]
182+
168183
def test_completed_activity_triggers_completion(self) -> None:
169184
history = [{"event_type": "ActivityCompleted", "payload": {"result": '"hello, world"'}}]
170185
outcome = replay(OneActivity, history, ["world"])
@@ -1229,6 +1244,13 @@ def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
12291244
return result
12301245

12311246

1247+
@workflow.defn(name="fan-out-activity-timer-wf")
1248+
class FanOutActivityTimerWorkflow:
1249+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
1250+
yield [ctx.schedule_activity("fetch", []), ctx.start_timer(5)]
1251+
return "done"
1252+
1253+
12321254
@workflow.defn(name="fan-out-then-sequential-wf")
12331255
class FanOutThenSequentialWorkflow:
12341256
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
@@ -1288,6 +1310,24 @@ def test_timers_fired_then_activity(self) -> None:
12881310
assert isinstance(outcome.commands[0], ScheduleActivity)
12891311
assert outcome.commands[0].activity_type == "after-timers"
12901312

1313+
def test_pending_batch_history_matches_by_workflow_sequence(self) -> None:
1314+
history = [
1315+
{
1316+
"event_type": "TimerScheduled",
1317+
"payload": {"sequence": 2, "timer_kind": "durable_timer"},
1318+
},
1319+
{
1320+
"event_type": "ActivityScheduled",
1321+
"payload": {"sequence": 1, "activity_type": "fetch"},
1322+
},
1323+
]
1324+
1325+
outcome = replay(FanOutActivityTimerWorkflow, history, [])
1326+
1327+
assert len(outcome.commands) == 2
1328+
assert isinstance(outcome.commands[0], ScheduleActivity)
1329+
assert isinstance(outcome.commands[1], StartTimer)
1330+
12911331
def test_fan_out_then_sequential(self) -> None:
12921332
history = [
12931333
{"event_type": "ActivityCompleted", "payload": {"result": '"r1"'}},

tests/test_replay_verify.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,31 @@ def test_verify_replay_refuses_in_flight_scheduled_command_drift() -> None:
149149
assert report.error["recorded_event_types"] == ["ActivityScheduled"]
150150

151151

152+
def test_verify_replay_refuses_single_command_replacing_pending_timer() -> None:
153+
history = [
154+
{
155+
"event_type": "TimerScheduled",
156+
"payload": {
157+
"sequence": 1,
158+
"timer_kind": "durable_timer",
159+
},
160+
}
161+
]
162+
163+
report = verify_replay(
164+
ActivityDivergenceWorkflow,
165+
history,
166+
case_id="activity-replaced-timer",
167+
)
168+
169+
assert report.status == STATUS_DRIFTED
170+
assert report.reason == REASON_SHAPE_MISMATCH
171+
assert report.error is not None
172+
assert report.error["workflow_sequence"] == 1
173+
assert report.error["expected_shape"] == "activity"
174+
assert report.error["recorded_event_types"] == ["TimerScheduled"]
175+
176+
152177
def test_verify_replay_refuses_same_shape_activity_name_drift() -> None:
153178
history = [
154179
{

0 commit comments

Comments
 (0)