Skip to content

Commit 669f69d

Browse files
Conformance finding: signals/queries smoke hits Python nondeterministic replay
1 parent 248d0d4 commit 669f69d

4 files changed

Lines changed: 72 additions & 14 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
those signals before the task's `ConditionWaitOpened` row. This avoids
1313
applying rapid signal batches to the previous wait and prevents replay from
1414
completing with a later wait/timer history step left unconsumed.
15+
- Condition-wait replay now prefers the recorded event-order wait binding over
16+
a signal's stored `workflow_sequence` when several signals arrive before a
17+
reopened wait. This handles server histories where later signals are accepted
18+
while the previous wait is still open but must replay against the next
19+
physical wait.
1520
- Workflow workers now report unhandled workflow-task execution errors back to
1621
the server instead of leaving the leased task pending until the lease or CLI
1722
wait times out. This lets the server observe and retry or fail the task

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "durable-workflow"
7-
version = "0.4.58"
7+
version = "0.4.59"
88
description = "Python SDK for the Durable Workflow server (language-neutral HTTP protocol)"
99
readme = "README.md"
1010
requires-python = ">=3.10"

src/durable_workflow/workflow.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2278,7 +2278,9 @@ def _receiver_condition_wait_bindings() -> dict[int, str | None]:
22782278
# When multiple signals arrive while the task woken by the
22792279
# first signal is still leased, the server records those
22802280
# later SignalReceived rows before the task's next
2281-
# ConditionWaitOpened row. Replay them at that next wait.
2281+
# ConditionWaitOpened row. Replay them at that next wait
2282+
# even if their stored workflow_sequence still points at
2283+
# the wait that was open when the signal was accepted.
22822284
for receiver_index in receivers_since_wait[1:]:
22832285
if receiver_index is not None:
22842286
bindings[receiver_index] = wait_id
@@ -2296,8 +2298,8 @@ def _receiver_condition_wait_bindings() -> dict[int, str | None]:
22962298
prefix_receivers.append(index)
22972299
continue
22982300

2299-
receivers_since_wait.append(None if explicit_sequence else index)
2300-
if len(receivers_since_wait) == 1 and not explicit_sequence:
2301+
receivers_since_wait.append(index)
2302+
if len(receivers_since_wait) == 1:
23012303
bindings[index] = current_wait_id
23022304
continue
23032305

@@ -2404,11 +2406,12 @@ def _receiver_condition_wait_bindings() -> dict[int, str | None]:
24042406
signal_name = payload.get("signal_name")
24052407
if isinstance(signal_name, str) and signal_name:
24062408
workflow_sequence = _workflow_sequence(payload)
2407-
condition_wait_id = (
2408-
condition_wait_ids_by_sequence.get(workflow_sequence)
2409-
if workflow_sequence is not None
2410-
else receiver_condition_wait_ids.get(event_index)
2411-
)
2409+
if event_index in receiver_condition_wait_ids:
2410+
condition_wait_id = receiver_condition_wait_ids[event_index]
2411+
elif workflow_sequence is not None:
2412+
condition_wait_id = condition_wait_ids_by_sequence.get(workflow_sequence)
2413+
else:
2414+
condition_wait_id = None
24122415
pending_receivers.append(_PendingReceiver(
24132416
result_index=len(resolved_results),
24142417
kind="signal",
@@ -2429,11 +2432,12 @@ def _receiver_condition_wait_bindings() -> dict[int, str | None]:
24292432
update_name = payload.get("update_name")
24302433
if isinstance(update_name, str) and update_name:
24312434
workflow_sequence = _workflow_sequence(payload)
2432-
condition_wait_id = (
2433-
condition_wait_ids_by_sequence.get(workflow_sequence)
2434-
if workflow_sequence is not None
2435-
else receiver_condition_wait_ids.get(event_index)
2436-
)
2435+
if event_index in receiver_condition_wait_ids:
2436+
condition_wait_id = receiver_condition_wait_ids[event_index]
2437+
elif workflow_sequence is not None:
2438+
condition_wait_id = condition_wait_ids_by_sequence.get(workflow_sequence)
2439+
else:
2440+
condition_wait_id = None
24372441
pending_receivers.append(_PendingReceiver(
24382442
result_index=len(resolved_results),
24392443
kind="update",

tests/test_wait_condition.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,55 @@ def test_signal_received_before_reopened_wait_uses_next_wait_when_sequence_absen
513513
assert outcome.commands[0].result == expected
514514
assert query_state(SignalCounterUntilFinished, history, [], "current") == expected
515515

516+
def test_signal_received_before_reopened_wait_uses_next_wait_when_sequence_is_stale(self) -> None:
517+
history = [
518+
{
519+
"event_type": "ConditionWaitOpened",
520+
"payload": {
521+
"condition_wait_id": "wait-count-3",
522+
"condition_key": "done",
523+
"sequence": 21,
524+
},
525+
},
526+
_signal_received_event("increment", [3], workflow_sequence=21),
527+
_signal_received_event("finish", [], workflow_sequence=21),
528+
{
529+
"event_type": "ConditionWaitOpened",
530+
"payload": {
531+
"condition_wait_id": "wait-finish",
532+
"condition_key": "done",
533+
"sequence": 22,
534+
"timeout_seconds": 30,
535+
},
536+
},
537+
{
538+
"event_type": "TimerScheduled",
539+
"payload": {
540+
"timer_kind": "condition_timeout",
541+
"condition_wait_id": "wait-finish",
542+
"condition_key": "done",
543+
"sequence": 22,
544+
"delay_seconds": 30,
545+
},
546+
},
547+
]
548+
549+
expected = {
550+
"count": 3,
551+
"done": True,
552+
"events": [
553+
{"signal": "increment", "amount": 3, "count": 3},
554+
{"signal": "finish", "count": 3},
555+
],
556+
}
557+
558+
outcome = replay(SignalCounterUntilFinished, history, [])
559+
560+
assert len(outcome.commands) == 1
561+
assert isinstance(outcome.commands[0], CompleteWorkflow)
562+
assert outcome.commands[0].result == expected
563+
assert query_state(SignalCounterUntilFinished, history, [], "current") == expected
564+
516565
def test_repeated_wait_after_activity_can_be_satisfied_by_later_signal(self) -> None:
517566
history = [
518567
{

0 commit comments

Comments
 (0)