Skip to content

Commit 038c011

Browse files
committed
fix: concurrency TypeError on identical resume times
When multiple tasks are scheduled with identical resume_time values, heapq.heappush attempts to compare ExecutableWithState objects to break the tie, causing a TypeError since ExecutableWithState doesn't implement `__lt__`. This fix adds a monotonically increasing counter as a tie-breaker in the heap tuple structure.

Changes:
- Update the _pending_resumes type hint to include the counter: tuple[float, int, ExecutableWithState]
- Add _schedule_counter initialization in `__init__`
- Modify schedule_resume() to include the counter in the heap tuple and increment it after each schedule
- Update the heappop unpacking in _timer_loop to handle the 3-tuple
- Add missing _shutdown Event initialization

The counter ensures deterministic FIFO ordering when timestamps collide, preventing object comparison while maintaining predictable execution order.

Tests added to verify:
- No TypeError is raised with identical timestamps
- The counter increments correctly
- FIFO ordering is maintained for same-timestamp tasks

Closes #271
1 parent 8e762fd commit 038c011

2 files changed

Lines changed: 133 additions & 4 deletions

File tree

src/aws_durable_execution_sdk_python/concurrency/executor.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ def __init__(
5858
self, resubmit_callback: Callable[[ExecutableWithState], None]
5959
) -> None:
6060
self.resubmit_callback = resubmit_callback
61-
self._pending_resumes: list[tuple[float, ExecutableWithState]] = []
61+
self._pending_resumes: list[tuple[float, int, ExecutableWithState]] = []
6262
self._lock = threading.Lock()
63+
self._schedule_counter = 0
6364
self._shutdown = threading.Event()
6465
self._timer_thread = threading.Thread(target=self._timer_loop, daemon=True)
6566
self._timer_thread.start()
@@ -73,9 +74,18 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
7374
def schedule_resume(
7475
self, exe_state: ExecutableWithState, resume_time: float
7576
) -> None:
76-
"""Schedule a task to resume at the specified time."""
77+
"""Schedule a task to resume at the specified time.
78+
79+
Uses a counter as a tie-breaker to ensure FIFO ordering when multiple
80+
tasks have the same resume_time, preventing TypeError from comparing
81+
ExecutableWithState objects.
82+
"""
7783
with self._lock:
78-
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
84+
heapq.heappush(
85+
self._pending_resumes,
86+
(resume_time, self._schedule_counter, exe_state),
87+
)
88+
self._schedule_counter += 1
7989

8090
def shutdown(self) -> None:
8191
"""Shutdown the timer thread and cancel all pending resumes."""
@@ -108,7 +118,7 @@ def _timer_loop(self) -> None:
108118
self._pending_resumes
109119
and self._pending_resumes[0][0] <= current_time
110120
):
111-
_, exe_state = heapq.heappop(self._pending_resumes)
121+
_, _, exe_state = heapq.heappop(self._pending_resumes)
112122
if exe_state.can_resume:
113123
exe_state.reset_to_pending()
114124
self.resubmit_callback(exe_state)

