Skip to content

Commit c577675

Browse files
committed
fix: concurrency drain timer resumes outside lock
A map or parallel stays in the current invocation while at least one branch is still running. When one iteration keeps it alive and the other branches wait on short timers (a wait, a wait_for_condition poll, or a step retry backoff), those branches resume in-process as their timers come due. Before this commit, a single timer thread ran those resumes one at a time and held its lock across a blocking checkpoint that refreshes state. Every resume cost one network round trip under the lock, and every branch trying to register its next wait queued behind it. When many timers came due together, the timer thread fell behind, the invocation reached its function timeout, and the backend reinvoked, so a map that should have finished in seconds ran for minutes across several timed-out invocations. Holding the lock across the submit also allowed a latent self-deadlock, where a branch that finished inline reacquired the same lock on the timer thread through its done-callback. This commit holds the lock only long enough to take all due timers off the queue and mark them pending, then releases it and runs one shared refresh for the whole wave before handing the branches back to the worker pool. One round trip now serves the whole wave instead of one per resume, and new waits no longer queue behind a network call. The take and the mark stay atomic so a branch never looks parked while it is about to resume, which would otherwise suspend the whole operation by mistake. If the refresh fails, which happens only when the checkpoint subsystem has already failed and is terminal, the timer thread records that one error and execute() re-raises it so the platform retries from the last checkpoint. Closes #473
1 parent cd51a60 commit c577675

2 files changed

Lines changed: 123 additions & 12 deletions

File tree

packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/concurrency/executor.py

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class TimerScheduler:
5959
"""Manage timed suspend tasks with a background timer thread."""
6060

6161
def __init__(
62-
self, resubmit_callback: Callable[[ExecutableWithState], None]
62+
self, resubmit_callback: Callable[[list[ExecutableWithState]], None]
6363
) -> None:
6464
self.resubmit_callback = resubmit_callback
6565
self._pending_resumes: list[tuple[float, int, ExecutableWithState]] = []
@@ -114,18 +114,31 @@ def _timer_loop(self) -> None:
114114

