Skip to content

Commit cd1385f

Browse files
nadavelkabetsmergify[bot]
authored andcommitted
Bugfix: executor doesn't propagate exception from task that awaited a future (#1643)
* Schedule the original task when task awaits a future Signed-off-by: Nadav Elkabets <elnadav12@gmail.com> * Add MultiThreadedExecutor to test Signed-off-by: Nadav Elkabets <elnadav12@gmail.com> * Add tests for awaiting a done future and task cancellation during await Signed-off-by: Nadav Elkabets <elnadav12@gmail.com> * Removed unused variable Signed-off-by: Nadav Elkabets <elnadav12@gmail.com> --------- Signed-off-by: Nadav Elkabets <elnadav12@gmail.com> (cherry picked from commit aac0ebb) # Conflicts: # rclpy/rclpy/task.py # rclpy/test/test_executor.py
1 parent baf9d72 commit cd1385f

3 files changed

Lines changed: 123 additions & 4 deletions

File tree

rclpy/rclpy/executors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,10 @@ def _wait_for_ready_callbacks(
661661
ready_tasks_count = len(self._ready_tasks)
662662
for _ in range(ready_tasks_count):
663663
task = self._ready_tasks.popleft()
664+
# Skip tasks that were cancelled or set done while awaiting a
665+
# future and got rescheduled when the future completed
666+
if task.cancelled() or task.done():
667+
continue
664668
task_data = self._pending_tasks[task]
665669
node = task_data.source_node
666670
if node is None or node in nodes_to_use:

rclpy/rclpy/task.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,13 @@ def __init__(self, *, executor=None):
4444
# An exception raised by the handler when called
4545
self._exception = None
4646
self._exception_fetched = False
47+
<<<<<<< HEAD
4748
# callbacks to be scheduled after this task completes
4849
self._callbacks = []
50+
=======
51+
# callbacks or tasks to be scheduled after this task completes
52+
self._callbacks: List[Union[Callable[['Future[T]'], None], 'Task[Any]']] = []
53+
>>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643))
4954
# Lock for threadsafety
5055
self._lock = threading.Lock()
5156
# An executor to use when scheduling done callbacks
@@ -157,10 +162,18 @@ def _schedule_or_invoke_done_callbacks(self):
157162
if executor is not None:
158163
# Have the executor take care of the callbacks
159164
for callback in callbacks:
160-
executor.create_task(callback, self)
165+
if isinstance(callback, Task):
166+
executor._call_task_in_next_spin(callback)
167+
else:
168+
executor.create_task(callback, self)
161169
else:
162170
# No executor, call right away
163171
for callback in callbacks:
172+
if isinstance(callback, Task):
173+
warnings.warn(
174+
'Dropping task awaiting future: '
175+
'executor reference could not be resolved')
176+
continue
164177
try:
165178
callback(self)
166179
except Exception as e:
@@ -201,7 +214,26 @@ def add_done_callback(self, callback):
201214
if invoke:
202215
callback(self)
203216