tests/concurrency_test.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3031,3 +3031,122 @@ def slow_func():
30313031
assert result.failure_count == 0
30323032
assert result.started_count == 1
30333033
assert result.total_count == 2
3034+
3035+
3036+
# region TimerScheduler edge cases with exact same reschedule time
3037+
3038+
3039+
def test_timer_scheduler_same_timestamp_with_counter_tiebreaker():
3040+
"""
3041+
Test that scheduling two tasks with the exact same resume_time works.
3042+
3043+
This verifies the fix where a counter is used as a tie-breaker to prevent
3044+
TypeError when heapq tries to compare ExecutableWithState objects.
3045+
"""
3046+
resubmit_callback = Mock()
3047+
3048+
with TimerScheduler(resubmit_callback) as scheduler:
3049+
# Create two different ExecutableWithState objects
3050+
exe_state1 = ExecutableWithState(Executable(index=0, func=lambda: "test1"))
3051+
exe_state2 = ExecutableWithState(Executable(index=1, func=lambda: "test2"))
3052+
3053+
# Use the exact same timestamp for both
3054+
same_timestamp = time.time() + 10.0
3055+
3056+
# Both schedules should work fine now
3057+
scheduler.schedule_resume(exe_state1, same_timestamp)
3058+
scheduler.schedule_resume(exe_state2, same_timestamp)
3059+
3060+
# Verify both are in the heap
3061+
assert len(scheduler._pending_resumes) == 2 # noqa: SLF001
3062+
3063+
# Verify FIFO ordering (first scheduled should be first in heap)
3064+
first_item = scheduler._pending_resumes[0] # noqa: SLF001
3065+
assert first_item[0] == same_timestamp # timestamp
3066+
assert first_item[1] == 0 # counter (first scheduled)
3067+
assert first_item[2] == exe_state1 # first exe_state
3068+
3069+
3070+
def test_timer_scheduler_multiple_same_timestamps():
3071+
"""
3072+
Test that scheduling many tasks with the same timestamp works correctly.
3073+
3074+
Verifies FIFO ordering is maintained when multiple tasks have identical timestamps.
3075+
"""
3076+
resubmit_callback = Mock()
3077+
3078+
with TimerScheduler(resubmit_callback) as scheduler:
3079+
same_timestamp = time.time() + 10.0
3080+
3081+
# Create and schedule 10 tasks with the same timestamp
3082+
exe_states = [
3083+
ExecutableWithState(Executable(index=i, func=lambda i=i: f"test{i}"))
3084+
for i in range(10)
3085+
]
3086+
3087+
for exe_state in exe_states:
3088+
scheduler.schedule_resume(exe_state, same_timestamp)
3089+
3090+
# All should be scheduled successfully
3091+
assert len(scheduler._pending_resumes) == 10 # noqa: SLF001
3092+
3093+
# Verify the heap maintains proper ordering
3094+
# The first item should have counter 0
3095+
assert scheduler._pending_resumes[0][1] == 0 # noqa: SLF001
3096+
3097+
3098+
def test_timer_scheduler_counter_increments():
3099+
"""Test that the schedule counter increments correctly."""
3100+
resubmit_callback = Mock()
3101+
3102+
with TimerScheduler(resubmit_callback) as scheduler:
3103+
exe_state1 = ExecutableWithState(Executable(0, lambda: "test1"))
3104+
exe_state2 = ExecutableWithState(Executable(1, lambda: "test2"))
3105+
exe_state3 = ExecutableWithState(Executable(2, lambda: "test3"))
3106+
3107+
# Schedule with different times
3108+
scheduler.schedule_resume(exe_state1, time.time() + 1.0)
3109+
scheduler.schedule_resume(exe_state2, time.time() + 2.0)
3110+
scheduler.schedule_resume(exe_state3, time.time() + 3.0)
3111+
3112+
# Counter should have incremented to 3
3113+
assert scheduler._schedule_counter == 3 # noqa: SLF001
3114+
3115+
3116+
def test_timer_scheduler_fifo_ordering_with_same_timestamp():
3117+
"""
3118+
Test that FIFO ordering is maintained when timestamps are equal.
3119+
3120+
When multiple tasks have the same timestamp, they should be processed
3121+
in the order they were scheduled (FIFO). The timer thread processes
3122+
items synchronously, so callback order is deterministic.
3123+
"""
3124+
results = []
3125+
resubmit_callback = Mock(side_effect=lambda exe: results.append(exe.index))
3126+
3127+
with TimerScheduler(resubmit_callback) as scheduler:
3128+
# Use a past timestamp so they trigger immediately
3129+
past_time = time.time() - 0.1
3130+
3131+
exe_state1 = ExecutableWithState(Executable(0, lambda: "first"))
3132+
exe_state2 = ExecutableWithState(Executable(1, lambda: "second"))
3133+
exe_state3 = ExecutableWithState(Executable(2, lambda: "third"))
3134+
3135+
# Make them all resumable
3136+
exe_state1.suspend()
3137+
exe_state2.suspend()
3138+
exe_state3.suspend()
3139+
3140+
# Schedule all with same timestamp
3141+
scheduler.schedule_resume(exe_state1, past_time)
3142+
scheduler.schedule_resume(exe_state2, past_time)
3143+
scheduler.schedule_resume(exe_state3, past_time)
3144+
3145+
# Wait for timer thread to process them
3146+
time.sleep(0.3)
3147+
3148+
# Verify FIFO order - they should be resubmitted in order 0, 1, 2
3149+
assert results == [0, 1, 2]
3150+
3151+
3152+
# endregion TimerScheduler edge cases with exact same reschedule time

0 commit comments

Comments
 (0)