Skip to content

Commit 0816464

Browse files
Conformance finding: signals/queries smoke hits Python nondeterministic replay (#121)
1 parent 19de0fa commit 0816464

4 files changed

Lines changed: 169 additions & 18 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Fixed
10+
- Condition-wait replay now binds signals that arrive during a leased
11+
workflow task to the next recorded wait when the server history records
12+
those signals before the task's `ConditionWaitOpened` row. This avoids
13+
applying rapid signal batches to the previous wait and prevents replay from
14+
completing with a later wait/timer history step left unconsumed.
1015
- Workflow workers now report unhandled workflow-task execution errors back to
1116
the server instead of leaving the leased task pending until the lease or CLI
1217
wait times out. This lets the server observe and retry or fail the task

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "durable-workflow"
7-
version = "0.4.55"
7+
version = "0.4.56"
88
description = "Python SDK for the Durable Workflow server (language-neutral HTTP protocol)"
99
readme = "README.md"
1010
requires-python = ">=3.10"

src/durable_workflow/workflow.py

Lines changed: 114 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2112,8 +2112,6 @@ def _state(commands: list[Command]) -> _ReplayState:
21122112
# in history, future server-recorded) or 'timed_out' (from a matching
21132113
# condition_timeout TimerFired event).
21142114
wait_resolutions: dict[str, str] = {}
2115-
open_condition_wait_ids: list[str] = []
2116-
21172115
def _append_resolved_result(value: Any, shape: str, event: Mapping[str, Any]) -> None:
21182116
payload = event.get("payload") or {}
21192117
if not isinstance(payload, Mapping):
@@ -2227,16 +2225,113 @@ def _assert_no_unconsumed_history(terminal_shape: str) -> None:
22272225
step.event_types,
22282226
)
22292227

2230-
def _current_condition_wait_id() -> str | None:
2231-
return open_condition_wait_ids[-1] if open_condition_wait_ids else None
2228+
def _is_external_receiver_event(event_type: str | None) -> bool:
2229+
return event_type in ("SignalReceived", "UpdateApplied")
22322230

