Skip to content

Commit e96b27a

Browse files
committed
fix: concurrency TypeError on identical resume times
When multiple tasks are scheduled with identical resume_time values, heapq.heappush attempts to compare ExecutableWithState objects to break the tie, causing a TypeError since ExecutableWithState doesn't implement `__lt__`. This fix adds a monotonically increasing counter as a tie-breaker in the heap tuple structure. Changes: - Update _pending_resumes type hint to include counter: tuple[float, int, ExecutableWithState] - Add _schedule_counter initialization in `__init__` - Modify schedule_resume() to include counter in heap tuple and increment after each schedule - Update heappop unpacking in _timer_loop to handle 3-tuple - Add missing _shutdown Event initialization The counter ensures deterministic FIFO ordering when timestamps collide, preventing object comparison while maintaining predictable execution order. Tests added to verify: - No TypeError with identical timestamps - Counter increments correctly - FIFO ordering is maintained for same-timestamp tasks closes #271
1 parent 8e762fd commit e96b27a

2 files changed

Lines changed: 145 additions & 17 deletions

File tree

src/aws_durable_execution_sdk_python/concurrency/executor.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ def __init__(
5858
self, resubmit_callback: Callable[[ExecutableWithState], None]
5959
) -> None:
6060
self.resubmit_callback = resubmit_callback
61-
self._pending_resumes: list[tuple[float, ExecutableWithState]] = []
61+
self._pending_resumes: list[tuple[float, int, ExecutableWithState]] = []
6262
self._lock = threading.Lock()
63+
self._schedule_counter = 0
6364
self._shutdown = threading.Event()
6465
self._timer_thread = threading.Thread(target=self._timer_loop, daemon=True)
6566
self._timer_thread.start()
@@ -73,9 +74,18 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
7374
def schedule_resume(
7475
self, exe_state: ExecutableWithState, resume_time: float
7576
) -> None:
76-
"""Schedule a task to resume at the specified time."""
77+
"""Schedule a task to resume at the specified time.
78+
79+
Uses a counter as a tie-breaker to ensure FIFO ordering when multiple
80+
tasks have the same resume_time, preventing TypeError from comparing
81+
ExecutableWithState objects.
82+
"""
7783
with self._lock:
78-
heapq.heappush(self._pending_resumes, (resume_time, exe_state))
84+
heapq.heappush(
85+
self._pending_resumes,
86+
(resume_time, self._schedule_counter, exe_state),
87+
)
88+
self._schedule_counter += 1
7989

8090
def shutdown(self) -> None:
8191
"""Shutdown the timer thread and cancel all pending resumes."""
@@ -108,7 +118,7 @@ def _timer_loop(self) -> None:
108118
self._pending_resumes
109119
and self._pending_resumes[0][0] <= current_time
110120
):
111-
_, exe_state = heapq.heappop(self._pending_resumes)
121+
_, _, exe_state = heapq.heappop(self._pending_resumes)
112122
if exe_state.can_resume:
113123
exe_state.reset_to_pending()
114124
self.resubmit_callback(exe_state)

tests/concurrency_test.py

Lines changed: 131 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2810,14 +2810,15 @@ def test_executor_terminates_quickly_when_impossible_to_succeed():
28102810

28112811
def task_func(ctx, item, idx, items):
28122812
executed_count["value"] += 1
2813-
if idx < 2:
2814-
raise Exception(f"fail_{idx}") # noqa EM102 TRY002
2813+
if idx == 0:
2814+
raise Exception("fail_0") # noqa EM101 TRY002
28152815
time.sleep(0.05)
28162816
return f"ok_{idx}"
28172817

2818-
items = list(range(100))
2818+
items = list(range(10))
28192819
config = MapConfig(
2820-
max_concurrency=10, completion_config=CompletionConfig(min_successful=99)
2820+
max_concurrency=1, # Sequential execution ensures deterministic ordering
2821+
completion_config=CompletionConfig(min_successful=10), # Need all 10 to succeed
28212822
)
28222823

28232824
executor = MapExecutor.from_items(items=items, func=task_func, config=config)
@@ -2830,16 +2831,14 @@ def task_func(ctx, item, idx, items):
28302831

28312832
result = executor.execute(execution_state, executor_context)
28322833

2833-
# With concurrency=1, only 2 tasks should execute before terminating
2834-
# min_successful(99) + failure_count(2) = 101 > total_tasks(100)
2835-
assert executed_count["value"] < 100
2834+
# After 1 failure, only 9 tasks remain but need 10 - impossible
2835+
# Should terminate immediately
28362836
assert (
2837-
result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED
2838-
), executed_count
2839-
assert sum(1 for item in result.all if item.status == BatchItemStatus.FAILED) == 2
2840-
assert (
2841-
sum(1 for item in result.all if item.status == BatchItemStatus.SUCCEEDED) < 98
2842-
)
2837+
executed_count["value"] == 1
2838+
), f"Should execute only 1 task, executed {executed_count['value']}"
2839+
assert result.completion_reason == CompletionReason.FAILURE_TOLERANCE_EXCEEDED
2840+
assert sum(1 for item in result.all if item.status == BatchItemStatus.FAILED) == 1
2841+
assert sum(1 for item in result.all if item.status == BatchItemStatus.STARTED) == 9
28432842

28442843

