Skip to content

Commit 2946440

Browse files
nadavelkabets authored and mergify[bot] committed
Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)
* Schedule the original task when task awaits a future
  Signed-off-by: Nadav Elkabets <elnadav12@gmail.com>
* Add MultiThreadedExecutor to test
  Signed-off-by: Nadav Elkabets <elnadav12@gmail.com>
* Add tests for awaiting a done future and task cancellation during await
  Signed-off-by: Nadav Elkabets <elnadav12@gmail.com>
* Removed unused variable
  Signed-off-by: Nadav Elkabets <elnadav12@gmail.com>
---------
Signed-off-by: Nadav Elkabets <elnadav12@gmail.com>
(cherry picked from commit aac0ebb)
1 parent 134b013 commit 2946440

3 files changed

Lines changed: 112 additions & 6 deletions

File tree

rclpy/rclpy/executors.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -740,6 +740,10 @@ def _wait_for_ready_callbacks(
740740
ready_tasks_count = len(self._ready_tasks)
741741
for _ in range(ready_tasks_count):
742742
task = self._ready_tasks.popleft()
743+
# Skip tasks that were cancelled or set done while awaiting a
744+
# future and got rescheduled when the future completed
745+
if task.cancelled() or task.done():
746+
continue
743747
task_data = self._pending_tasks[task]
744748
node = task_data.source_node
745749
if node is None or node in nodes_to_use:

rclpy/rclpy/task.py

Lines changed: 28 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -51,8 +51,8 @@ def __init__(self, *, executor: Optional['Executor'] = None) -> None:
5151
# An exception raised by the handler when called
5252
self._exception: Optional[Exception] = None
5353
self._exception_fetched = False
54-
# callbacks to be scheduled after this task completes
55-
self._callbacks: List[Callable[['Future[T]'], None]] = []
54+
# callbacks or tasks to be scheduled after this task completes
55+
self._callbacks: List[Union[Callable[['Future[T]'], None], 'Task[Any]']] = []
5656
# Lock for threadsafety
5757
self._lock = threading.Lock()
5858
# An executor to use when scheduling done callbacks
@@ -165,10 +165,18 @@ def _schedule_or_invoke_done_callbacks(self) -> None:
165165
if executor is not None:
166166
# Have the executor take care of the callbacks
167167
for callback in callbacks:
168-
executor.create_task(callback, self)
168+
if isinstance(callback, Task):
169+
executor._call_task_in_next_spin(callback)
170+
else:
171+
executor.create_task(callback, self)
169172
else:
170173
# No executor, call right away
171174
for callback in callbacks:
175+
if isinstance(callback, Task):
176+
warnings.warn(
177+
'Dropping task awaiting future: '
178+
'executor reference could not be resolved')
179+
continue
172180
try:
173181
callback(self)
174182
except Exception as e:
@@ -210,6 +218,21 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
210218
if invoke:
211219
callback(self)
212220

221+
def _add_waiting_task(self, task: 'Task[Any]') -> None:
222+
"""Schedule a task to resume when this future completes."""
223+
with self._lock:
224+
if not self._pending():
225+
assert self._executor is not None
226+
executor = self._executor()
227+
if executor is not None:
228+
executor._call_task_in_next_spin(task)
229+
else:
230+
warnings.warn(
231+
'Dropping task awaiting future: '
232+
'executor reference could not be resolved')
233+
else:
234+
self._callbacks.append(task)
235+
213236
def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool:
214237
"""
215238
Remove a previously-added done callback.
@@ -352,9 +375,8 @@ def _add_resume_callback(self, future: Future[T], executor: 'Executor') -> None:
352375
elif future_executor is not executor:
353376
raise RuntimeError('A task can only await futures associated with the same executor')
354377

355-
# The future is associated with the same executor, so we can resume the task directly
356-
# in the done callback
357-
future.add_done_callback(lambda _: self.__call__())
378+
# Register the task to resume when the future is done or cancelled
379+
future._add_waiting_task(self)
358380

359381
def _complete_task(self) -> None:
360382
"""Cleanup after task finished."""

rclpy/test/test_executor.py

Lines changed: 80 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -460,6 +460,86 @@ async def coro2():
460460
self.assertTrue(future1.done())
461461
self.assertEqual('Sentinel Result 1', future1.result())
462462

463+
def test_coroutine_exception_after_await(self) -> None:
464+
"""Exception in a coroutine after awaiting a future must propagate."""
465+
self.assertIsNotNone(self.node.handle)
466+
# EventsExecutor excluded - segfaults on exception propagation (#1641)
467+
for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
468+
with self.subTest(cls=cls):
469+
executor = cls(context=self.context)
470+
executor.add_node(self.node)
471+
472+
first_fut = executor.create_future()
473+
second_fut = executor.create_future()
474+
475+
async def coro_that_raises() -> None:
476+
first_fut.set_result(None)
477+
await second_fut
478+
raise RuntimeError('Expected error after await')
479+
480+
task = executor.create_task(coro_that_raises)
481+
482+
executor.spin_until_future_complete(first_fut, timeout_sec=5)
483+
self.assertFalse(task.done())
484+
# Resolve the inner future — triggers resume
485+
second_fut.set_result(None)
486+
487+
with self.assertRaises(RuntimeError) as cm:
488+
executor.spin_until_future_complete(task, timeout_sec=5)
489+
self.assertIn('Expected error after await', str(cm.exception))
490+
491+
def test_cancel_task_while_awaiting_future(self) -> None:
492+
"""Cancelling a task parked on a future must not crash the dispatch loop."""
493+
self.assertIsNotNone(self.node.handle)
494+
# EventsExecutor excluded - see #1641
495+
for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
496+
with self.subTest(cls=cls):
497+
executor = cls(context=self.context)
498+
executor.add_node(self.node)
499+
500+
first_fut = executor.create_future()
501+
second_fut = executor.create_future()
502+
third_fut = executor.create_future()
503+
504+
async def coro() -> None:
505+
first_fut.set_result(None)
506+
await second_fut
507+
third_fut.set_result(None)
508+
509+
task = executor.create_task(coro)
510+
511+
executor.spin_until_future_complete(first_fut, timeout_sec=5)
512+
self.assertFalse(task.done())
513+
514+
task.cancel()
515+
self.assertTrue(task.cancelled())
516+
517+
second_fut.set_result(None)
518+
519+
executor.spin_until_future_complete(first_fut, timeout_sec=5)
520+
self.assertFalse(third_fut.done())
521+
522+
def test_await_already_completed_future(self) -> None:
523+
"""Awaiting an already-completed future must resume and return its result."""
524+
self.assertIsNotNone(self.node.handle)
525+
# EventsExecutor excluded - see #1641
526+
for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
527+
with self.subTest(cls=cls):
528+
executor = cls(context=self.context)
529+
executor.add_node(self.node)
530+
531+
fut: Future[str] = executor.create_future()
532+
fut.set_result('done') # complete before the task runs
533+
534+
async def coro() -> str:
535+
return await fut # type: ignore[return-value]
536+
537+
task = executor.create_task(coro)
538+
539+
executor.spin_until_future_complete(task, timeout_sec=5)
540+
self.assertTrue(task.done())
541+
self.assertEqual('done', task.result())
542+
463543
def test_create_task_during_spin(self) -> None:
464544
self.assertIsNotNone(self.node.handle)
465545
for cls in [SingleThreadedExecutor, EventsExecutor]:

0 commit comments

Comments
 (0)