Skip to content

feat: Implement async action handling in trajectory execution#405

Draft
functionistic wants to merge 14 commits into
mainfrom
feat/NDX-155-simple-custom-actions
Draft

feat: Implement async action handling in trajectory execution#405
functionistic wants to merge 14 commits into
mainfrom
feat/NDX-155-simple-custom-actions

Conversation

@functionistic
Copy link
Copy Markdown
Collaborator

  • Introduced AsyncAction and AsyncActionExecutor to manage asynchronous actions during robot motion.
  • Updated CombinedActions to support AsyncAction and provide methods for retrieving async actions.
  • Enhanced MovementControllerContext to include async action executor.
  • Modified move_forward controller to initialize and utilize AsyncActionExecutor for executing async actions.
  • Added TrajectoryCursor support for async actions, including triggering and waiting for completion.
  • Implemented error handling for async actions with different modes (raise, collect, callback).
  • Created unit tests for async action functionality, including registration, execution, error handling, and integration with CombinedActions.
  • Added AsyncActionError exception for detailed error reporting during async action failures.

- Introduced AsyncAction and AsyncActionExecutor to manage asynchronous actions during robot motion.
- Updated CombinedActions to support AsyncAction and provide methods for retrieving async actions.
- Enhanced MovementControllerContext to include async action executor.
- Modified move_forward controller to initialize and utilize AsyncActionExecutor for executing async actions.
- Added TrajectoryCursor support for async actions, including triggering and waiting for completion.
- Implemented error handling for async actions with different modes (raise, collect, callback).
- Created unit tests for async action functionality, including registration, execution, error handling, and integration with CombinedActions.
- Added AsyncActionError exception for detailed error reporting during async action failures.
AsyncActionExecutor was imported under TYPE_CHECKING but Pydantic
needs types resolved at runtime. Changed to Any to avoid circular
import while keeping arbitrary_types_allowed.
- Use tuple(joint_position) instead of .joints for v2 RootModel
- Remove __cause__ override conflicting with BaseException signature
- Remove unnecessary type: ignore comments
@functionistic functionistic marked this pull request as ready for review March 9, 2026 20:23
Copilot AI review requested due to automatic review settings March 9, 2026 20:23
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces async action handling during robot trajectory execution. It allows users to define async functions that execute at specific trajectory locations, either in parallel with robot motion or blocking it. The feature includes a registry pattern for action handlers, an executor for managing action lifecycles, and integration with both the move_forward and TrajectoryCursor movement controllers.

Changes:

  • Added AsyncAction type, ActionRegistry, and supporting infrastructure (async_action.py) plus the AsyncActionExecutor class to manage action triggering, error handling, and result collection during trajectory execution.
  • Integrated async action support into move_forward and TrajectoryCursor movement controllers, including lifecycle management (trigger, wait, cancel).
  • Updated CombinedActions and MovementControllerContext to support async actions, with comprehensive unit tests and an example file.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
nova/actions/async_action.py New module defining AsyncAction, ActionRegistry, AsyncActionResult, ErrorHandlingMode, and factory functions.
nova/cell/movement_controller/async_action_executor.py New executor class managing async action lifecycle (triggering, blocking, timeout, error handling).
nova/actions/container.py Extended ActionLocation, ActionContainerItem, CombinedActions, and MovementControllerContext to support AsyncAction.
nova/cell/movement_controller/trajectory_cursor.py Integrated async action executor into cursor's state monitor loop.
nova/cell/movement_controller/move_forward.py Integrated async action executor into move_forward controller.
nova/exceptions.py Added AsyncActionError exception with detailed context.
nova/actions/__init__.py Exported new async action public API.
tests/actions/test_async_action.py Comprehensive unit tests for all async action components.
examples/async_action.py Example demonstrating async actions during trajectory execution.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +77 to +83
tcp_pose = motion_group_state.tcp_pose
pose = Pose(tcp_pose) if tcp_pose else Pose((0, 0, 0, 0, 0, 0))
robot_state = RobotState(
pose=pose,
tcp=motion_group_state.tcp,
joints=tuple(motion_group_state.joint_position),
)
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic to build a RobotState from motion_group_state is duplicated here and in trajectory_cursor.py (lines 991-996). Consider extracting this into a shared helper function (similar to how motion_group_state_to_motion_state exists in nova/types/state.py) to avoid drift between the two implementations.

Copilot uses AI. Check for mistakes.
Comment on lines +345 to +356
try:
return_value = running.task.result()
except asyncio.TimeoutError:
timed_out = True
error = asyncio.TimeoutError(
f"Async action '{running.action.action_name}' timed out"
)
except Exception as e:
error = e
logger.error(
f"Async action '{running.action.action_name}' failed: {e}", exc_info=True
)
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_collect_completed_actions catches asyncio.TimeoutError and Exception, but asyncio.CancelledError is a subclass of BaseException (not Exception) since Python 3.9. If a task was cancelled (e.g., externally), calling running.task.result() on line 346 will raise CancelledError that won't be caught by except Exception, causing it to propagate unexpectedly out of _collect_completed_actions. Add a except asyncio.CancelledError clause before except Exception to handle cancelled tasks gracefully.

