feat: Implement async action handling in trajectory execution#405
feat: Implement async action handling in trajectory execution#405functionistic wants to merge 14 commits into
Conversation
functionistic
commented
Mar 6, 2026
- Introduced AsyncAction and AsyncActionExecutor to manage asynchronous actions during robot motion.
- Updated CombinedActions to support AsyncAction and provide methods for retrieving async actions.
- Enhanced MovementControllerContext to include async action executor.
- Modified move_forward controller to initialize and utilize AsyncActionExecutor for executing async actions.
- Added TrajectoryCursor support for async actions, including triggering and waiting for completion.
- Implemented error handling for async actions with different modes (raise, collect, callback).
- Created unit tests for async action functionality, including registration, execution, error handling, and integration with CombinedActions.
- Added AsyncActionError exception for detailed error reporting during async action failures.
- Introduced AsyncAction and AsyncActionExecutor to manage asynchronous actions during robot motion. - Updated CombinedActions to support AsyncAction and provide methods for retrieving async actions. - Enhanced MovementControllerContext to include async action executor. - Modified move_forward controller to initialize and utilize AsyncActionExecutor for executing async actions. - Added TrajectoryCursor support for async actions, including triggering and waiting for completion. - Implemented error handling for async actions with different modes (raise, collect, callback). - Created unit tests for async action functionality, including registration, execution, error handling, and integration with CombinedActions. - Added AsyncActionError exception for detailed error reporting during async action failures.
AsyncActionExecutor was imported under TYPE_CHECKING but Pydantic needs types resolved at runtime. Changed to Any to avoid circular import while keeping arbitrary_types_allowed.
- Use tuple(joint_position) instead of .joints for v2 RootModel - Remove __cause__ override conflicting with BaseException signature - Remove unnecessary type: ignore comments
There was a problem hiding this comment.
Pull request overview
This PR introduces async action handling during robot trajectory execution. It allows users to define async functions that execute at specific trajectory locations, either in parallel with robot motion or blocking it. The feature includes a registry pattern for action handlers, an executor for managing action lifecycles, and integration with both the move_forward and TrajectoryCursor movement controllers.
Changes:
- Added
AsyncActiontype,ActionRegistry, and supporting infrastructure (async_action.py) plus theAsyncActionExecutorclass to manage action triggering, error handling, and result collection during trajectory execution. - Integrated async action support into
move_forwardandTrajectoryCursormovement controllers, including lifecycle management (trigger, wait, cancel). - Updated
CombinedActionsandMovementControllerContextto support async actions, with comprehensive unit tests and an example file.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
nova/actions/async_action.py |
New module defining AsyncAction, ActionRegistry, AsyncActionResult, ErrorHandlingMode, and factory functions. |
nova/cell/movement_controller/async_action_executor.py |
New executor class managing async action lifecycle (triggering, blocking, timeout, error handling). |
nova/actions/container.py |
Extended ActionLocation, ActionContainerItem, CombinedActions, and MovementControllerContext to support AsyncAction. |
nova/cell/movement_controller/trajectory_cursor.py |
Integrated async action executor into cursor's state monitor loop. |
nova/cell/movement_controller/move_forward.py |
Integrated async action executor into move_forward controller. |
nova/exceptions.py |
Added AsyncActionError exception with detailed context. |
nova/actions/__init__.py |
Exported new async action public API. |
tests/actions/test_async_action.py |
Comprehensive unit tests for all async action components. |
examples/async_action.py |
Example demonstrating async actions during trajectory execution. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| tcp_pose = motion_group_state.tcp_pose | ||
| pose = Pose(tcp_pose) if tcp_pose else Pose((0, 0, 0, 0, 0, 0)) | ||
| robot_state = RobotState( | ||
| pose=pose, | ||
| tcp=motion_group_state.tcp, | ||
| joints=tuple(motion_group_state.joint_position), | ||
| ) |
There was a problem hiding this comment.
The logic to build a RobotState from motion_group_state is duplicated here and in trajectory_cursor.py (lines 991-996). Consider extracting this into a shared helper function (similar to how motion_group_state_to_motion_state exists in nova/types/state.py) to avoid drift between the two implementations.
| try: | ||
| return_value = running.task.result() | ||
| except asyncio.TimeoutError: | ||
| timed_out = True | ||
| error = asyncio.TimeoutError( | ||
| f"Async action '{running.action.action_name}' timed out" | ||
| ) | ||
| except Exception as e: | ||
| error = e | ||
| logger.error( | ||
| f"Async action '{running.action.action_name}' failed: {e}", exc_info=True | ||
| ) |
There was a problem hiding this comment.
_collect_completed_actions catches asyncio.TimeoutError and Exception, but asyncio.CancelledError is a subclass of BaseException (not Exception) since Python 3.9. If a task was cancelled (e.g., externally), calling running.task.result() on line 346 will raise CancelledError that won't be caught by except Exception, causing it to propagate unexpectedly out of _collect_completed_actions. Add a except asyncio.CancelledError clause before except Exception to handle cancelled tasks gracefully.
| # Note: blocking actions are already executed by the executor | ||
| # When a blocking action completes, emit signal | ||
| for action_result in self._async_action_executor.results: | ||
| # Emit signal for newly completed actions | ||
| # (The executor collects them, we just need to notify) | ||
| pass # Signal emission is handled in the executor's result collection |
There was a problem hiding this comment.
The loop on lines 1006-1009 is dead code — it iterates over self._async_action_executor.results but the body is only a pass statement with a misleading comment "Signal emission is handled in the executor's result collection." Either implement the signal emission (e.g., using the async_action_completed signal declared on line 330) or remove this dead loop entirely.
| except Exception as e: | ||
| logger.error(f"Async action error at location {self._current_location}: {e}") |
There was a problem hiding this comment.
The _check_and_trigger_async_actions method catches all Exceptions and silently logs them. This means if the AsyncActionExecutor is configured with ErrorHandlingMode.RAISE, the raised AsyncActionError will be caught and swallowed here, effectively overriding the user's chosen error handling policy. If RAISE mode is intended to halt trajectory execution on action failure, this broad except needs to selectively re-raise AsyncActionError (or at least not catch it).
| pause_callback: Callback to request motion pause (for blocking actions). | ||
| resume_callback: Callback to request motion resume (after blocking actions). | ||
| """ |
There was a problem hiding this comment.
The docstring for MovementControllerContext lists pause_callback and resume_callback as attributes, but these fields do not exist on the class. Only async_action_executor was added. Either remove pause_callback and resume_callback from the docstring or add them as fields.
| await executor.wait_for_blocking_actions() | ||
| await resume_motion() |
There was a problem hiding this comment.
The docstring example references executor.wait_for_blocking_actions(), which is not a method on AsyncActionExecutor. The actual method is wait_for_all_actions(). Please update the example to use the correct method name.
| # Signals emitted during motion events for external observers | ||
| motion_started = signal("motion_started") | ||
| motion_stopped = signal("motion_stopped") | ||
| async_action_completed = signal("async_action_completed") |
There was a problem hiding this comment.
The async_action_completed signal is declared but never emitted anywhere in the codebase. It's referenced by the dead loop in _check_and_trigger_async_actions (lines 1006-1009) which also does nothing. Consider either implementing the signal emission or removing this unused signal declaration.
…c actions - Introduce command queue pattern in move_forward for yielding StartMovementRequest and PauseMovementRequest via async callbacks - Extract robot_state_from_motion_group_state helper to nova/types/state.py - Move AsyncActionExecutor creation to MotionGroup.execute level - Remove async action handling from TrajectoryCursor (now handled by executor) - Add response_consumer and completion_watcher tasks in move_forward
- Removed was_blocking attribute from AsyncActionError and adjusted its initialization. - Updated exception messages to simplify context reporting. - Modified tests in test_async_action.py to accommodate changes in AsyncAction initialization, including action_id. - Introduced AwaitAction and WaitUntilAction classes with corresponding tests. - Enhanced AsyncActionExecutor tests to validate behavior with AwaitAction and WaitUntilAction. - Added ExecutionState tests for basic operations and predicate handling. - Updated CombinedActions tests to ensure compatibility with new action types.
|
|
||
| import asyncio | ||
|
|
||
| import nova |
| self._current_task.cancel() | ||
| try: | ||
| await self._current_task | ||
| except asyncio.CancelledError: |
|
|
||
| async def waiter(): | ||
| with stop_scope: | ||
| await stop |
|
|
||
| async def runner(): | ||
| with run_scope: | ||
| await run |
| assert robot_b.started_stages == ["stage_one"] | ||
|
|
||
| stage_gate_b.set() | ||
| await start_task |
…ustom-actions # Conflicts: # nova/cell/motion_group.py # pyproject.toml