Skip to content

Commit 808a06d

Browse files
[cross-repo from server#267] Conformance: replay approve signal does not complete after activity state query (#84)
1 parent bbfd981 commit 808a06d

4 files changed

Lines changed: 119 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Fixed
10+
- Signal and update receivers recorded after an activity result now replay after
11+
the workflow consumes that activity result, so receiver-mutated state is not
12+
overwritten by deterministic post-activity setup before a `wait_condition`.
1013
- Python workflow replay now throws terminal `ActivityFailed` history events
1114
into the generator as a typed `ActivityFailed` exception, including the
1215
recorded activity and failure metadata. Activity-only saga workflows can

src/durable_workflow/workflow.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2052,8 +2052,16 @@ def _state(commands: list[Command]) -> _ReplayState:
20522052
signal_registry: dict[str, str] = getattr(workflow_cls, "__workflow_signals__", {}) or {}
20532053
update_registry: dict[str, str] = getattr(workflow_cls, "__workflow_updates__", {}) or {}
20542054

2055-
def _apply_due_receivers() -> None:
2056-
while pending_receivers and pending_receivers[0][0] <= result_cursor:
2055+
def _apply_due_receivers(*, before_consuming_result: bool = False) -> None:
2056+
while pending_receivers:
2057+
receiver_index = pending_receivers[0][0]
2058+
due = (
2059+
receiver_index < result_cursor
2060+
if before_consuming_result
2061+
else receiver_index <= result_cursor
2062+
)
2063+
if not due:
2064+
break
20572065
_, kind, name, args = pending_receivers.pop(0)
20582066
if kind == "signal":
20592067
method_name = signal_registry.get(name)
@@ -2085,19 +2093,30 @@ def _apply_due_receivers() -> None:
20852093
pending: list[Command] = []
20862094
advanced_cmd: Any = None
20872095
wait_yield_count = 0
2096+
2097+
def _terminal_state(value: Any, *, include_pending: bool) -> _ReplayState:
2098+
_apply_due_receivers()
2099+
commands = list(pending) if include_pending else []
2100+
if isinstance(value, ContinueAsNew):
2101+
return _state(commands + [value])
2102+
return _state(commands + [CompleteWorkflow(result=value)])
2103+
20882104
try:
20892105
while True:
20902106
# Cursor-0 receivers are start-boundary events. Enter run() once
20912107
# before applying them so workflow initialization observes the
20922108
# same WorkflowStarted-before-Signal/Update ordering the server
20932109
# records in history.
20942110
if not first:
2095-
_apply_due_receivers()
2111+
_apply_due_receivers(before_consuming_result=True)
20962112
if advanced_cmd is not None:
20972113
cmd = advanced_cmd
20982114
advanced_cmd = None
20992115
else:
2100-
cmd = gen.send(None) if first else gen.send(next_value)
2116+
try:
2117+
cmd = gen.send(None) if first else gen.send(next_value)
2118+
except StopIteration as stop:
2119+
return _terminal_state(stop.value, include_pending=True)
21012120
first = False
21022121
_apply_due_receivers()
21032122
if isinstance(cmd, list):
@@ -2111,9 +2130,7 @@ def _apply_due_receivers() -> None:
21112130
advanced_cmd = gen.throw(failed)
21122131
continue
21132132
except StopIteration as stop:
2114-
if isinstance(stop.value, ContinueAsNew):
2115-
return _state([stop.value])
2116-
return _state([CompleteWorkflow(result=stop.value)])
2133+
return _terminal_state(stop.value, include_pending=False)
21172134
next_value = vals
21182135
continue
21192136
ctx.logger._set_replaying(False)
@@ -2208,19 +2225,21 @@ def _apply_due_receivers() -> None:
22082225
advanced_cmd = gen.throw(val)
22092226
continue
22102227
except StopIteration as stop:
2211-
if isinstance(stop.value, ContinueAsNew):
2212-
return _state([stop.value])
2213-
return _state([CompleteWorkflow(result=stop.value)])
2228+
return _terminal_state(stop.value, include_pending=False)
22142229
next_value = val
22152230
continue
22162231
ctx.logger._set_replaying(False)
22172232
pending.append(cmd)
22182233
return _state(pending)
22192234
raise TypeError(f"workflow yielded unsupported command: {cmd!r}")
22202235
except StopIteration as stop:
2221-
if isinstance(stop.value, ContinueAsNew):
2222-
return _state(pending + [stop.value])
2223-
return _state(pending + [CompleteWorkflow(result=stop.value)])
2236+
try:
2237+
return _terminal_state(stop.value, include_pending=True)
2238+
except Exception as exc:
2239+
return _state([FailWorkflow(
2240+
message=str(exc),
2241+
exception_type=type(exc).__name__,
2242+
)])
22242243
except Exception as exc:
22252244
return _state([FailWorkflow(
22262245
message=str(exc),

tests/test_updates.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ def _workflow_started_event() -> dict[str, object]:
130130
}
131131

132132

133+
def _activity_completed_event(result: object) -> dict[str, object]:
134+
return {
135+
"event_type": "ActivityCompleted",
136+
"payload": {
137+
"result": serializer.encode(result, codec="json"),
138+
"payload_codec": "json",
139+
},
140+
}
141+
142+
133143
def _update_accepted_event(
134144
update_id: str,
135145
name: str,
@@ -210,6 +220,21 @@ def test_replay_applies_committed_update_events_to_workflow_state(self) -> None:
210220
payload_codec="json",
211221
) == 7
212222

223+
def test_query_state_applies_update_after_final_activity_result(self) -> None:
224+
history = [
225+
_signal_received_event("increment", [1]),
226+
_activity_completed_event(None),
227+
_update_applied_event("upd-final", "increment", [5]),
228+
]
229+
230+
assert query_state(
231+
StatefulUpdateReceiver,
232+
history,
233+
[],
234+
"count",
235+
payload_codec="json",
236+
) == 6
237+
213238
def test_apply_update_completes_accepted_update_with_handler_result(self) -> None:
214239
history = [
215240
_signal_received_event("increment", [2]),

tests/test_wait_condition.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
FailWorkflow,
77
WaitCondition,
88
WorkflowContext,
9+
query_state,
910
replay,
1011
)
1112

@@ -60,6 +61,38 @@ def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
6061
return "satisfied" if satisfied else "no"
6162

6263

64+
@workflow.defn(name="activity-then-wait-for-approval")
65+
class ActivityThenWaitForApproval:
66+
def __init__(self) -> None:
67+
self.activity_result: str | None = None
68+
self.approved_by: str | None = None
69+
70+
@workflow.signal("approve")
71+
def approve(self, approved_by: str) -> None:
72+
self.approved_by = approved_by
73+
74+
@workflow.query("state")
75+
def state(self) -> dict[str, str | None]:
76+
return {
77+
"activity_result": self.activity_result,
78+
"approved_by": self.approved_by,
79+
}
80+
81+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
82+
self.activity_result = yield ctx.schedule_activity("load-state", [])
83+
self.approved_by = None
84+
yield ctx.wait_condition(
85+
lambda: self.approved_by == "alice",
86+
key="approval",
87+
timeout=30,
88+
)
89+
90+
return {
91+
"activity_result": self.activity_result,
92+
"approved_by": self.approved_by,
93+
}
94+
95+
6396
class TestCtxWaitCondition:
6497
def test_wait_condition_returns_dataclass_with_predicate_and_key(self) -> None:
6598
ctx = WorkflowContext(run_id="x")
@@ -188,6 +221,32 @@ def test_condition_wait_satisfied_event_resolves_wait_even_if_predicate_false(se
188221
assert isinstance(outcome.commands[0], CompleteWorkflow)
189222
assert outcome.commands[0].result == "approved"
190223

224+
def test_signal_after_activity_result_applies_after_result_is_consumed(self) -> None:
225+
history = [
226+
{
227+
"event_type": "ActivityCompleted",
228+
"payload": {"result": '"loaded"'},
229+
},
230+
{
231+
"event_type": "ConditionWaitOpened",
232+
"payload": {"condition_wait_id": "wait-1", "condition_key": "approval"},
233+
},
234+
_signal_received_event("approve", ["alice"]),
235+
]
236+
237+
outcome = replay(ActivityThenWaitForApproval, history, [])
238+
239+
assert len(outcome.commands) == 1
240+
assert isinstance(outcome.commands[0], CompleteWorkflow)
241+
assert outcome.commands[0].result == {
242+
"activity_result": "loaded",
243+
"approved_by": "alice",
244+
}
245+
assert query_state(ActivityThenWaitForApproval, history, [], "state") == {
246+
"activity_result": "loaded",
247+
"approved_by": "alice",
248+
}
249+
191250
def test_open_with_no_resolution_and_predicate_false_re_emits_wait_condition(self) -> None:
192251
history = [
193252
{

0 commit comments

Comments
 (0)