Copilot uses AI. Check for mistakes.
Comment on lines +1004 to +1009
# Note: blocking actions are already executed by the executor
# When a blocking action completes, emit signal
for action_result in self._async_action_executor.results:
# Emit signal for newly completed actions
# (The executor collects them, we just need to notify)
pass # Signal emission is handled in the executor's result collection
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop on lines 1006-1009 is dead code — it iterates over self._async_action_executor.results but the body is only a pass statement with a misleading comment "Signal emission is handled in the executor's result collection." Either implement the signal emission (e.g., using the async_action_completed signal declared on line 330) or remove this dead loop entirely.

Copilot uses AI. Check for mistakes.
Comment on lines +1011 to +1012
except Exception as e:
logger.error(f"Async action error at location {self._current_location}: {e}")
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _check_and_trigger_async_actions method catches all Exceptions and silently logs them. This means if the AsyncActionExecutor is configured with ErrorHandlingMode.RAISE, the raised AsyncActionError will be caught and swallowed here, effectively overriding the user's chosen error handling policy. If RAISE mode is intended to halt trajectory execution on action failure, this broad except needs to selectively re-raise AsyncActionError (or at least not catch it).

Copilot uses AI. Check for mistakes.
Comment thread nova/actions/container.py
Comment on lines +180 to +182
pause_callback: Callback to request motion pause (for blocking actions).
resume_callback: Callback to request motion resume (after blocking actions).
"""
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for MovementControllerContext lists pause_callback and resume_callback as attributes, but these fields do not exist on the class. Only async_action_executor was added. Either remove pause_callback and resume_callback from the docstring or add them as fields.

Copilot uses AI. Check for mistakes.
Comment on lines +105 to +106
await executor.wait_for_blocking_actions()
await resume_motion()
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring example references executor.wait_for_blocking_actions(), which is not a method on AsyncActionExecutor. The actual method is wait_for_all_actions(). Please update the example to use the correct method name.

Copilot uses AI. Check for mistakes.
# Signals emitted during motion events for external observers
motion_started = signal("motion_started")
motion_stopped = signal("motion_stopped")
async_action_completed = signal("async_action_completed")
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async_action_completed signal is declared but never emitted anywhere in the codebase. It's referenced by the dead loop in _check_and_trigger_async_actions (lines 1006-1009) which also does nothing. Consider either implementing the signal emission or removing this unused signal declaration.

Copilot uses AI. Check for mistakes.
@functionistic functionistic marked this pull request as draft March 9, 2026 20:52
Comment thread examples/setup.py Fixed
Comment thread examples/trajectory_with_safety_pause.py Fixed
Comment thread examples/trajectory_with_safety_pause.py Fixed
Comment thread examples/trajectory_with_safety_pause.py Fixed
Comment thread examples/actor_multibot.py Fixed
Comment thread examples/async_action.py Fixed
Comment thread examples/consec.py Fixed
Comment thread examples/consec_cursor.py Fixed
Comment thread examples/controller_io_observer.py Fixed
Comment thread examples/minimal.py Fixed
…c actions

- Introduce command queue pattern in move_forward for yielding
  StartMovementRequest and PauseMovementRequest via async callbacks
- Extract robot_state_from_motion_group_state helper to nova/types/state.py
- Move AsyncActionExecutor creation to MotionGroup.execute level
- Remove async action handling from TrajectoryCursor (now handled by executor)
- Add response_consumer and completion_watcher tasks in move_forward
- Removed was_blocking attribute from AsyncActionError and adjusted its initialization.
- Updated exception messages to simplify context reporting.
- Modified tests in test_async_action.py to accommodate changes in AsyncAction initialization, including action_id.
- Introduced AwaitAction and WaitUntilAction classes with corresponding tests.
- Enhanced AsyncActionExecutor tests to validate behavior with AwaitAction and WaitUntilAction.
- Added ExecutionState tests for basic operations and predicate handling.
- Updated CombinedActions tests to ensure compatibility with new action types.
Comment thread examples/async_action.py

import asyncio

import nova
Comment thread examples/minimal_cursor.py Fixed
self._current_task.cancel()
try:
await self._current_task
except asyncio.CancelledError:
Comment thread nova/utils/runtime.py

async def waiter():
with stop_scope:
await stop
Comment thread nova/utils/runtime.py

async def runner():
with run_scope:
await run
assert robot_b.started_stages == ["stage_one"]

stage_gate_b.set()
await start_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants