Skip to content

Commit b59b71a

Browse files
committed
fix: concurrency TypeError on identical resume times
When multiple tasks are scheduled with identical resume_time values, heapq.heappush attempts to compare ExecutableWithState objects to break the tie, causing a TypeError since ExecutableWithState doesn't implement `__lt__`. This fix adds a monotonically increasing counter as a tie-breaker in the heap tuple structure. Changes: - Update _pending_resumes type hint to include counter: tuple[float, int, ExecutableWithState] - Add _schedule_counter initialization in `__init__` - Modify schedule_resume() to include counter in heap tuple and increment after each schedule - Update heappop unpacking in _timer_loop to handle 3-tuple - Add missing _shutdown Event initialization The counter ensures deterministic FIFO ordering when timestamps collide, preventing object comparison while maintaining predictable execution order. Tests added to verify: - No TypeError with identical timestamps - Counter increments correctly - FIFO ordering is maintained for same-timestamp tasks closes #271
1 parent 8e762fd commit b59b71a

2 files changed

Lines changed: 139 additions & 7 deletions

File tree

src/aws_durable_execution_sdk_python/concurrency/executor.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ def __init__(
5858
self, resubmit_callback: Callable[[ExecutableWithState], None]
5959
) -> None:
6060
self.resubmit_callback = resubmit_callback
61-
self._pending_resumes: list[tuple[float, ExecutableWithState]] = []
61+
self._pending_resumes: list[tuple[float, int, ExecutableWithState]] = []
6262
self._lock = threading.Lock()
63+
self._schedule_counter = 0
6364
self._shutdown = threading.Event()
6465
self._timer_thread = threading.Thread(target=self._timer_loop, daemon=True)
6566
self._timer_thread.start()
@@ -73,9 +74,18 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
7374
def schedule_resume(
7475
self, exe_state: ExecutableWithState, resume_time: float
7576
) -> None:
76-
"""Schedule a task to resume at the specified time."""
77+
"""Schedule a task to resume at the specified time.
78+
79+
Uses a counter as a tie-breaker to ensure FIFO ordering when multiple
80+
tasks have the same resume_time, preventing TypeError from comparing
81+
ExecutableWithState objects.
82+
"""
7783
with self._lock:
78-
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
84+
heapq.heappush(
85+
self._pending_resumes,
86+
(resume_time, self._schedule_counter, exe_state),
87+
)
88+
self._schedule_counter += 1
7989

8090
def shutdown(self) -> None:
8191
"""Shutdown the timer thread and cancel all pending resumes."""
@@ -108,7 +118,7 @@ def _timer_loop(self) -> None:
108118
self._pending_resumes
109119
and self._pending_resumes[0][0] <= current_time
110120
):
111-
_, exe_state = heapq.heappop(self._pending_resumes)
121+
_, _, exe_state = heapq.heappop(self._pending_resumes)
112122
if exe_state.can_resume:
113123
exe_state.reset_to_pending()
114124
self.resubmit_callback(exe_state)

tests/concurrency_test.py

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2817,7 +2817,10 @@ def task_func(ctx, item, idx, items):
28172817

28182818
items = list(range(100))
28192819
config = MapConfig(
2820-
max_concurrency=10, completion_config=CompletionConfig(min_successful=99)
2820+
max_concurrency=10,
2821+
completion_config=CompletionConfig(
2822+
min_successful=99, tolerated_failure_count=1
2823+
),
28212824
)
28222825

28232826
executor = MapExecutor.from_items(items=items, func=task_func, config=config)
@@ -2830,8 +2833,8 @@ def task_func(ctx, item, idx, items):
28302833

28312834
result = executor.execute(execution_state, executor_context)
28322835

2833-
# With concurrency=1, only 2 tasks should execute before terminating
2834-
# min_successful(99) + failure_count(2) = 101 > total_tasks(100)
2836+
# With tolerated_failure_count=1, executor stops when failure_count > 1 (at 2 failures)
2837+
# Executor terminates early rather than executing all 100 tasks
28352838
assert executed_count["value"] < 100
28362839
assert (
28372840
result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED
@@ -3031,3 +3034,122 @@ def slow_func():
30313034
assert result.failure_count == 0
30323035
assert result.started_count == 1
30333036
assert result.total_count == 2
3037+
3038+
3039+
# region TimerScheduler edge cases with exact same reschedule time
3040+
3041+
3042+
def test_timer_scheduler_same_timestamp_with_counter_tiebreaker():
3043+
"""
3044+
Test that scheduling two tasks with the exact same resume_time works.
3045+
3046+
This verifies the fix where a counter is used as a tie-breaker to prevent
3047+
TypeError when heapq tries to compare ExecutableWithState objects.
3048+
"""
3049+
resubmit_callback = Mock()
3050+
3051+
with TimerScheduler(resubmit_callback) as scheduler:
3052+
# Create two different ExecutableWithState objects
3053+
exe_state1 = ExecutableWithState(Executable(index=0, func=lambda: "test1"))
3054+
exe_state2 = ExecutableWithState(Executable(index=1, func=lambda: "test2"))
3055+
3056+
# Use the exact same timestamp for both
3057+
same_timestamp = time.time() + 10.0
3058+
3059+
# Both schedules should work fine now
3060+
scheduler.schedule_resume(exe_state1, same_timestamp)
3061+
scheduler.schedule_resume(exe_state2, same_timestamp)
3062+
3063+
# Verify both are in the heap
3064+
assert len(scheduler._pending_resumes) == 2 # noqa: SLF001
3065+
3066+
# Verify FIFO ordering (first scheduled should be first in heap)
3067+
first_item = scheduler._pending_resumes[0] # noqa: SLF001
3068+
assert first_item[0] == same_timestamp # timestamp
3069+
assert first_item[1] == 0 # counter (first scheduled)
3070+
assert first_item[2] == exe_state1 # first exe_state
3071+
3072+
3073+
def test_timer_scheduler_multiple_same_timestamps():
3074+
"""
3075+
Test that scheduling many tasks with the same timestamp works correctly.
3076+
3077+
Verifies FIFO ordering is maintained when multiple tasks have identical timestamps.
3078+
"""
3079+
resubmit_callback = Mock()
3080+
3081+
with TimerScheduler(resubmit_callback) as scheduler:
3082+
same_timestamp = time.time() + 10.0
3083+
3084+
# Create and schedule 10 tasks with the same timestamp
3085+
exe_states = [
3086+
ExecutableWithState(Executable(index=i, func=lambda i=i: f"test{i}"))
3087+
for i in range(10)
3088+
]
3089+
3090+
for exe_state in exe_states:
3091+
scheduler.schedule_resume(exe_state, same_timestamp)
3092+
3093+
# All should be scheduled successfully
3094+
assert len(scheduler._pending_resumes) == 10 # noqa: SLF001
3095+
3096+
# Verify the heap maintains proper ordering
3097+
# The first item should have counter 0
3098+
assert scheduler._pending_resumes[0][1] == 0 # noqa: SLF001
3099+
3100+
3101+
def test_timer_scheduler_counter_increments():
3102+
"""Test that the schedule counter increments correctly."""
3103+
resubmit_callback = Mock()
3104+
3105+
with TimerScheduler(resubmit_callback) as scheduler:
3106+
exe_state1 = ExecutableWithState(Executable(0, lambda: "test1"))
3107+
exe_state2 = ExecutableWithState(Executable(1, lambda: "test2"))
3108+
exe_state3 = ExecutableWithState(Executable(2, lambda: "test3"))
3109+
3110+
# Schedule with different times
3111+
scheduler.schedule_resume(exe_state1, time.time() + 1.0)
3112+
scheduler.schedule_resume(exe_state2, time.time() + 2.0)
3113+
scheduler.schedule_resume(exe_state3, time.time() + 3.0)
3114+
3115+
# Counter should have incremented to 3
3116+
assert scheduler._schedule_counter == 3 # noqa: SLF001
3117+
3118+
3119+
def test_timer_scheduler_fifo_ordering_with_same_timestamp():
3120+
"""
3121+
Test that FIFO ordering is maintained when timestamps are equal.
3122+
3123+
When multiple tasks have the same timestamp, they should be processed
3124+
in the order they were scheduled (FIFO). The timer thread processes
3125+
items synchronously, so callback order is deterministic.
3126+
"""
3127+
results = []
3128+
resubmit_callback = Mock(side_effect=lambda exe: results.append(exe.index))
3129+
3130+
with TimerScheduler(resubmit_callback) as scheduler:
3131+
# Use a past timestamp so they trigger immediately
3132+
past_time = time.time() - 0.1
3133+
3134+
exe_state1 = ExecutableWithState(Executable(0, lambda: "first"))
3135+
exe_state2 = ExecutableWithState(Executable(1, lambda: "second"))
3136+
exe_state3 = ExecutableWithState(Executable(2, lambda: "third"))
3137+
3138+
# Make them all resumable
3139+
exe_state1.suspend()
3140+
exe_state2.suspend()
3141+
exe_state3.suspend()
3142+
3143+
# Schedule all with same timestamp
3144+
scheduler.schedule_resume(exe_state1, past_time)
3145+
scheduler.schedule_resume(exe_state2, past_time)
3146+
scheduler.schedule_resume(exe_state3, past_time)
3147+
3148+
# Wait for timer thread to process them
3149+
time.sleep(0.3)
3150+
3151+
# Verify FIFO order - they should be resubmitted in order 0, 1, 2
3152+
assert results == [0, 1, 2]
3153+
3154+
3155+
# endregion TimerScheduler edge cases with exact same reschedule time

0 commit comments

Comments
 (0)