2233-
def _close_condition_wait(wait_id: Any) -> None:
2234-
if not isinstance(wait_id, str) or not wait_id:
2235-
return
2236-
with contextlib.suppress(ValueError):
2237-
open_condition_wait_ids.remove(wait_id)
2231+
def _receiver_binding_boundary_kind(event_type: str | None, payload: Mapping[str, Any]) -> str | None:
2232+
if event_type in (
2233+
"ConditionWaitSatisfied",
2234+
"ConditionWaitTimedOut",
2235+
):
2236+
return "condition"
2237+
if (
2238+
event_type in ("TimerFired", "TimerCancelled")
2239+
and payload.get("timer_kind") == "condition_timeout"
2240+
):
2241+
return "condition"
2242+
if event_type in (
2243+
"WorkflowCompleted",
2244+
"WorkflowFailed",
2245+
"WorkflowContinuedAsNew",
2246+
"ActivityScheduled",
2247+
"ActivityStarted",
2248+
"ChildWorkflowScheduled",
2249+
"ChildRunStarted",
2250+
):
2251+
return "step"
2252+
if event_type == "TimerScheduled":
2253+
return "step" if payload.get("timer_kind") not in ("condition_timeout", "signal_timeout") else None
2254+
return "step" if _is_resolved_step_event(event_type, payload) else None
2255+
2256+
def _receiver_condition_wait_bindings() -> dict[int, str | None]:
    """Map sequence-less receiver events to the condition wait they replay under.

    Walks ``events`` once, in order. Only ``SignalReceived`` / ``UpdateApplied``
    rows whose payload yields no workflow sequence (``_workflow_sequence``
    returns ``None``) receive an entry; rows with an explicit sequence are
    tracked for ordering only.

    Binding rules:
      * Receivers seen while no wait is open are held as "prefix" receivers.
        The next ``ConditionWaitOpened`` binds them to its wait id, unless a
        "step" boundary was seen first, in which case they bind to ``None``.
      * The first receiver after a wait opens binds to that wait immediately.
      * Later receivers under the same open wait bind to the *next* wait if
        another ``ConditionWaitOpened`` appears before any boundary; otherwise
        they bind to the current wait at the boundary or at end of history.

    Returns:
        A dict keyed by index into ``events`` whose value is the
        ``condition_wait_id`` for that receiver, or ``None`` when it is bound
        to no wait.
    """
    bindings: dict[int, str | None] = {}
    prefix_receivers: list[int] = []
    prefix_can_bind_to_first_wait = True
    current_wait_id: str | None = None
    # One entry per receiver since the current wait opened; ``None`` marks a
    # receiver that carries its own workflow sequence.
    receivers_since_wait: list[int | None] = []

    for index, event in enumerate(events):
        event_type = _history_event_type(event)
        payload = event.get("payload") or {}
        if not isinstance(payload, Mapping):
            payload = {}

        if event_type == "ConditionWaitOpened":
            wait_id = payload.get("condition_wait_id")
            if not isinstance(wait_id, str) or not wait_id:
                continue

            if current_wait_id is None:
                prefix_binding = wait_id if prefix_can_bind_to_first_wait else None
                for receiver_index in prefix_receivers:
                    bindings[receiver_index] = prefix_binding
            else:
                # When multiple signals arrive while the task woken by the
                # first signal is still leased, the server records those
                # later SignalReceived rows before the task's next
                # ConditionWaitOpened row. Replay them at that next wait.
                for receiver_index in receivers_since_wait[1:]:
                    if receiver_index is not None:
                        bindings[receiver_index] = wait_id

            prefix_receivers = []
            prefix_can_bind_to_first_wait = True
            current_wait_id = wait_id
            receivers_since_wait = []
            continue

        if _is_external_receiver_event(event_type):
            explicit_sequence = _workflow_sequence(payload) is not None
            if current_wait_id is None:
                if not explicit_sequence:
                    prefix_receivers.append(index)
                continue

            receivers_since_wait.append(None if explicit_sequence else index)
            if len(receivers_since_wait) == 1 and not explicit_sequence:
                bindings[index] = current_wait_id
            continue

        boundary_kind = _receiver_binding_boundary_kind(event_type, payload)
        if boundary_kind is None:
            continue

        if current_wait_id is None:
            if boundary_kind == "step":
                for receiver_index in prefix_receivers:
                    bindings[receiver_index] = None
                prefix_receivers = []
                prefix_can_bind_to_first_wait = False
            continue

        # A boundary closes the open wait: receivers still unbound stay with it.
        for receiver_index in receivers_since_wait:
            if receiver_index is not None and receiver_index not in bindings:
                bindings[receiver_index] = current_wait_id
        current_wait_id = None
        receivers_since_wait = []
        prefix_can_bind_to_first_wait = boundary_kind == "condition"

    # End of history: prefix receivers never saw a wait; receivers under a
    # still-open wait stay with that wait.
    for receiver_index in prefix_receivers:
        bindings[receiver_index] = None
    if current_wait_id is not None:
        for receiver_index in receivers_since_wait:
            if receiver_index is not None and receiver_index not in bindings:
                bindings[receiver_index] = current_wait_id

    return bindings