28452844
def test_executor_exits_early_with_min_successful():
@@ -3031,3 +3030,122 @@ def slow_func():
30313030
assert result.failure_count == 0
30323031
assert result.started_count == 1
30333032
assert result.total_count == 2
3033+
3034+
3035+
# region TimerScheduler edge cases with exact same reschedule time
3036+
3037+
3038+
def test_timer_scheduler_same_timestamp_with_counter_tiebreaker():
3039+
"""
3040+
Test that scheduling two tasks with the exact same resume_time works.
3041+
3042+
This verifies the fix where a counter is used as a tie-breaker to prevent
3043+
TypeError when heapq tries to compare ExecutableWithState objects.
3044+
"""
3045+
resubmit_callback = Mock()
3046+
3047+
with TimerScheduler(resubmit_callback) as scheduler:
3048+
# Create two different ExecutableWithState objects
3049+
exe_state1 = ExecutableWithState(Executable(index=0, func=lambda: "test1"))
3050+
exe_state2 = ExecutableWithState(Executable(index=1, func=lambda: "test2"))
3051+
3052+
# Use the exact same timestamp for both
3053+
same_timestamp = time.time() + 10.0
3054+
3055+
# Both schedules should work fine now
3056+
scheduler.schedule_resume(exe_state1, same_timestamp)
3057+
scheduler.schedule_resume(exe_state2, same_timestamp)
3058+
3059+
# Verify both are in the heap
3060+
assert len(scheduler._pending_resumes) == 2 # noqa: SLF001
3061+
3062+
# Verify FIFO ordering (first scheduled should be first in heap)
3063+
first_item = scheduler._pending_resumes[0] # noqa: SLF001
3064+
assert first_item[0] == same_timestamp # timestamp
3065+
assert first_item[1] == 0 # counter (first scheduled)
3066+
assert first_item[2] == exe_state1 # first exe_state
3067+
3068+
3069+
def test_timer_scheduler_multiple_same_timestamps():
3070+
"""
3071+
Test that scheduling many tasks with the same timestamp works correctly.
3072+
3073+
Verifies FIFO ordering is maintained when multiple tasks have identical timestamps.
3074+
"""
3075+
resubmit_callback = Mock()
3076+
3077+
with TimerScheduler(resubmit_callback) as scheduler:
3078+
same_timestamp = time.time() + 10.0
3079+
3080+
# Create and schedule 10 tasks with the same timestamp
3081+
exe_states = [
3082+
ExecutableWithState(Executable(index=i, func=lambda i=i: f"test{i}"))
3083+
for i in range(10)
3084+
]
3085+
3086+
for exe_state in exe_states:
3087+
scheduler.schedule_resume(exe_state, same_timestamp)
3088+
3089+
# All should be scheduled successfully
3090+
assert len(scheduler._pending_resumes) == 10 # noqa: SLF001
3091+
3092+
# Verify the heap maintains proper ordering
3093+
# The first item should have counter 0
3094+
assert scheduler._pending_resumes[0][1] == 0 # noqa: SLF001
3095+
3096+
3097+
def test_timer_scheduler_counter_increments():
3098+
"""Test that the schedule counter increments correctly."""
3099+
resubmit_callback = Mock()
3100+
3101+
with TimerScheduler(resubmit_callback) as scheduler:
3102+
exe_state1 = ExecutableWithState(Executable(0, lambda: "test1"))
3103+
exe_state2 = ExecutableWithState(Executable(1, lambda: "test2"))
3104+
exe_state3 = ExecutableWithState(Executable(2, lambda: "test3"))
3105+
3106+
# Schedule with different times
3107+
scheduler.schedule_resume(exe_state1, time.time() + 1.0)
3108+
scheduler.schedule_resume(exe_state2, time.time() + 2.0)
3109+
scheduler.schedule_resume(exe_state3, time.time() + 3.0)
3110+
3111+
# Counter should have incremented to 3
3112+
assert scheduler._schedule_counter == 3 # noqa: SLF001
3113+
3114+
3115+
def test_timer_scheduler_fifo_ordering_with_same_timestamp():
3116+
"""
3117+
Test that FIFO ordering is maintained when timestamps are equal.
3118+
3119+
When multiple tasks have the same timestamp, they should be processed
3120+
in the order they were scheduled (FIFO). The timer thread processes
3121+
items synchronously, so callback order is deterministic.
3122+
"""
3123+
results = []
3124+
resubmit_callback = Mock(side_effect=lambda exe: results.append(exe.index))
3125+
3126+
with TimerScheduler(resubmit_callback) as scheduler:
3127+
# Use a past timestamp so they trigger immediately
3128+
past_time = time.time() - 0.1
3129+
3130+
exe_state1 = ExecutableWithState(Executable(0, lambda: "first"))
3131+
exe_state2 = ExecutableWithState(Executable(1, lambda: "second"))
3132+
exe_state3 = ExecutableWithState(Executable(2, lambda: "third"))
3133+
3134+
# Make them all resumable
3135+
exe_state1.suspend()
3136+
exe_state2.suspend()
3137+
exe_state3.suspend()
3138+
3139+
# Schedule all with same timestamp
3140+
scheduler.schedule_resume(exe_state1, past_time)
3141+
scheduler.schedule_resume(exe_state2, past_time)
3142+
scheduler.schedule_resume(exe_state3, past_time)
3143+
3144+
# Wait for timer thread to process them
3145+
time.sleep(0.3)
3146+
3147+
# Verify FIFO order - they should be resubmitted in order 0, 1, 2
3148+
assert results == [0, 1, 2]
3149+
3150+
3151+
# endregion TimerScheduler edge cases with exact same reschedule time

0 commit comments

Comments
 (0)