Skip to content

Commit 1ec2e59

Browse files
Conformance finding: signals/queries Python workflow hits nondeterministic replay (#144)
1 parent 8a99ae1 commit 1ec2e59

3 files changed

Lines changed: 425 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2121
reopened wait. This handles server histories where later signals are accepted
2222
while the previous wait is still open but must replay against the next
2323
physical wait.
24+
- Condition-wait replay now lets a true predicate finish the current wait
25+
before any following same-key wait is considered stale terminal history.
26+
This keeps query and signal replay aligned with histories that include
27+
unresolved `ConditionWaitOpened` plus condition-timeout `TimerScheduled`
28+
rows after a replayed false reopen, while preserving pending sequential
29+
same-key waits and resolved reopens that must remain replay history.
2430
- Workflow workers now report unhandled workflow-task execution errors back to
2531
the server instead of leaving the leased task pending until the lease or CLI
2632
wait times out. This lets the server observe and retry or fail the task

src/durable_workflow/workflow.py

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2296,7 +2296,7 @@ def _receiver_condition_wait_bindings() -> dict[int, str | None]:
22962296
if _is_external_receiver_event(event_type):
22972297
explicit_sequence = _workflow_sequence(payload) is not None
22982298
if current_wait_id is None:
2299-
if not explicit_sequence:
2299+
if prefix_can_bind_to_first_wait or not explicit_sequence:
23002300
prefix_receivers.append(index)
23012301
continue
23022302

@@ -2562,9 +2562,31 @@ def _same_logical_condition_wait(opened: Mapping[str, Any], cmd: WaitCondition)
25622562
first = True
25632563
pending: list[Command] = []
25642564
advanced_cmd: Any = None
2565+
terminal_condition_reopen_cmd: WaitCondition | None = None
2566+
2567+
def _condition_wait_has_pending_receivers(condition_wait_id: str | None) -> bool:
2568+
if condition_wait_id is None:
2569+
return False
2570+
return any(receiver.condition_wait_id == condition_wait_id for receiver in pending_receivers)
2571+
2572+
def _consume_terminal_condition_reopens() -> None:
2573+
nonlocal wait_yield_count
2574+
if terminal_condition_reopen_cmd is None:
2575+
return
2576+
while wait_yield_count < len(wait_opened):
2577+
opened = wait_opened[wait_yield_count]
2578+
if not _same_logical_condition_wait(opened, terminal_condition_reopen_cmd):
2579+
break
2580+
opened_id = opened.get("condition_wait_id")
2581+
if not isinstance(opened_id, str):
2582+
break
2583+
if opened_id in wait_resolutions or _condition_wait_has_pending_receivers(opened_id):
2584+
break
2585+
wait_yield_count += 1
25652586

25662587
def _terminal_state(value: Any, *, include_pending: bool) -> _ReplayState:
25672588
_apply_due_receivers()
2589+
_consume_terminal_condition_reopens()
25682590
commands = list(pending) if include_pending else []
25692591
if isinstance(value, ContinueAsNew):
25702592
_assert_no_unconsumed_history("continue as new")
@@ -2652,6 +2674,9 @@ def _terminal_state(value: Any, *, include_pending: bool) -> _ReplayState:
26522674
step = _next_unconsumed_recorded_step()
26532675
if step is not None:
26542676
_assert_step_matches(cmd, step)
2677+
# Terminal cleanup may only skip a later open after this
2678+
# yielded wait has already replayed at least one false reopen.
2679+
consumed_reopen_for_current_wait = False
26552680
while True:
26562681
resolution: str | None = None
26572682
opened: dict[str, Any] | None = None
@@ -2666,7 +2691,15 @@ def _terminal_state(value: Any, *, include_pending: bool) -> _ReplayState:
26662691
if isinstance(opened_id, str):
26672692
resolution = wait_resolutions.get(opened_id)
26682693
_apply_condition_wait_receivers(opened_id)
2694+
next_wait_index = wait_yield_count + 1
2695+
has_reopened_same_wait = (
2696+
opened is not None
2697+
and next_wait_index < len(wait_opened)
2698+
and _same_logical_condition_wait(wait_opened[next_wait_index], cmd)
2699+
)
2700+
26692701
if resolution == "timed_out":
2702+
terminal_condition_reopen_cmd = None
26702703
next_value = False
26712704
wait_yield_count += 1
26722705
break
@@ -2677,21 +2710,27 @@ def _terminal_state(value: Any, *, include_pending: bool) -> _ReplayState:
26772710
message=f"wait_condition predicate raised: {exc}",
26782711
exception_type=type(exc).__name__,
26792712
)])
2680-
if satisfied or resolution == "satisfied":
2713+
if resolution == "satisfied":
2714+
terminal_condition_reopen_cmd = None
2715+
next_value = True
2716+
wait_yield_count += 1
2717+
break
2718+
if satisfied:
2719+
terminal_condition_reopen_cmd = (
2720+
cmd
2721+
if has_reopened_same_wait and consumed_reopen_for_current_wait
2722+
else None
2723+
)
26812724
next_value = True
26822725
wait_yield_count += 1
26832726
break
26842727

2685-
next_wait_index = wait_yield_count + 1
26862728
# A single logical wait can be re-opened in history after
26872729
# non-satisfying signals. Consume repeated physical opens
26882730
# with the same key/fingerprint before declaring the
26892731
# logical wait still pending.
2690-
if (
2691-
opened is not None
2692-
and next_wait_index < len(wait_opened)
2693-
and _same_logical_condition_wait(wait_opened[next_wait_index], cmd)
2694-
):
2732+
if has_reopened_same_wait:
2733+
consumed_reopen_for_current_wait = True
26952734
wait_yield_count = next_wait_index
26962735
continue
26972736

0 commit comments

Comments
 (0)