2331+
2332+
receiver_condition_wait_ids = _receiver_condition_wait_bindings()
2333+
2334+
for event_index, ev in enumerate(events):
22402335
etype = _history_event_type(ev)
22412336
payload = ev.get("payload") or {}
22422337
if etype == "ActivityCompleted":
@@ -2260,7 +2355,6 @@ def _close_condition_wait(wait_id: Any) -> None:
22602355
wait_id = payload.get("condition_wait_id")
22612356
if isinstance(wait_id, str) and wait_id:
22622357
wait_resolutions[wait_id] = "timed_out"
2263-
_close_condition_wait(wait_id)
22642358
continue
22652359
if timer_kind == "signal_timeout":
22662360
continue
@@ -2274,17 +2368,14 @@ def _close_condition_wait(wait_id: Any) -> None:
22742368
if isinstance(wait_id, str) and wait_id:
22752369
recorded_wait_steps.append(_recorded_step("condition wait", ev))
22762370
wait_opened.append(dict(payload))
2277-
open_condition_wait_ids.append(wait_id)
22782371
elif etype == "ConditionWaitSatisfied":
22792372
wait_id = payload.get("condition_wait_id")
22802373
if isinstance(wait_id, str) and wait_id:
22812374
wait_resolutions[wait_id] = "satisfied"
2282-
_close_condition_wait(wait_id)
22832375
elif etype == "ConditionWaitTimedOut":
22842376
wait_id = payload.get("condition_wait_id")
22852377
if isinstance(wait_id, str) and wait_id:
22862378
wait_resolutions[wait_id] = "timed_out"
2287-
_close_condition_wait(wait_id)
22882379
elif etype in ("SideEffectRecorded", "ChildRunCompleted"):
22892380
shape = "side effect" if etype == "SideEffectRecorded" else "child workflow"
22902381
_append_resolved_result(
@@ -2316,7 +2407,7 @@ def _close_condition_wait(wait_id: Any) -> None:
23162407
condition_wait_id = (
23172408
condition_wait_ids_by_sequence.get(workflow_sequence)
23182409
if workflow_sequence is not None
2319-
else None
2410+
else receiver_condition_wait_ids.get(event_index)
23202411
)
23212412
pending_receivers.append(_PendingReceiver(
23222413
result_index=len(resolved_results),
@@ -2332,11 +2423,17 @@ def _close_condition_wait(wait_id: Any) -> None:
23322423
external_storage=external_storage,
23332424
external_storage_cache=external_storage_cache,
23342425
),
2335-
condition_wait_id=condition_wait_id or _current_condition_wait_id(),
2426+
condition_wait_id=condition_wait_id,
23362427
))
23372428
elif etype == "UpdateApplied":
23382429
update_name = payload.get("update_name")
23392430
if isinstance(update_name, str) and update_name:
2431+
workflow_sequence = _workflow_sequence(payload)
2432+
condition_wait_id = (
2433+
condition_wait_ids_by_sequence.get(workflow_sequence)
2434+
if workflow_sequence is not None
2435+
else receiver_condition_wait_ids.get(event_index)
2436+
)
23402437
pending_receivers.append(_PendingReceiver(
23412438
result_index=len(resolved_results),
23422439
kind="update",
@@ -2351,7 +2448,7 @@ def _close_condition_wait(wait_id: Any) -> None:
23512448
external_storage=external_storage,
23522449
external_storage_cache=external_storage_cache,
23532450
),
2354-
condition_wait_id=_current_condition_wait_id(),
2451+
condition_wait_id=condition_wait_id,
23552452
))
23562453

23572454
signal_registry: dict[str, str] = getattr(workflow_cls, "__workflow_signals__", {}) or {}

tests/test_wait_condition.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,55 @@ def test_signal_received_before_next_wait_uses_declared_workflow_sequence(self)
464464
assert outcome.commands[0].result == expected
465465
assert query_state(SignalCounterUntilFinished, history, [], "current") == expected
466466

467+
def test_signal_received_before_reopened_wait_uses_next_wait_when_sequence_absent(self) -> None:
    """Replay completes when two signals are recorded before a reopened wait.

    The history holds one ``ConditionWaitOpened`` row, two signal rows
    (``increment`` then ``finish``), then a second ``ConditionWaitOpened``
    with its ``condition_timeout`` ``TimerScheduled`` row. Replay and the
    ``current`` query must both yield the final counter state.
    """
    # NOTE(review): the signal rows presumably carry no workflow sequence,
    # per the test name — confirm against ``_signal_received_event``.
    history = [
        {
            "event_type": "ConditionWaitOpened",
            "payload": {
                "condition_wait_id": "wait-count-3",
                "condition_key": "done",
                "sequence": 21,
            },
        },
        _signal_received_event("increment", [3]),
        _signal_received_event("finish", []),
        {
            "event_type": "ConditionWaitOpened",
            "payload": {
                "condition_wait_id": "wait-finish",
                "condition_key": "done",
                "sequence": 22,
                "timeout_seconds": 30,
            },
        },
        {
            "event_type": "TimerScheduled",
            "payload": {
                "timer_kind": "condition_timeout",
                "condition_wait_id": "wait-finish",
                "condition_key": "done",
                "sequence": 22,
                "delay_seconds": 30,
            },
        },
    ]

    expected = {
        "count": 3,
        "done": True,
        "events": [
            {"signal": "increment", "amount": 3, "count": 3},
            {"signal": "finish", "count": 3},
        ],
    }

    outcome = replay(SignalCounterUntilFinished, history, [])

    assert len(outcome.commands) == 1
    assert isinstance(outcome.commands[0], CompleteWorkflow)
    assert outcome.commands[0].result == expected
    assert query_state(SignalCounterUntilFinished, history, [], "current") == expected
515+
467516
def test_repeated_wait_after_activity_can_be_satisfied_by_later_signal(self) -> None:
468517
history = [
469518
{

0 commit comments

Comments
 (0)