115115
current_time = time.time()
116116
if current_time >= next_resume_time:
117-
# Time to resume
117+
# Drain every due resume under the lock, transitioning each to
118+
# PENDING atomically with the pop. Keeping pop+reset_to_pending
119+
# together is required: should_execution_suspend reads branch
120+
# status without this lock, so an item that is removed from the
121+
# heap but still SUSPENDED_WITH_TIMEOUT could trigger a spurious
122+
# parent suspend.
123+
ready: list[ExecutableWithState] = []
118124
with self._lock:
119-
# no branch cover because hard to test reliably - this is a double-safety check if heap mutated
120-
# since the first peek on next_resume_time further up
121-
if ( # pragma: no branch
125+
while (
122126
self._pending_resumes
123127
and self._pending_resumes[0][0] <= current_time
124128
):
125129
_, _, exe_state = heapq.heappop(self._pending_resumes)
126130
if exe_state.can_resume:
127131
exe_state.reset_to_pending()
128-
self.resubmit_callback(exe_state)
132+
ready.append(exe_state)
133+
# Resubmit outside the lock. Only the heap pop and the PENDING
134+
# transition need the lock. The checkpoint refresh is a blocking
135+
# network call and the submit hands work to the pool, so running
136+
# them off the lock keeps timed resumes from serializing behind
137+
# the network round trip and keeps the timer thread from
138+
# re-entering this non-reentrant lock when a submitted future
139+
# completes inline and its done-callback calls schedule_resume.
140+
if ready:
141+
self.resubmit_callback(ready)
129142
else:
130143
# Wait until next resume time
131144
wait_time = min(next_resume_time - current_time, 0.1)
@@ -169,6 +182,7 @@ def __init__(
169182
# Event-driven state tracking for when the executor is done
170183
self._completion_event = threading.Event()
171184
self._suspend_exception: SuspendExecution | None = None
185+
self._resume_error: Exception | None = None
172186

173187
# ExecutionCounters will keep track of completion criteria and on-going counters
174188
min_successful = self.completion_config.min_successful or len(self.executables)
@@ -222,11 +236,32 @@ def execute(
222236
]
223237
self._completion_event.clear()
224238
self._suspend_exception = None
225-
226-
def resubmitter(executable_with_state: ExecutableWithState) -> None:
227-
"""Resubmit a timed suspended task."""
228-
execution_state.create_checkpoint()
229-
submit_task(executable_with_state)
239+
self._resume_error = None
240+
241+
def resubmitter(ready: list[ExecutableWithState]) -> None:
242+
"""Resubmit a wave of timed-suspended tasks.
243+
244+
One checkpoint refresh serves the whole due wave: the fetch returns
245+
all operations, so every resumed branch reads fresh state. The
246+
refresh only raises when the background checkpoint subsystem has
247+
failed, which is terminal for the whole execution, so record the
248+
error and wake the parent to re-raise it. Catching here keeps the
249+
single timer thread alive so a failure does not strand the other
250+
pending resumes.
251+
"""
252+
try:
253+
execution_state.create_checkpoint()
254+
except Exception as exc: # noqa: BLE001
255+
# resubmitter runs only on the single timer thread, so this
256+
# check-then-set needs no lock. First error wins: keep the
257+
# earliest failure if several waves fail before execute() reads
258+
# it (they are the same terminal checkpoint failure anyway).
259+
if self._resume_error is None: # pragma: no branch
260+
self._resume_error = exc
261+
self._completion_event.set()
262+
return
263+
for executable_with_state in ready:
264+
submit_task(executable_with_state)
230265

231266
thread_executor = ThreadPoolExecutor(max_workers=max_workers)
232267
try:
@@ -259,6 +294,12 @@ def on_done(future: Future) -> None:
259294
for future in futures:
260295
future.cancel()
261296

297+
# A timed resume failed to refresh state (terminal checkpoint
298+
# subsystem failure). Re-raise so the invocation fails and the
299+
# backend retries from the last durable checkpoint.
300+
if self._resume_error is not None:
301+
raise self._resume_error
302+
262303
# Suspend execution if everything done and at least one of the tasks raised a suspend exception.
263304
if self._suspend_exception:
264305
raise self._suspend_exception

packages/aws-durable-execution-sdk-python/tests/concurrency_test.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1370,6 +1370,74 @@ def execute_item(self, child_context, executable):
13701370
executor.execute(execution_state, executor_context)
13711371

13721372

1373+
def test_concurrent_executor_resume_checkpoint_failure_propagates():
1374+
"""A resume-time checkpoint refresh failure propagates out of execute().
1375+
1376+
Regression guard: the timer resubmit does a blocking checkpoint refresh.
1377+
That refresh only raises when the checkpoint subsystem has failed, which
1378+
is terminal. execute() must re-raise it (so the invocation fails and the
1379+
backend retries from the last durable checkpoint) rather than leave the
1380+
wave PENDING forever - the completion wait has no timeout, so a stranded
1381+
PENDING branch would hang the whole map.
1382+
"""
1383+
1384+
class TestExecutor(ConcurrentExecutor):
1385+
def __init__(self, *args, **kwargs):
1386+
super().__init__(*args, **kwargs)
1387+
self.calls: dict[int, int] = {}
1388+
self.long_runner_release = threading.Event()
1389+
1390+
def execute_item(self, child_context, executable):
1391+
task_id = executable.index
1392+
self.calls[task_id] = self.calls.get(task_id, 0) + 1
1393+
if task_id == 0:
1394+
# Long-runner keeps the map alive so task 1 resumes in-process.
1395+
self.long_runner_release.wait(timeout=5)
1396+
return "result_A"
1397+
# Task 1 suspends with a past timestamp -> immediate in-process resume.
1398+
msg = "resume-me"
1399+
raise TimedSuspendExecution(msg, time.time() - 1)
1400+
1401+
executables = [Executable(0, lambda: "task_A"), Executable(1, lambda: "task_B")]
1402+
completion_config = CompletionConfig(
1403+
min_successful=2,
1404+
tolerated_failure_count=None,
1405+
tolerated_failure_percentage=None,
1406+
)
1407+
1408+
executor = TestExecutor(
1409+
executables=executables,
1410+
max_concurrency=2,
1411+
completion_config=completion_config,
1412+
sub_type_top="TOP",
1413+
sub_type_iteration="ITER",
1414+
name_prefix="test_",
1415+
serdes=None,
1416+
)
1417+
1418+
execution_state = Mock()
1419+
1420+
def checkpoint(*args, **kwargs):
1421+
# The resume refresh calls create_checkpoint() with no arguments.
1422+
# Fail that call; leave the branches' own checkpoints as no-ops.
1423+
if not args and not kwargs:
1424+
msg = "resume refresh failed"
1425+
raise RuntimeError(msg)
1426+
1427+
execution_state.create_checkpoint = Mock(side_effect=checkpoint)
1428+
1429+
executor_context = Mock()
1430+
executor_context._create_step_id_for_logical_step = lambda *args: "1" # noqa: SLF001
1431+
child_context = Mock()
1432+
child_context.state.wrap_user_function = lambda func, *args, **kwargs: func
1433+
executor_context.create_child_context = lambda *args, **kwargs: child_context
1434+
1435+
# Must re-raise (not hang): the resume failure surfaces as the original error.
1436+
with pytest.raises(RuntimeError, match="resume refresh failed"):
1437+
executor.execute(execution_state, executor_context)
1438+
executor.long_runner_release.set()
1439+
1440+
13731441
def test_concurrent_executor_with_timed_resubmit_while_other_task_running():
13741442
"""Test timed resubmission while other tasks are still running."""
13751443

@@ -3200,7 +3268,9 @@ def test_timer_scheduler_fifo_ordering_with_same_timestamp():
32003268
items synchronously, so callback order is deterministic.
32013269
"""
32023270
results = []
3203-
resubmit_callback = Mock(side_effect=lambda exe: results.append(exe.index))
3271+
resubmit_callback = Mock(
3272+
side_effect=lambda batch: results.extend(exe.index for exe in batch)
3273+
)
32043274

32053275
with TimerScheduler(resubmit_callback) as scheduler:
32063276
# Use a past timestamp so they trigger immediately

0 commit comments

Comments
 (0)