217+
<<<<<<< HEAD
204218
def remove_done_callback(self, callback: Callable[['Future'], None]) -> bool:
219+
=======
220+
def _add_waiting_task(self, task: 'Task[Any]') -> None:
221+
"""Schedule a task to resume when this future completes."""
222+
with self._lock:
223+
if not self._pending():
224+
assert self._executor is not None
225+
executor = self._executor()
226+
if executor is not None:
227+
executor._call_task_in_next_spin(task)
228+
else:
229+
warnings.warn(
230+
'Dropping task awaiting future: '
231+
'executor reference could not be resolved')
232+
else:
233+
self._callbacks.append(task)
234+
235+
def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool:
236+
>>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643))
205237
"""
206238
Remove a previously-added done callback.
207239
@@ -317,9 +349,8 @@ def _add_resume_callback(self, future: Future, executor) -> None:
317349
elif future_executor is not executor:
318350
raise RuntimeError('A task can only await futures associated with the same executor')
319351

320-
# The future is associated with the same executor, so we can resume the task directly
321-
# in the done callback
322-
future.add_done_callback(lambda _: self.__call__())
352+
# Register the task to resume when the future is done or cancelled
353+
future._add_waiting_task(self)
323354

324355
def _complete_task(self) -> None:
325356
"""Cleanup after task finished."""

rclpy/test/test_executor.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,91 @@ async def coro2():
414414
self.assertTrue(future1.done())
415415
self.assertEqual('Sentinel Result 1', future1.result())
416416

417+
<<<<<<< HEAD
417418
def test_create_task_during_spin(self):
419+
=======
420+
def test_coroutine_exception_after_await(self) -> None:
421+
"""Exception in a coroutine after awaiting a future must propagate."""
422+
self.assertIsNotNone(self.node.handle)
423+
# EventsExecutor excluded - segfaults on exception propagation (#1641)
424+
for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
425+
with self.subTest(cls=cls):
426+
executor = cls(context=self.context)
427+
executor.add_node(self.node)
428+
429+
first_fut = executor.create_future()
430+
second_fut = executor.create_future()
431+
432+
async def coro_that_raises() -> None:
433+
first_fut.set_result(None)
434+
await second_fut
435+
raise RuntimeError('Expected error after await')
436+
437+
task = executor.create_task(coro_that_raises)
438+
439+
executor.spin_until_future_complete(first_fut, timeout_sec=5)
440+
self.assertFalse(task.done())
441+
# Resolve the inner future — triggers resume
442+
second_fut.set_result(None)
443+
444+
with self.assertRaises(RuntimeError) as cm:
445+
executor.spin_until_future_complete(task, timeout_sec=5)
446+
self.assertIn('Expected error after await', str(cm.exception))
447+
448+
def test_cancel_task_while_awaiting_future(self) -> None:
449+
"""Cancelling a task parked on a future must not crash the dispatch loop."""
450+
self.assertIsNotNone(self.node.handle)
451+
# EventsExecutor excluded - see #1641
452+
for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
453+
with self.subTest(cls=cls):
454+
executor = cls(context=self.context)
455+
executor.add_node(self.node)
456+
457+
first_fut = executor.create_future()
458+
second_fut = executor.create_future()
459+
third_fut = executor.create_future()
460+
461+
async def coro() -> None:
462+
first_fut.set_result(None)
463+
await second_fut
464+
third_fut.set_result(None)
465+
466+
task = executor.create_task(coro)
467+
468+
executor.spin_until_future_complete(first_fut, timeout_sec=5)
469+
self.assertFalse(task.done())
470+
471+
task.cancel()
472+
self.assertTrue(task.cancelled())
473+
474+
second_fut.set_result(None)
475+
476+
executor.spin_until_future_complete(first_fut, timeout_sec=5)
477+
self.assertFalse(third_fut.done())
478+
479+
def test_await_already_completed_future(self) -> None:
480+
"""Awaiting an already-completed future must resume and return its result."""
481+
self.assertIsNotNone(self.node.handle)
482+
# EventsExecutor excluded - see #1641
483+
for cls in [SingleThreadedExecutor, MultiThreadedExecutor]:
484+
with self.subTest(cls=cls):
485+
executor = cls(context=self.context)
486+
executor.add_node(self.node)
487+
488+
fut: Future[str] = executor.create_future()
489+
fut.set_result('done') # complete before the task runs
490+
491+
async def coro() -> str:
492+
return await fut # type: ignore[return-value]
493+
494+
task = executor.create_task(coro)
495+
496+
executor.spin_until_future_complete(task, timeout_sec=5)
497+
self.assertTrue(task.done())
498+
self.assertEqual('done', task.result())
499+
500+
def test_create_task_during_spin(self) -> None:
501+
>>>>>>> aac0ebb (Bugfix: executor doesn't propagate exception from task that awaited a future (#1643))
418502
self.assertIsNotNone(self.node.handle)
419503
for cls in [SingleThreadedExecutor, EventsExecutor]:
420504
with self.subTest(cls=cls):

0 commit comments

Comments
 (0)