Skip to content

Commit 06f841b

Browse files
Replay start-boundary receivers after workflow initialization
Replay start-boundary receivers after workflow init
1 parent eae7cae commit 06f841b

3 files changed

Lines changed: 99 additions & 3 deletions

File tree

src/durable_workflow/workflow.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1681,8 +1681,6 @@ def _apply_due_receivers() -> None:
16811681
handler(*args)
16821682

16831683
result_cursor = 0
1684-
_apply_due_receivers()
1685-
16861684
gen = instance.run(ctx, *start_input)
16871685
if not hasattr(gen, "__next__"):
16881686
if isinstance(gen, ContinueAsNew):
@@ -1698,13 +1696,19 @@ def _apply_due_receivers() -> None:
16981696
wait_yield_count = 0
16991697
try:
17001698
while True:
1701-
_apply_due_receivers()
1699+
# Cursor-0 receivers are start-boundary events. Enter run() once
1700+
# before applying them so workflow initialization observes the
1701+
# same WorkflowStarted-before-Signal/Update ordering the server
1702+
# records in history.
1703+
if not first:
1704+
_apply_due_receivers()
17021705
if advanced_cmd is not None:
17031706
cmd = advanced_cmd
17041707
advanced_cmd = None
17051708
else:
17061709
cmd = gen.send(None) if first else gen.send(next_value)
17071710
first = False
1711+
_apply_due_receivers()
17081712
if isinstance(cmd, list):
17091713
needed = len(cmd)
17101714
if result_cursor + needed <= len(resolved_results):

tests/test_signals.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,20 @@ def _activity_completed_event(result: Any) -> dict[str, Any]:
7171
}
7272

7373

74+
def _json_activity_completed_event(result: Any) -> dict[str, Any]:
75+
return {
76+
"event_type": "ActivityCompleted",
77+
"payload": {"result": serializer.encode(result, codec="json"), "payload_codec": "json"},
78+
}
79+
80+
81+
def _workflow_started_event() -> dict[str, Any]:
82+
return {
83+
"event_type": "WorkflowStarted",
84+
"payload": {"timestamp": "2026-04-21T00:00:00Z"},
85+
}
86+
87+
7488
def _signal_received_event(name: str, args: list[Any]) -> dict[str, Any]:
7589
return {
7690
"event_type": "SignalReceived",
@@ -82,7 +96,45 @@ def _signal_received_event(name: str, args: list[Any]) -> dict[str, Any]:
8296
}
8397

8498

99+
def _json_signal_received_event(name: str, args: list[Any]) -> dict[str, Any]:
100+
return {
101+
"event_type": "SignalReceived",
102+
"payload": {
103+
"signal_name": name,
104+
"value": serializer.encode(args, codec="json"),
105+
"payload_codec": "json",
106+
},
107+
}
108+
109+
85110
class TestSignalDispatchDuringReplay:
111+
def test_start_boundary_signal_applies_after_workflow_initialization(self) -> None:
112+
@workflow.defn(name="start-boundary-signal-workflow")
113+
class StartBoundarySignalWorkflow:
114+
def __init__(self) -> None:
115+
self.messages: list[str] = []
116+
117+
@workflow.signal("message")
118+
def on_message(self, value: str) -> None:
119+
self.messages.append(value)
120+
121+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
122+
self.messages = ["started"]
123+
yield ctx.schedule_activity("wait", [])
124+
return list(self.messages)
125+
126+
events = [
127+
_workflow_started_event(),
128+
_json_signal_received_event("message", ["after-start"]),
129+
_json_activity_completed_event(None),
130+
]
131+
132+
outcome = replay(StartBoundarySignalWorkflow, events, [], payload_codec="json")
133+
134+
assert len(outcome.commands) == 1
135+
assert isinstance(outcome.commands[0], CompleteWorkflow)
136+
assert outcome.commands[0].result == ["started", "after-start"]
137+
86138
def test_signal_mutates_workflow_state_before_completion(self) -> None:
87139
events = [
88140
_signal_received_event("approve", ["alice"]),

tests/test_updates.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ def _signal_received_event(name: str, args: list[object]) -> dict[str, object]:
123123
}
124124

125125

126+
def _workflow_started_event() -> dict[str, object]:
127+
return {
128+
"event_type": "WorkflowStarted",
129+
"payload": {"timestamp": "2026-04-21T00:00:00Z"},
130+
}
131+
132+
126133
def _update_accepted_event(
127134
update_id: str,
128135
name: str,
@@ -156,6 +163,39 @@ def _update_applied_event(
156163

157164

158165
class TestUpdateApplicationReplay:
166+
def test_start_boundary_update_applies_after_workflow_initialization(self) -> None:
167+
@workflow.defn(name="start-boundary-update-receiver")
168+
class StartBoundaryUpdateReceiver:
169+
def __init__(self) -> None:
170+
self.messages: list[str] = []
171+
172+
@workflow.update("append")
173+
def append(self, value: str) -> list[str]:
174+
self.messages.append(value)
175+
return list(self.messages)
176+
177+
@workflow.query("messages")
178+
def read_messages(self) -> list[str]:
179+
return list(self.messages)
180+
181+
def run(self, ctx: WorkflowContext): # type: ignore[no-untyped-def]
182+
self.messages = ["started"]
183+
yield ctx.schedule_activity("wait", [])
184+
return list(self.messages)
185+
186+
history = [
187+
_workflow_started_event(),
188+
_update_applied_event("upd-start", "append", ["after-start"]),
189+
]
190+
191+
assert query_state(
192+
StartBoundaryUpdateReceiver,
193+
history,
194+
[],
195+
"messages",
196+
payload_codec="json",
197+
) == ["started", "after-start"]
198+
159199
def test_replay_applies_committed_update_events_to_workflow_state(self) -> None:
160200
history = [
161201
_update_applied_event("upd-1", "increment", [3]),

0 commit comments

Comments
 (0)