diff --git a/strands_robots_sim/isaac/loaders.py b/strands_robots_sim/isaac/loaders.py new file mode 100644 index 0000000..952374c --- /dev/null +++ b/strands_robots_sim/isaac/loaders.py @@ -0,0 +1,732 @@ +"""Robot description file loaders → :class:`ProceduralRobot`. + +Follow-up to the R7 Phase 1 procedural-builder slice (PR #46): instead of +hardcoding ``_build_so100`` / ``_build_panda`` / ``_build_unitree_g1`` in +``procedural.py``, drive the same ``ProceduralRobot`` dataclass from existing +robot description files (URDF, MJCF, USD) so the code path becomes a generic +loader rather than a per-robot Python builder. + +Supported formats: + * **URDF** — ``load_urdf(path)``. Parsed with stdlib + ``xml.etree.ElementTree``. No external deps; mirrors the parser + semantics on the Newton side (``strands_robots_sim/newton/simulation.py`` + on PR #30) so both backends share behaviour. + * **MJCF** — ``load_mjcf(path)``. Parsed with stdlib + ``xml.etree.ElementTree``. Handles ```` / nested ```` + / ```` for LIBERO-style scenes. No mujoco-Python dep needed for + definition extraction. + * **USD** — ``load_usd(path)``. Walks the USD prim hierarchy via + ``pxr.Usd`` / ``pxr.UsdPhysics`` to extract ``PhysicsRevoluteJoint`` / + ``PhysicsPrismaticJoint`` + body inertia. Gated behind the ``[isaac]`` + extra (``usd-core>=24.5``); raises :class:`ImportError` with an + install hint when ``pxr`` is unavailable. + +Failure semantics (closes the #33 class of bugs — silent ``joint_count=0`` +on parse failure): + + * Missing path → :class:`FileNotFoundError`. + * Malformed XML / unparseable document → :class:`ValueError` with the + file path and the offending element / parser message. + * Empty document (zero links / zero joints / zero bodies) → + :class:`ValueError`. Loaders never silently return a phantom robot. + +The procedural builders in :mod:`strands_robots_sim.isaac.procedural` are +intentionally retained as the zero-dep, testable fallback used when no +description file is configured. The loaders here layer on top. +""" + +from __future__ import annotations + +import os +import xml.etree.ElementTree as ET +from typing import TYPE_CHECKING, Any + +from strands_robots_sim.isaac.procedural import ( + BodyDef, + JointDef, + ProceduralRobot, + _validate_kinematic_tree, +) + +if TYPE_CHECKING: + pass + +__all__ = [ + "load_urdf", + "load_mjcf", + "load_usd", +] + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _require_existing_file(path: str, fmt: str) -> None: + """Raise FileNotFoundError if path doesn't exist or isn't a file. + + Parameters + ---------- + path : str + Filesystem path to check. + fmt : str + Format label for the error message ("URDF", "MJCF", "USD"). + """ + if not os.path.exists(path): + raise FileNotFoundError(f"{fmt} loader: file not found: {path}") + if not os.path.isfile(path): + raise FileNotFoundError(f"{fmt} loader: path is not a regular file: {path}") + + +def _parse_xml(path: str, fmt: str) -> ET.Element: + """Parse an XML file, converting parser errors into ValueError. + + Returns the root element. The :class:`xml.etree.ElementTree.ParseError` + is wrapped in a :class:`ValueError` carrying the file path so the + failure mode is explicit (not a silent zero-joint robot — see #33). + """ + try: + tree = ET.parse(path) + except ET.ParseError as e: + raise ValueError(f"{fmt} loader: malformed XML in {path}: {e}") from e + return tree.getroot() + + +def _parse_axis( + axis_str: str | None, default: tuple[float, float, float] = (0.0, 0.0, 1.0) +) -> tuple[float, float, float]: + """Parse a whitespace-separated 3-vector. Returns ``default`` if empty / malformed.""" + if not axis_str: + return default + try: + parts = axis_str.replace(",", " ").split() + if len(parts) != 3: + return default + return (float(parts[0]), float(parts[1]), float(parts[2])) + except (ValueError, TypeError): + return default + + +def _parse_xyz( + xyz_str: str | None, default: tuple[float, float, float] = (0.0, 0.0, 0.0) +) -> tuple[float, float, float]: + """Parse a whitespace-separated 3-vector position. Returns ``default`` on failure.""" + if not xyz_str: + return default + try: + parts = xyz_str.replace(",", " ").split() + if len(parts) < 3: + return default + return (float(parts[0]), float(parts[1]), float(parts[2])) + except (ValueError, TypeError): + return default + + +def _safe_float(value: str | None, default: float) -> float: + """Parse a float, returning ``default`` on failure.""" + if value is None: + return default + try: + return float(value) + except (ValueError, TypeError): + return default + + +# --------------------------------------------------------------------------- +# URDF +# --------------------------------------------------------------------------- + + +# Map URDF joint type → ProceduralRobot joint type. +# URDF spec types: revolute, continuous, prismatic, fixed, floating, planar. +# We collapse "continuous" → "revolute" (continuous is a revolute with no +# limits; we surface unbounded ±π as the limit). "floating" / "planar" are +# rare and don't have a clean 1-DOF axis; we surface them as "fixed" with a +# warning-via-comment in the joint name (callers can refine if needed). +_URDF_JOINT_TYPE_MAP = { + "revolute": "revolute", + "continuous": "revolute", + "prismatic": "prismatic", + "fixed": "fixed", + "floating": "fixed", + "planar": "fixed", +} + + +def load_urdf(path: str) -> ProceduralRobot: + """Load a URDF file and return a :class:`ProceduralRobot`. + + Parses ```` and ```` elements via stdlib + :mod:`xml.etree.ElementTree`. Joint axes, limits, parent / child link + references, and per-link inertial mass are extracted; geometry is + surfaced as a best-effort ``shape`` / ``shape_size`` (defaulting to a + unit box when absent — Phase 1 doesn't render, only the kinematic + structure matters). + + Parameters + ---------- + path : str + Filesystem path to a URDF (XML) file. + + Returns + ------- + ProceduralRobot + Robot definition mirroring the file's link / joint topology. + + Raises + ------ + FileNotFoundError + If ``path`` doesn't exist. + ValueError + If the XML is malformed, the root tag isn't ````, or the + document declares zero links (a #33-style "phantom robot" guard). + """ + _require_existing_file(path, "URDF") + root = _parse_xml(path, "URDF") + + if root.tag != "robot": + raise ValueError(f"URDF loader: root element must be , got <{root.tag}> in {path}") + + name = root.get("name", os.path.splitext(os.path.basename(path))[0]) + + # Pass 1: collect links → bodies (preserving file order so joint + # parent/child name lookups become a stable index). + bodies: list[BodyDef] = [] + link_index: dict[str, int] = {} + for link_el in root.findall("link"): + link_name = link_el.get("name") + if not link_name: + raise ValueError(f"URDF loader: without name attribute in {path}") + if link_name in link_index: + raise ValueError(f"URDF loader: duplicate in {path}") + + # Inertial mass (defaults to 1.0 for renderable / 0.0 would suggest + # massless — but URDF mass is required for non-fixed children, so + # default 1.0 is the safer guess for procedural builders). + mass = 1.0 + inertial = link_el.find("inertial") + if inertial is not None: + mass_el = inertial.find("mass") + if mass_el is not None: + mass = _safe_float(mass_el.get("value"), 1.0) + + # Geometry — best effort; URDF lets multiple / + # blocks coexist and arbitrary mesh references. We extract the + # first we find, falling back to . + shape, shape_size = _extract_urdf_shape(link_el) + + bodies.append( + BodyDef( + name=link_name, + position=(0.0, 0.0, 0.0), # absolute pose computed by joint chain at instantiation time + mass=mass, + shape=shape, + shape_size=shape_size, + ) + ) + link_index[link_name] = len(bodies) - 1 + + if not bodies: + raise ValueError(f"URDF loader: {path} declares zero elements (phantom robot guard)") + + # Pass 2: collect joints. For each joint, look up parent / child link + # by name and resolve to body indices. + joints: list[JointDef] = [] + for joint_el in root.findall("joint"): + jname = joint_el.get("name") + if not jname: + raise ValueError(f"URDF loader: without name attribute in {path}") + + urdf_type = joint_el.get("type", "fixed") + jtype = _URDF_JOINT_TYPE_MAP.get(urdf_type) + if jtype is None: + raise ValueError( + f"URDF loader: in {path}: " + f"unknown joint type (expected one of {sorted(_URDF_JOINT_TYPE_MAP)})" + ) + + parent_el = joint_el.find("parent") + child_el = joint_el.find("child") + if parent_el is None or child_el is None: + raise ValueError(f"URDF loader: in {path} missing or ") + parent_name = parent_el.get("link") + child_name = child_el.get("link") + if not parent_name or not child_name: + raise ValueError( + f"URDF loader: in {path}: " f" / missing 'link' attribute" + ) + if parent_name not in link_index: + raise ValueError( + f"URDF loader: references unknown parent link " f"'{parent_name}' in {path}" + ) + if child_name not in link_index: + raise ValueError( + f"URDF loader: references unknown child link " f"'{child_name}' in {path}" + ) + + axis_el = joint_el.find("axis") + axis = _parse_axis(axis_el.get("xyz") if axis_el is not None else None) + + # Limits — URDF requires for revolute/prismatic, optional + # for continuous. Defaults below match the dataclass defaults. + lower = -3.14159 + upper = 3.14159 + damping = 0.1 + limit_el = joint_el.find("limit") + if limit_el is not None: + lower = _safe_float(limit_el.get("lower"), lower) + upper = _safe_float(limit_el.get("upper"), upper) + dynamics_el = joint_el.find("dynamics") + if dynamics_el is not None: + damping = _safe_float(dynamics_el.get("damping"), damping) + + joints.append( + JointDef( + name=jname, + joint_type=jtype, + parent_body=link_index[parent_name], + child_body=link_index[child_name], + axis=axis, + limit_lower=lower, + limit_upper=upper, + damping=damping, + ) + ) + + robot = ProceduralRobot(name=name, bodies=bodies, joints=joints) + _validate_kinematic_tree(robot) + return robot + + +def _extract_urdf_shape(link_el: ET.Element) -> tuple[str, tuple[float, ...]]: + """Best-effort URDF link → (shape, shape_size) extraction. + + Falls back to a small unit box when no primitive is found. + Mesh-only links surface as ``shape="box"`` with an estimated size — the + loader is for kinematic structure, not visual fidelity. + """ + for parent_tag in ("collision", "visual"): + parent = link_el.find(parent_tag) + if parent is None: + continue + geom = parent.find("geometry") + if geom is None: + continue + for prim_tag, parser in ( + ("box", _parse_box_size), + ("cylinder", _parse_cylinder_size), + ("sphere", _parse_sphere_size), + ("capsule", _parse_cylinder_size), # uncommon, treat like cylinder + ): + prim = geom.find(prim_tag) + if prim is not None: + return prim_tag, parser(prim) + # Mesh — no primitive size; default to small box. + if geom.find("mesh") is not None: + return "box", (0.05, 0.05, 0.05) + return "box", (0.05, 0.05, 0.05) + + +def _parse_box_size(el: ET.Element) -> tuple[float, ...]: + size = _parse_xyz(el.get("size"), default=(0.05, 0.05, 0.05)) + return size + + +def _parse_cylinder_size(el: ET.Element) -> tuple[float, ...]: + radius = _safe_float(el.get("radius"), 0.05) + length = _safe_float(el.get("length"), 0.1) + return (radius, length) + + +def _parse_sphere_size(el: ET.Element) -> tuple[float, ...]: + radius = _safe_float(el.get("radius"), 0.05) + return (radius,) + + +# --------------------------------------------------------------------------- +# MJCF +# --------------------------------------------------------------------------- + + +# Map MJCF joint type → ProceduralRobot joint type. +# MJCF spec types: free, ball, slide, hinge. +# - hinge → revolute (1-DOF rotational) +# - slide → prismatic +# - ball → not 1-DOF; no clean mapping — surface as "fixed" so the body +# index is preserved without claiming actuated DOF. +# - free → 6-DOF root joint; not part of the actuated chain — "fixed". +_MJCF_JOINT_TYPE_MAP = { + "hinge": "revolute", + "slide": "prismatic", + "ball": "fixed", + "free": "fixed", +} + + +def load_mjcf(path: str) -> ProceduralRobot: + """Load an MJCF file and return a :class:`ProceduralRobot`. + + Parses MuJoCo's MJCF format with stdlib + :mod:`xml.etree.ElementTree`. Walks ```` / nested ```` + elements depth-first to assign body indices in tree order, then emits a + :class:`JointDef` for each ```` connecting that body to its + parent. Useful for LIBERO scenes (the matrix's main consumer ships + MJCF). + + Parameters + ---------- + path : str + Filesystem path to an MJCF (XML) file. + + Returns + ------- + ProceduralRobot + Robot definition mirroring the body / joint topology. + + Raises + ------ + FileNotFoundError + If ``path`` doesn't exist. + ValueError + If the XML is malformed, the root tag isn't ````, or no + ```` / no descendant ```` is present. + """ + _require_existing_file(path, "MJCF") + root = _parse_xml(path, "MJCF") + + if root.tag != "mujoco": + raise ValueError(f"MJCF loader: root element must be , got <{root.tag}> in {path}") + + model_name = root.get("model", os.path.splitext(os.path.basename(path))[0]) + + worldbody = root.find("worldbody") + if worldbody is None: + raise ValueError(f"MJCF loader: {path} has no ") + + bodies: list[BodyDef] = [] + joints: list[JointDef] = [] + + # Synthetic root body so MJCF top-level s under + # always have a valid parent index. MJCF's "world" is implicit. + bodies.append( + BodyDef( + name="world", + position=(0.0, 0.0, 0.0), + mass=0.0, + shape="box", + shape_size=(0.0, 0.0, 0.0), + ) + ) + + def _walk(body_el: ET.Element, parent_idx: int) -> None: + body_name = body_el.get("name") or f"body_{len(bodies)}" + position = _parse_xyz(body_el.get("pos")) + + # MJCF mass — usually inferred via or via + # density; default to 1.0 if absent. + mass = 1.0 + inertial = body_el.find("inertial") + if inertial is not None: + mass = _safe_float(inertial.get("mass"), 1.0) + + # Geometry — first primitive type. + shape, shape_size = _extract_mjcf_shape(body_el) + + bodies.append( + BodyDef( + name=body_name, + position=position, + mass=mass, + shape=shape, + shape_size=shape_size, + ) + ) + body_idx = len(bodies) - 1 + + # Each child connects this body to its parent. + for joint_el in body_el.findall("joint"): + jname = joint_el.get("name") or f"{body_name}_joint_{len(joints)}" + mjcf_type = joint_el.get("type", "hinge") + jtype = _MJCF_JOINT_TYPE_MAP.get(mjcf_type) + if jtype is None: + raise ValueError( + f"MJCF loader: in {path}: " + f"unknown joint type (expected one of {sorted(_MJCF_JOINT_TYPE_MAP)})" + ) + + axis = _parse_axis(joint_el.get("axis")) + range_str = joint_el.get("range") + lower, upper = -3.14159, 3.14159 + if range_str: + try: + parts = range_str.replace(",", " ").split() + if len(parts) >= 2: + lower = float(parts[0]) + upper = float(parts[1]) + except (ValueError, TypeError): + pass + + damping = _safe_float(joint_el.get("damping"), 0.1) + armature = _safe_float(joint_el.get("armature"), 0.01) + + joints.append( + JointDef( + name=jname, + joint_type=jtype, + parent_body=parent_idx, + child_body=body_idx, + axis=axis, + limit_lower=lower, + limit_upper=upper, + damping=damping, + armature=armature, + ) + ) + + for child in body_el.findall("body"): + _walk(child, body_idx) + + top_bodies = list(worldbody.findall("body")) + if not top_bodies: + raise ValueError(f"MJCF loader: {path} has no children (phantom robot guard)") + + for body_el in top_bodies: + _walk(body_el, parent_idx=0) + + robot = ProceduralRobot(name=model_name, bodies=bodies, joints=joints) + _validate_kinematic_tree(robot) + return robot + + +def _extract_mjcf_shape(body_el: ET.Element) -> tuple[str, tuple[float, ...]]: + """Best-effort MJCF body → (shape, shape_size) extraction from first .""" + geom = body_el.find("geom") + if geom is None: + return "box", (0.05, 0.05, 0.05) + gtype = geom.get("type", "box") + size_str = geom.get("size", "") + sizes: list[float] = [] + if size_str: + try: + sizes = [float(p) for p in size_str.replace(",", " ").split()] + except (ValueError, TypeError): + sizes = [] + if gtype == "box": + if len(sizes) >= 3: + return "box", (sizes[0], sizes[1], sizes[2]) + return "box", (0.05, 0.05, 0.05) + if gtype == "sphere": + if sizes: + return "sphere", (sizes[0],) + return "sphere", (0.05,) + if gtype in ("cylinder", "capsule"): + # MJCF size for capsule/cylinder is (radius, half-length). + if len(sizes) >= 2: + return gtype, (sizes[0], sizes[1]) + if len(sizes) == 1: + return gtype, (sizes[0], 0.05) + return gtype, (0.05, 0.05) + # Mesh, plane, ellipsoid, hfield etc. — treat as a small box for kinematic-only purposes. + return "box", (0.05, 0.05, 0.05) + + +# --------------------------------------------------------------------------- +# USD +# --------------------------------------------------------------------------- + + +def _lazy_import_usd() -> tuple[Any, Any, Any]: + """Lazy-import pxr.Usd / Sdf / UsdPhysics. Mirrors the pattern from PR #44. + + Returns (Usd, Sdf, UsdPhysics) tuple. Raises ImportError with an install + hint when the modules are unavailable (Pixar USD ships only via the + ``[isaac]`` extra). + """ + try: + from pxr import Sdf, Usd, UsdPhysics # type: ignore[import-not-found] + + return Usd, Sdf, UsdPhysics + except ImportError as e: + raise ImportError( + "USD loader requires Pixar USD (pxr.Usd / pxr.UsdPhysics). " + "Install via: pip install 'strands-robots-sim[isaac]' " + "or directly: pip install 'usd-core>=24.5'" + ) from e + + +# Map UsdPhysics joint API → ProceduralRobot joint type. +_USD_JOINT_TYPE_MAP = { + "PhysicsRevoluteJoint": "revolute", + "PhysicsPrismaticJoint": "prismatic", + "PhysicsFixedJoint": "fixed", + "PhysicsSphericalJoint": "fixed", # 3-DOF; not 1-DOF, surface as fixed + "PhysicsDistanceJoint": "fixed", +} + + +# Map USD physics joint axis token → ProceduralRobot axis tuple. +_USD_AXIS_MAP = { + "X": (1.0, 0.0, 0.0), + "Y": (0.0, 1.0, 0.0), + "Z": (0.0, 0.0, 1.0), +} + + +def load_usd(path: str) -> ProceduralRobot: + """Load a USD file and return a :class:`ProceduralRobot`. + + Walks the USD prim hierarchy via ``pxr.Usd`` / ``pxr.UsdPhysics`` to + extract physics joint prims (``PhysicsRevoluteJoint`` / + ``PhysicsPrismaticJoint`` / ``PhysicsFixedJoint``) plus rigid-body + prims with ``UsdPhysicsRigidBodyAPI``. + + Gated behind the ``[isaac]`` extra (``usd-core``); raises + :class:`ImportError` with an install hint when ``pxr`` is unavailable. + + Parameters + ---------- + path : str + Filesystem path to a USD file (.usd / .usda / .usdc / .usdz). + + Returns + ------- + ProceduralRobot + Robot definition mirroring the rigid-body / physics-joint graph. + + Raises + ------ + FileNotFoundError + If ``path`` doesn't exist. + ImportError + If ``pxr`` is not importable (install via ``[isaac]`` extra). + ValueError + If the stage fails to open, declares zero rigid bodies, or has a + joint with an unresolved body0 / body1 reference. + """ + _require_existing_file(path, "USD") + Usd, _Sdf, UsdPhysics = _lazy_import_usd() + + stage = Usd.Stage.Open(path) + if stage is None: + raise ValueError(f"USD loader: failed to open stage at {path}") + + name = os.path.splitext(os.path.basename(path))[0] + + # Pass 1: collect rigid bodies. We treat any prim with + # UsdPhysicsRigidBodyAPI as a body; ordering follows depth-first + # traversal of the stage's pseudo-root. + bodies: list[BodyDef] = [] + body_index: dict[str, int] = {} + + for prim in stage.Traverse(): + if not prim.HasAPI(UsdPhysics.RigidBodyAPI): + continue + prim_path = str(prim.GetPath()) + if prim_path in body_index: + continue + body_name = prim.GetName() or prim_path.replace("/", "_").lstrip("_") + + # Mass — UsdPhysicsMassAPI + mass = 1.0 + if prim.HasAPI(UsdPhysics.MassAPI): + mass_api = UsdPhysics.MassAPI(prim) + mass_attr = mass_api.GetMassAttr() + if mass_attr and mass_attr.HasAuthoredValue(): + mass = float(mass_attr.Get() or 1.0) + + bodies.append( + BodyDef( + name=body_name, + position=(0.0, 0.0, 0.0), + mass=mass, + shape="box", + shape_size=(0.05, 0.05, 0.05), + ) + ) + body_index[prim_path] = len(bodies) - 1 + + if not bodies: + raise ValueError( + f"USD loader: {path} declares zero rigid bodies " + f"(no prims with UsdPhysicsRigidBodyAPI); phantom robot guard" + ) + + # Pass 2: collect physics joints. Any UsdPhysics.Joint subclass + # (Revolute / Prismatic / Fixed / Spherical / Distance) shows up here + # via prim.IsA(UsdPhysics.Joint). + joints: list[JointDef] = [] + for prim in stage.Traverse(): + if not prim.IsA(UsdPhysics.Joint): + continue + type_name = prim.GetTypeName() + jtype = _USD_JOINT_TYPE_MAP.get(str(type_name)) + if jtype is None: + # Unknown subclass — preserve the prim as a fixed joint to + # keep body indexing consistent. Surfacing via name pattern. + jtype = "fixed" + + joint_api = UsdPhysics.Joint(prim) + body0_rel = joint_api.GetBody0Rel() + body1_rel = joint_api.GetBody1Rel() + body0_targets = list(body0_rel.GetTargets()) if body0_rel else [] + body1_targets = list(body1_rel.GetTargets()) if body1_rel else [] + if not body0_targets or not body1_targets: + raise ValueError( + f"USD loader: joint {prim.GetPath()} in {path} has unresolved " + f"body0/body1 relationship (Phase-1 phantom-robot guard)" + ) + body0_path = str(body0_targets[0]) + body1_path = str(body1_targets[0]) + if body0_path not in body_index: + raise ValueError( + f"USD loader: joint {prim.GetPath()} body0 references " + f"{body0_path} which is not a rigid body in {path}" + ) + if body1_path not in body_index: + raise ValueError( + f"USD loader: joint {prim.GetPath()} body1 references " + f"{body1_path} which is not a rigid body in {path}" + ) + + # Axis: UsdPhysicsRevoluteJoint / PrismaticJoint expose an "axis" + # token attribute valued "X" / "Y" / "Z". + axis: tuple[float, float, float] = (0.0, 0.0, 1.0) + if str(type_name) in ("PhysicsRevoluteJoint", "PhysicsPrismaticJoint"): + schema_cls = ( + UsdPhysics.RevoluteJoint if str(type_name) == "PhysicsRevoluteJoint" else UsdPhysics.PrismaticJoint + ) + schema = schema_cls(prim) + axis_attr = schema.GetAxisAttr() + if axis_attr and axis_attr.HasAuthoredValue(): + axis = _USD_AXIS_MAP.get(str(axis_attr.Get()), axis) + + lower_attr = schema.GetLowerLimitAttr() + upper_attr = schema.GetUpperLimitAttr() + lower = -3.14159 + upper = 3.14159 + if lower_attr and lower_attr.HasAuthoredValue(): + lower = float(lower_attr.Get()) + if upper_attr and upper_attr.HasAuthoredValue(): + upper = float(upper_attr.Get()) + else: + lower = -3.14159 + upper = 3.14159 + + jname = prim.GetName() or str(prim.GetPath()).replace("/", "_").lstrip("_") + + joints.append( + JointDef( + name=jname, + joint_type=jtype, + parent_body=body_index[body0_path], + child_body=body_index[body1_path], + axis=axis, + limit_lower=lower, + limit_upper=upper, + ) + ) + + robot = ProceduralRobot(name=name, bodies=bodies, joints=joints) + _validate_kinematic_tree(robot) + return robot diff --git a/strands_robots_sim/isaac/procedural.py b/strands_robots_sim/isaac/procedural.py index 8df6465..905434a 100644 --- a/strands_robots_sim/isaac/procedural.py +++ b/strands_robots_sim/isaac/procedural.py @@ -11,7 +11,6 @@ from __future__ import annotations -import os from dataclasses import dataclass, field @@ -64,21 +63,26 @@ def joint_names(self) -> list[str]: def _validate_kinematic_tree(robot: ProceduralRobot) -> None: - """Phase-1 fail-fast guard: reject duplicate (parent, child) body edges. - - Phase-1 callers don't instantiate the articulation, so this is a no-op at - builder time UNLESS ``STRANDS_ISAAC_VALIDATE_KINEMATICS=1`` is set - (Phase-2 dev path). When the guard fires, it surfaces the defect with - body indices + joint names rather than letting USD/MuJoCo emit a cryptic - articulation error two layers down. - - See the NOTE in ``_build_unitree_g1`` for the specific defect this is - designed to catch (G1 legs/arms currently share a single ``(parent, - child)`` edge across two joint axes; USD requires intermediate massless - link bodies). + """Fail-fast guard: reject duplicate (parent, child) body edges. + + A USD/MuJoCo articulation requires a tree where each non-root link has + exactly one inbound joint. Two joints sharing the same ``(parent_body, + child_body)`` edge violate that invariant and would surface two layers + down as a cryptic articulation error at instantiation time. + + Validation runs unconditionally on every procedural builder + every + URDF / MJCF / USD loader: shipping a robot we know cannot instantiate + has no good use case in this package, and silently producing one is + worse than failing fast at builder time. The check raises ``ValueError`` + with body indices + joint names so the offender is obvious from the + traceback alone. + + For 2-DOF compound joints (e.g. a hip with both roll and pitch axes), + insert an intermediate massless link body between the two joints so each + joint has its own ``(parent, child)`` edge. ``_build_unitree_g1`` is the + canonical example -- six ``*_link`` intermediate bodies split the + hip / ankle / arm 2-DOF axes. """ - if os.environ.get("STRANDS_ISAAC_VALIDATE_KINEMATICS", "").lower() not in ("1", "true", "yes"): - return from collections import Counter edges = [(j.parent_body, j.child_body) for j in robot.joints] @@ -86,7 +90,7 @@ def _validate_kinematic_tree(robot: ProceduralRobot) -> None: if dups: offenders = {edge: [j.name for j in robot.joints if (j.parent_body, j.child_body) == edge] for edge in dups} raise ValueError( - f"{robot.name}: duplicate parent->child body edges (Phase-2 defect): {offenders}. " + f"{robot.name}: duplicate parent->child body edges: {offenders}. " f"Insert intermediate massless link bodies before instantiating articulation." ) @@ -170,7 +174,15 @@ def _build_panda() -> ProceduralRobot: def _build_unitree_g1() -> ProceduralRobot: - """Build Unitree G1 humanoid (simplified 21-DOF) procedurally.""" + """Build Unitree G1 humanoid (simplified 21-DOF) procedurally. + + The G1 has six 2-DOF compound joints (hips, ankles, shoulder/elbow on each + arm). To keep the kinematic graph a valid tree -- one inbound joint per + non-root link, as USD / MuJoCo articulations require -- each compound + joint is split through a massless intermediate ``*_link`` body. The two + axes still resolve to two actuated joints (so ``num_joints == 21``); only + the topology gains the six extra fixed link bodies. + """ bodies = [ BodyDef(name="pelvis", position=(0.0, 0.0, 0.85), mass=10.0, shape="box", shape_size=(0.15, 0.1, 0.15)), BodyDef(name="torso", position=(0.0, 0.0, 1.1), mass=8.0, shape="box", shape_size=(0.12, 0.08, 0.3)), @@ -193,38 +205,39 @@ def _build_unitree_g1() -> ProceduralRobot: BodyDef(name="r_shoulder", position=(0.2, 0.0, 1.2), mass=1.5, shape="sphere", shape_size=(0.04,)), BodyDef(name="r_upper_arm", position=(0.35, 0.0, 1.2), mass=1.5, shape="capsule", shape_size=(0.03, 0.12)), BodyDef(name="r_forearm", position=(0.55, 0.0, 1.2), mass=1.0, shape="capsule", shape_size=(0.025, 0.1)), + # Massless intermediate link bodies so 2-DOF compound joints (hips, + # ankles, shoulder-yaw/elbow) each have a unique (parent, child) edge. + # Indices 17..22. + BodyDef(name="l_hip_link", position=(-0.08, 0.0, 0.7), mass=0.0, shape="sphere", shape_size=(0.001,)), + BodyDef(name="r_hip_link", position=(0.08, 0.0, 0.7), mass=0.0, shape="sphere", shape_size=(0.001,)), + BodyDef(name="l_ankle_link", position=(-0.08, 0.0, 0.1), mass=0.0, shape="sphere", shape_size=(0.001,)), + BodyDef(name="r_ankle_link", position=(0.08, 0.0, 0.1), mass=0.0, shape="sphere", shape_size=(0.001,)), + BodyDef(name="l_elbow_link", position=(-0.45, 0.0, 1.2), mass=0.0, shape="sphere", shape_size=(0.001,)), + BodyDef(name="r_elbow_link", position=(0.45, 0.0, 1.2), mass=0.0, shape="sphere", shape_size=(0.001,)), ] - # Simplified joint set (21 DOF total: 1 torso + 6 left leg + 6 right leg + 4 left arm + 4 right arm). - # NOTE: this kinematic graph contains duplicate (parent, child) edges on each leg/arm - # (e.g. l_hip_roll and l_hip_pitch both map bodies 3 -> 4). A real USD/MuJoCo articulation - # builder requires a tree where each non-root link has exactly one inbound joint, so this - # topology will need intermediate massless link bodies before Phase 2 wires up the actual - # USD prim chain. Tracked as Phase-2 work; the Phase-1 skeleton does not instantiate the - # articulation, so the duplicate-edge defect is dormant on this branch. - # - # ``_validate_kinematic_tree`` (called below) is a no-op by default but raises when - # ``STRANDS_ISAAC_VALIDATE_KINEMATICS=1`` is set, which is the Phase-2 dev path: it - # surfaces this defect at builder time with body indices + joint names rather than - # letting USD/MuJoCo emit a cryptic articulation error two layers down. + # 21 DOF total: 1 torso + 6 left leg + 6 right leg + 4 left arm + 4 right arm. + # Each 2-DOF compound joint is split through a massless intermediate body + # (indices 17..22) so every (parent, child) edge in the kinematic tree is + # unique -- the invariant ``_validate_kinematic_tree`` enforces below. joints = [ # Torso JointDef(name="torso_yaw", parent_body=0, child_body=1, axis=(0, 0, 1), limit_lower=-1.0, limit_upper=1.0), - # Left leg (6 DOF) + # Left leg (6 DOF). Hip 2-DOF split via l_hip_link (17); ankle 2-DOF split via l_ankle_link (19). JointDef(name="l_hip_yaw", parent_body=0, child_body=3, axis=(0, 0, 1), limit_lower=-0.5, limit_upper=0.5), - JointDef(name="l_hip_roll", parent_body=3, child_body=4, axis=(1, 0, 0), limit_lower=-0.5, limit_upper=0.5), - JointDef(name="l_hip_pitch", parent_body=3, child_body=4, axis=(0, 1, 0), limit_lower=-1.5, limit_upper=0.5), + JointDef(name="l_hip_roll", parent_body=3, child_body=17, axis=(1, 0, 0), limit_lower=-0.5, limit_upper=0.5), + JointDef(name="l_hip_pitch", parent_body=17, child_body=4, axis=(0, 1, 0), limit_lower=-1.5, limit_upper=0.5), JointDef(name="l_knee", parent_body=4, child_body=5, axis=(0, 1, 0), limit_lower=-0.1, limit_upper=2.5), - JointDef(name="l_ankle_pitch", parent_body=5, child_body=6, axis=(0, 1, 0), limit_lower=-0.8, limit_upper=0.5), - JointDef(name="l_ankle_roll", parent_body=5, child_body=6, axis=(1, 0, 0), limit_lower=-0.3, limit_upper=0.3), - # Right leg (6 DOF) + JointDef(name="l_ankle_pitch", parent_body=5, child_body=19, axis=(0, 1, 0), limit_lower=-0.8, limit_upper=0.5), + JointDef(name="l_ankle_roll", parent_body=19, child_body=6, axis=(1, 0, 0), limit_lower=-0.3, limit_upper=0.3), + # Right leg (6 DOF). Hip 2-DOF split via r_hip_link (18); ankle 2-DOF split via r_ankle_link (20). JointDef(name="r_hip_yaw", parent_body=0, child_body=7, axis=(0, 0, 1), limit_lower=-0.5, limit_upper=0.5), - JointDef(name="r_hip_roll", parent_body=7, child_body=8, axis=(1, 0, 0), limit_lower=-0.5, limit_upper=0.5), - JointDef(name="r_hip_pitch", parent_body=7, child_body=8, axis=(0, 1, 0), limit_lower=-1.5, limit_upper=0.5), + JointDef(name="r_hip_roll", parent_body=7, child_body=18, axis=(1, 0, 0), limit_lower=-0.5, limit_upper=0.5), + JointDef(name="r_hip_pitch", parent_body=18, child_body=8, axis=(0, 1, 0), limit_lower=-1.5, limit_upper=0.5), JointDef(name="r_knee", parent_body=8, child_body=9, axis=(0, 1, 0), limit_lower=-0.1, limit_upper=2.5), - JointDef(name="r_ankle_pitch", parent_body=9, child_body=10, axis=(0, 1, 0), limit_lower=-0.8, limit_upper=0.5), - JointDef(name="r_ankle_roll", parent_body=9, child_body=10, axis=(1, 0, 0), limit_lower=-0.3, limit_upper=0.3), - # Left arm (4 DOF simplified) + JointDef(name="r_ankle_pitch", parent_body=9, child_body=20, axis=(0, 1, 0), limit_lower=-0.8, limit_upper=0.5), + JointDef(name="r_ankle_roll", parent_body=20, child_body=10, axis=(1, 0, 0), limit_lower=-0.3, limit_upper=0.3), + # Left arm (4 DOF simplified). Shoulder-yaw / elbow 2-DOF split via l_elbow_link (21). JointDef( name="l_shoulder_pitch", parent_body=1, child_body=11, axis=(0, 1, 0), limit_lower=-3.14, limit_upper=1.0 ), @@ -232,10 +245,10 @@ def _build_unitree_g1() -> ProceduralRobot: name="l_shoulder_roll", parent_body=11, child_body=12, axis=(1, 0, 0), limit_lower=-0.3, limit_upper=3.14 ), JointDef( - name="l_shoulder_yaw", parent_body=12, child_body=13, axis=(0, 0, 1), limit_lower=-1.5, limit_upper=1.5 + name="l_shoulder_yaw", parent_body=12, child_body=21, axis=(0, 0, 1), limit_lower=-1.5, limit_upper=1.5 ), - JointDef(name="l_elbow", parent_body=12, child_body=13, axis=(0, 1, 0), limit_lower=-2.5, limit_upper=0.0), - # Right arm (4 DOF simplified) + JointDef(name="l_elbow", parent_body=21, child_body=13, axis=(0, 1, 0), limit_lower=-2.5, limit_upper=0.0), + # Right arm (4 DOF simplified). Shoulder-yaw / elbow 2-DOF split via r_elbow_link (22). JointDef( name="r_shoulder_pitch", parent_body=1, child_body=14, axis=(0, 1, 0), limit_lower=-3.14, limit_upper=1.0 ), @@ -243,9 +256,9 @@ def _build_unitree_g1() -> ProceduralRobot: name="r_shoulder_roll", parent_body=14, child_body=15, axis=(1, 0, 0), limit_lower=-3.14, limit_upper=0.3 ), JointDef( - name="r_shoulder_yaw", parent_body=15, child_body=16, axis=(0, 0, 1), limit_lower=-1.5, limit_upper=1.5 + name="r_shoulder_yaw", parent_body=15, child_body=22, axis=(0, 0, 1), limit_lower=-1.5, limit_upper=1.5 ), - JointDef(name="r_elbow", parent_body=15, child_body=16, axis=(0, 1, 0), limit_lower=-2.5, limit_upper=0.0), + JointDef(name="r_elbow", parent_body=22, child_body=16, axis=(0, 1, 0), limit_lower=-2.5, limit_upper=0.0), ] robot = ProceduralRobot(name="unitree_g1", bodies=bodies, joints=joints, base_position=(0.0, 0.0, 0.85)) diff --git a/strands_robots_sim/isaac/tests/test_loaders.py b/strands_robots_sim/isaac/tests/test_loaders.py new file mode 100644 index 0000000..3d4c57f --- /dev/null +++ b/strands_robots_sim/isaac/tests/test_loaders.py @@ -0,0 +1,685 @@ +"""Tests for ``strands_robots_sim.isaac.loaders``. + +Acceptance-criteria coverage (from issue #50): + +* ``load_urdf(path) -> ProceduralRobot`` round-trips a Panda-style URDF. +* ``load_mjcf(path) -> ProceduralRobot`` round-trips a LIBERO-style MJCF. +* ``load_usd(path) -> ProceduralRobot`` round-trips a USD asset (gated + behind the ``[isaac]`` extra → skipped when ``pxr`` is unavailable). +* Parse failure raises explicit :class:`ValueError` with file path + + offending element — closes the #33 class of "phantom robot" silent-zero + bugs. +* DOF count, joint names, joint types, and body parents match the + procedural builders for at least one robot per format (parity test). + +Test fixtures are written into the pytest ``tmp_path`` per test, so this +suite is hermetic — no binary assets in the repo (out-of-scope per the +issue). +""" + +from __future__ import annotations + +import importlib.util +from pathlib import Path + +import pytest + +from strands_robots_sim.isaac import loaders +from strands_robots_sim.isaac.procedural import ( + ProceduralRobot, + _build_panda, + _build_so100, +) + +# --------------------------------------------------------------------------- +# Sample documents +# --------------------------------------------------------------------------- + + +PANDA_URDF = """ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +""" + + +SO100_URDF = """ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +""" + + +# Minimal LIBERO-style MJCF: a 2-body chain (so100 shoulder + upper arm). +LIBERO_MJCF = """ + + + + + + + + + + + + + + + + + + +""" + + +# --------------------------------------------------------------------------- +# URDF loader tests +# --------------------------------------------------------------------------- + + +class TestLoadUrdf: + """Acceptance: ``loaders.load_urdf(path) -> ProceduralRobot`` round-trips Panda.""" + + def test_load_urdf_round_trips_panda(self, tmp_path: Path) -> None: + urdf_path = tmp_path / "panda.urdf" + urdf_path.write_text(PANDA_URDF) + + robot = loaders.load_urdf(str(urdf_path)) + + assert isinstance(robot, ProceduralRobot) + assert robot.name == "panda" + assert len(robot.bodies) == 8 + # 7 revolute joints (DOF count parity with _build_panda). + assert robot.num_joints == 7 + + def test_load_urdf_panda_parity_with_procedural(self, tmp_path: Path) -> None: + """Acceptance: DOF count, joint names, joint types, body parents + match the procedural builder for Panda.""" + urdf_path = tmp_path / "panda.urdf" + urdf_path.write_text(PANDA_URDF) + loaded = loaders.load_urdf(str(urdf_path)) + proc = _build_panda() + + # DOF count parity + assert loaded.num_joints == proc.num_joints, "DOF count diverges" + # Joint names parity (same order in both) + assert loaded.joint_names == proc.joint_names, "joint names diverge" + # Joint types parity + loaded_types = [j.joint_type for j in loaded.joints] + proc_types = [j.joint_type for j in proc.joints] + assert loaded_types == proc_types, "joint types diverge" + # Body parents parity (parent_body / child_body indices identical) + loaded_edges = [(j.parent_body, j.child_body) for j in loaded.joints] + proc_edges = [(j.parent_body, j.child_body) for j in proc.joints] + assert loaded_edges == proc_edges, "body parent/child edges diverge" + + def test_load_urdf_so100_parity_with_procedural(self, tmp_path: Path) -> None: + """Mixed revolute+prismatic parity case (so100's gripper is prismatic).""" + urdf_path = tmp_path / "so100.urdf" + urdf_path.write_text(SO100_URDF) + loaded = loaders.load_urdf(str(urdf_path)) + proc = _build_so100() + + assert loaded.num_joints == proc.num_joints + assert loaded.joint_names == proc.joint_names + loaded_types = [j.joint_type for j in loaded.joints] + proc_types = [j.joint_type for j in proc.joints] + assert loaded_types == proc_types, "prismatic gripper joint should preserve type" + + def test_load_urdf_extracts_axis_and_limits(self, tmp_path: Path) -> None: + urdf_path = tmp_path / "panda.urdf" + urdf_path.write_text(PANDA_URDF) + robot = loaders.load_urdf(str(urdf_path)) + + joint1 = robot.joints[0] + assert joint1.name == "panda_joint1" + assert joint1.axis == (0.0, 0.0, 1.0) + assert joint1.limit_lower == pytest.approx(-2.8973) + assert joint1.limit_upper == pytest.approx(2.8973) + + joint4 = robot.joints[3] + assert joint4.axis == (0.0, -1.0, 0.0) + + +class TestUrdfFailureModes: + """Acceptance: parse failure raises explicit ValueError with file path — + closes #33 (no silent ``joint_count=0`` phantom robots).""" + + def test_missing_file_raises_filenotfound(self, tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError, match="URDF loader: file not found"): + loaders.load_urdf(str(tmp_path / "nope.urdf")) + + def test_malformed_xml_raises_valueerror_with_path(self, tmp_path: Path) -> None: + bad = tmp_path / "bad.urdf" + bad.write_text("") + with pytest.raises(ValueError, match="malformed XML"): + loaders.load_urdf(str(bad)) + + def test_wrong_root_tag_raises_valueerror(self, tmp_path: Path) -> None: + wrong = tmp_path / "wrong.urdf" + wrong.write_text("") + with pytest.raises(ValueError, match="root element must be "): + loaders.load_urdf(str(wrong)) + + def test_zero_links_raises_valueerror(self, tmp_path: Path) -> None: + empty = tmp_path / "empty.urdf" + empty.write_text("") + with pytest.raises(ValueError, match="zero "): + loaders.load_urdf(str(empty)) + + def test_unknown_joint_type_raises_valueerror_with_offending_element(self, tmp_path: Path) -> None: + bad = tmp_path / "weird.urdf" + bad.write_text( + "" + "" + "" + "" + "" + "" + ) + with pytest.raises(ValueError, match="unknown joint type"): + loaders.load_urdf(str(bad)) + + def test_joint_referencing_unknown_link_raises_valueerror(self, tmp_path: Path) -> None: + bad = tmp_path / "danglingref.urdf" + bad.write_text( + "" + "" + "" + "" + "" + "" + "" + ) + with pytest.raises(ValueError, match="unknown child link"): + loaders.load_urdf(str(bad)) + + +# --------------------------------------------------------------------------- +# MJCF loader tests +# --------------------------------------------------------------------------- + + +class TestLoadMjcf: + """Acceptance: ``loaders.load_mjcf(path) -> ProceduralRobot`` round-trips + a LIBERO-style MJCF scene.""" + + def test_load_mjcf_round_trips_libero_like_scene(self, tmp_path: Path) -> None: + mjcf_path = tmp_path / "scene.xml" + mjcf_path.write_text(LIBERO_MJCF) + + robot = loaders.load_mjcf(str(mjcf_path)) + + assert isinstance(robot, ProceduralRobot) + assert robot.name == "so100_lite" + # Synthetic "world" + 3 declared bodies. + assert len(robot.bodies) == 4 + assert robot.bodies[0].name == "world" + # Two hinge joints → two actuated DOF. + assert robot.num_joints == 2 + assert robot.joint_names == ["shoulder_pan", "shoulder_lift"] + + def test_load_mjcf_joint_types_mapped(self, tmp_path: Path) -> None: + mjcf_path = tmp_path / "scene.xml" + mjcf_path.write_text(LIBERO_MJCF) + robot = loaders.load_mjcf(str(mjcf_path)) + types = [j.joint_type for j in robot.joints] + assert types == ["revolute", "revolute"], "hinge → revolute mapping" + + def test_load_mjcf_parent_child_indices_consistent(self, tmp_path: Path) -> None: + """Parent-body should be the synthetic world (idx 0) for the root + body; the second joint connects shoulder_link → upper_arm_link.""" + mjcf_path = tmp_path / "scene.xml" + mjcf_path.write_text(LIBERO_MJCF) + robot = loaders.load_mjcf(str(mjcf_path)) + + # Body indices: 0=world, 1=base_link, 2=shoulder_link, 3=upper_arm_link + # First joint is on shoulder_link (parent: base_link=1 → child=2) + assert robot.joints[0].parent_body == 1 + assert robot.joints[0].child_body == 2 + # Second joint on upper_arm_link (parent: shoulder_link=2 → child=3) + assert robot.joints[1].parent_body == 2 + assert robot.joints[1].child_body == 3 + + def test_load_mjcf_extracts_axis_and_limits(self, tmp_path: Path) -> None: + mjcf_path = tmp_path / "scene.xml" + mjcf_path.write_text(LIBERO_MJCF) + robot = loaders.load_mjcf(str(mjcf_path)) + + assert robot.joints[0].axis == (0.0, 0.0, 1.0) + assert robot.joints[0].limit_lower == pytest.approx(-3.14) + assert robot.joints[0].limit_upper == pytest.approx(3.14) + assert robot.joints[1].axis == (0.0, 1.0, 0.0) + + +class TestMjcfFailureModes: + def test_missing_file_raises_filenotfound(self, tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError, match="MJCF loader"): + loaders.load_mjcf(str(tmp_path / "missing.xml")) + + def test_wrong_root_tag_raises_valueerror(self, tmp_path: Path) -> None: + wrong = tmp_path / "wrong.xml" + wrong.write_text("") + with pytest.raises(ValueError, match="root element must be "): + loaders.load_mjcf(str(wrong)) + + def test_no_worldbody_raises_valueerror(self, tmp_path: Path) -> None: + bad = tmp_path / "noworld.xml" + bad.write_text("") + with pytest.raises(ValueError, match="no "): + loaders.load_mjcf(str(bad)) + + def test_empty_worldbody_raises_valueerror(self, tmp_path: Path) -> None: + empty = tmp_path / "empty.xml" + empty.write_text("") + with pytest.raises(ValueError, match="no "): + loaders.load_mjcf(str(empty)) + + def test_unknown_joint_type_raises_valueerror(self, tmp_path: Path) -> None: + bad = tmp_path / "weird.xml" + bad.write_text( + "" + "" + "" + ) + with pytest.raises(ValueError, match="unknown joint type"): + loaders.load_mjcf(str(bad)) + + def test_malformed_xml_raises_valueerror_with_path(self, tmp_path: Path) -> None: + bad = tmp_path / "bad.xml" + bad.write_text("") + with pytest.raises(ValueError, match="malformed XML"): + loaders.load_mjcf(str(bad)) + + +# --------------------------------------------------------------------------- +# USD loader tests +# --------------------------------------------------------------------------- + + +_HAS_PXR = importlib.util.find_spec("pxr") is not None + + +@pytest.mark.skipif(not _HAS_PXR, reason="USD loader requires Pixar USD ([isaac] extra)") +class TestLoadUsd: + """Acceptance: ``loaders.load_usd(path) -> ProceduralRobot`` round-trips a USD + asset (gated behind the ``[isaac]`` extra; skipped when unavailable).""" + + @staticmethod + def _build_two_body_revolute_stage(out_path: str) -> None: + """Create a minimal USD stage: two rigid bodies connected by a revolute joint.""" + from pxr import Sdf, Usd, UsdGeom, UsdPhysics # type: ignore + + stage = Usd.Stage.CreateNew(out_path) + UsdGeom.SetStageUpAxis(stage, UsdGeom.Tokens.z) + + # Two cube prims with RigidBodyAPI applied. + for body_path in ("/World/base", "/World/link1"): + prim = UsdGeom.Cube.Define(stage, body_path).GetPrim() + UsdPhysics.RigidBodyAPI.Apply(prim) + mass_api = UsdPhysics.MassAPI.Apply(prim) + mass_api.GetMassAttr().Set(1.5 if body_path.endswith("base") else 0.7) + + # Revolute joint between them, axis Z, limits ±1.0 rad. + joint = UsdPhysics.RevoluteJoint.Define(stage, "/World/joint1") + joint.GetBody0Rel().SetTargets([Sdf.Path("/World/base")]) + joint.GetBody1Rel().SetTargets([Sdf.Path("/World/link1")]) + joint.GetAxisAttr().Set("Z") + joint.GetLowerLimitAttr().Set(-1.0) + joint.GetUpperLimitAttr().Set(1.0) + + stage.GetRootLayer().Save() + + def test_load_usd_round_trips_two_body_revolute(self, tmp_path: Path) -> None: + usd_path = tmp_path / "stage.usda" + self._build_two_body_revolute_stage(str(usd_path)) + + robot = loaders.load_usd(str(usd_path)) + + assert isinstance(robot, ProceduralRobot) + assert len(robot.bodies) == 2 + assert robot.num_joints == 1 + joint = robot.joints[0] + assert joint.joint_type == "revolute" + assert joint.axis == (0.0, 0.0, 1.0) + assert joint.limit_lower == pytest.approx(-1.0) + assert joint.limit_upper == pytest.approx(1.0) + + def test_load_usd_extracts_mass_from_mass_api(self, tmp_path: Path) -> None: + usd_path = tmp_path / "stage.usda" + self._build_two_body_revolute_stage(str(usd_path)) + robot = loaders.load_usd(str(usd_path)) + masses = sorted(b.mass for b in robot.bodies) + # Authored masses 0.7 and 1.5. + assert masses == pytest.approx([0.7, 1.5]) + + +class TestUsdFailureModes: + def test_missing_file_raises_filenotfound(self, tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError, match="USD loader"): + loaders.load_usd(str(tmp_path / "missing.usda")) + + @pytest.mark.skipif(_HAS_PXR, reason="exercise the no-pxr path only when pxr is unavailable") + def test_load_usd_without_pxr_raises_importerror(self, tmp_path: Path) -> None: + # Touch a real path so we hit the lazy import (not the file-not-found guard). + path = tmp_path / "stage.usda" + path.write_text("dummy") + with pytest.raises(ImportError, match=r"strands-robots-sim\[isaac\]"): + loaders.load_usd(str(path)) + + @pytest.mark.skipif(not _HAS_PXR, reason="needs pxr to construct the malformed stage scenario") + def test_load_usd_zero_rigid_bodies_raises_valueerror(self, tmp_path: Path) -> None: + from pxr import Usd, UsdGeom # type: ignore + + usd_path = tmp_path / "empty.usda" + stage = Usd.Stage.CreateNew(str(usd_path)) + # A pure-geometry prim with no RigidBodyAPI applied. + UsdGeom.Cube.Define(stage, "/World/decorative") + stage.GetRootLayer().Save() + + with pytest.raises(ValueError, match="zero rigid bodies"): + loaders.load_usd(str(usd_path)) + + @pytest.mark.skipif(not _HAS_PXR, reason="needs pxr to construct the malformed stage scenario") + def test_load_usd_joint_dangling_body_ref_raises_valueerror(self, tmp_path: Path) -> None: + from pxr import Sdf, Usd, UsdGeom, UsdPhysics # type: ignore + + usd_path = tmp_path / "dangling.usda" + stage = Usd.Stage.CreateNew(str(usd_path)) + prim = UsdGeom.Cube.Define(stage, "/World/base").GetPrim() + UsdPhysics.RigidBodyAPI.Apply(prim) + # The joint references body0 OK, but body1 is a non-rigid prim. + not_rigid = UsdGeom.Cube.Define(stage, "/World/decorative").GetPrim() # noqa: F841 + joint = UsdPhysics.RevoluteJoint.Define(stage, "/World/joint") + joint.GetBody0Rel().SetTargets([Sdf.Path("/World/base")]) + joint.GetBody1Rel().SetTargets([Sdf.Path("/World/decorative")]) + stage.GetRootLayer().Save() + + with pytest.raises(ValueError, match="not a rigid body"): + loaders.load_usd(str(usd_path)) + + +# --------------------------------------------------------------------------- +# Cross-format invariants +# --------------------------------------------------------------------------- + + +class TestLoadersAreImportable: + """The loaders module must be importable on a clean environment with + no Pixar USD / mujoco / urdfpy installed (issue's recommendation for + keeping procedural builders as zero-dep fallback).""" + + def test_loaders_module_imports_with_only_stdlib(self) -> None: + # If this import succeeded above (file-level), we already passed. + assert hasattr(loaders, "load_urdf") + assert hasattr(loaders, "load_mjcf") + assert hasattr(loaders, "load_usd") + + def test_load_urdf_does_not_require_pxr(self, tmp_path: Path) -> None: + """URDF + MJCF parsers must work without pxr; they only need stdlib.""" + urdf_path = tmp_path / "p.urdf" + urdf_path.write_text(PANDA_URDF) + # Just verify the call succeeds; the implementation must not have + # imported pxr at module level (already verified above by importing + # ``loaders`` itself in this test file). + robot = loaders.load_urdf(str(urdf_path)) + assert robot.num_joints == 7 + + +# --------------------------------------------------------------------------- +# Real-asset parity: robosuite robots that strands-robots' LIBERO adapter +# consumes via MJCF must round-trip through ``load_mjcf`` cleanly. +# +# Closes cagataycali's review on PR #51 asking for "tests for verifying the +# robots we have in strands-robots to smoothly maps into isaac". +# +# The strands-robots LIBERO adapter uses robosuite's bundled MJCF assets +# (``robosuite/models/assets/robots//robot.xml``) for the seven +# robot embodiments LIBERO ships against: baxter / iiwa / jaco / kinova3 +# / panda / sawyer / ur5e. Loading each via ``load_mjcf`` proves the +# loader handles the wire format the rest of the strands ecosystem +# already produces -- not just the synthetic fixtures above. +# +# The ``robosuite`` package is an optional / heavy dep (pulls mujoco + +# numpy etc.); these tests skip when it's not on PYTHONPATH, mirroring +# the ``pxr`` import-guard pattern on ``TestLoadUsd``. +# --------------------------------------------------------------------------- + + +_ROBOSUITE_ROBOT_EMBODIMENTS = { + "panda": {"min_joints": 7, "max_joints": 7}, # Franka Panda 7-DOF arm + "iiwa": {"min_joints": 7, "max_joints": 7}, # KUKA LBR iiwa 7-DOF + "kinova3": {"min_joints": 7, "max_joints": 7}, # Kinova Gen3 7-DOF + "jaco": {"min_joints": 7, "max_joints": 7}, # Kinova Jaco 7-DOF + "sawyer": {"min_joints": 7, "max_joints": 7}, # Rethink Sawyer 7-DOF + "ur5e": {"min_joints": 6, "max_joints": 6}, # Universal Robots UR5e 6-DOF + "baxter": {"min_joints": 14, "max_joints": 14}, # Rethink Baxter dual 7-DOF arms +} + + +def _robosuite_robot_xml_path(robot_name: str) -> Path | None: + """Return the path to the robosuite-bundled MJCF for ``robot_name``. + + Returns ``None`` if ``robosuite`` isn't installed, so each test can + skip individually without a session-level fixture. + """ + try: + import robosuite + except ImportError: + return None + rs_root = Path(robosuite.__file__).parent + candidate = rs_root / "models" / "assets" / "robots" / robot_name / "robot.xml" + return candidate if candidate.is_file() else None + + +_HAS_ROBOSUITE = _robosuite_robot_xml_path("panda") is not None + + +@pytest.mark.skipif( + not _HAS_ROBOSUITE, + reason="robosuite not installed; real-asset parity tests are gated on the optional dep", +) +class TestRobosuiteMjcfParity: + """Pin: every robot embodiment LIBERO consumes via robosuite's bundled + MJCFs must round-trip through ``load_mjcf`` and produce a sensible + ProceduralRobot with the documented joint count. + + Locking the joint counts catches two failure modes at once: + + 1. **Loader regression** (e.g. revolute / prismatic / fixed joint + handling drifts under future refactors) — would surface as a + ``num_joints`` mismatch on a known robot. + 2. **strands-robots upstream change** (e.g. robosuite ships a + different MJCF schema or renames the asset) — would surface as + a missing-file skip in CI rather than a silent zero-joints + phantom robot (the #33-class failure). + """ + + @pytest.mark.parametrize("robot_name", sorted(_ROBOSUITE_ROBOT_EMBODIMENTS)) + def test_robosuite_robot_loads_cleanly(self, robot_name: str) -> None: + """Each robosuite-bundled MJCF must load without raising.""" + xml_path = _robosuite_robot_xml_path(robot_name) + if xml_path is None: + pytest.skip(f"robosuite asset for {robot_name!r} not present (custom robosuite install?)") + + robot = loaders.load_mjcf(str(xml_path)) + + assert robot.name, f"loaded {robot_name!r} robot has empty name (MJCF model attribute missing?)" + assert len(robot.bodies) > 0, f"loaded {robot_name!r} robot has zero bodies" + # Every body referenced by a joint must be a real body in the dataclass. + body_count = len(robot.bodies) + for j in robot.joints: + assert 0 <= j.parent_body < body_count, ( + f"{robot_name}: joint {j.name!r} parent_body={j.parent_body} out of range " + f"(robot has {body_count} bodies)" + ) + assert 0 <= j.child_body < body_count, ( + f"{robot_name}: joint {j.name!r} child_body={j.child_body} out of range " + f"(robot has {body_count} bodies)" + ) + + @pytest.mark.parametrize("robot_name", sorted(_ROBOSUITE_ROBOT_EMBODIMENTS)) + def test_robosuite_robot_joint_count_matches_spec(self, robot_name: str) -> None: + """Loaded joint count must match the documented DOF for the embodiment. + + Locking these prevents silent regressions where a loader change + drops a joint type or duplicates one. Spec values are based on + each robot's published kinematic spec (from robosuite's docs). + """ + xml_path = _robosuite_robot_xml_path(robot_name) + if xml_path is None: + pytest.skip(f"robosuite asset for {robot_name!r} not present") + + spec = _ROBOSUITE_ROBOT_EMBODIMENTS[robot_name] + robot = loaders.load_mjcf(str(xml_path)) + + assert spec["min_joints"] <= robot.num_joints <= spec["max_joints"], ( + f"{robot_name}: loaded {robot.num_joints} joints, expected " + f"{spec['min_joints']}-{spec['max_joints']}. Either the loader regressed " + f"or robosuite's MJCF for this robot changed; verify the spec table above " + f"against the upstream asset." + ) + + @pytest.mark.parametrize("robot_name", sorted(_ROBOSUITE_ROBOT_EMBODIMENTS)) + def test_robosuite_robot_joints_are_revolute(self, robot_name: str) -> None: + """Every robosuite arm uses revolute joints; the loader must reflect that. + + All seven robots are revolute-only manipulators (no prismatic + actuation in the standard arm bodies). If the loader misclassifies + a hinge as prismatic or fixed, this test catches it. + """ + xml_path = _robosuite_robot_xml_path(robot_name) + if xml_path is None: + pytest.skip(f"robosuite asset for {robot_name!r} not present") + + robot = loaders.load_mjcf(str(xml_path)) + joint_types = {j.joint_type for j in robot.joints if j.joint_type != "fixed"} + assert joint_types == {"revolute"}, ( + f"{robot_name}: expected all actuated joints to be revolute; got {sorted(joint_types)}. " + f"The loader's hinge -> revolute mapping may be misclassifying joints." + ) + + def test_all_embodiments_at_least_load(self) -> None: + """Sanity check: every robosuite robot in the spec table loads. + + If this fails on a particular robot, the parametrized tests above + will give a more specific diagnostic. This test exists to make + the failure visible at a glance even when the parametrized cases + scroll off-screen. + """ + failures = [] + for robot_name in _ROBOSUITE_ROBOT_EMBODIMENTS: + xml_path = _robosuite_robot_xml_path(robot_name) + if xml_path is None: + continue + try: + loaders.load_mjcf(str(xml_path)) + except Exception as exc: # noqa: BLE001 - aggregate failures + failures.append(f" {robot_name}: {type(exc).__name__}: {exc}") + + assert not failures, ( + "robosuite-bundled MJCFs failed to load:\n" + + "\n".join(failures) + + "\nstrands-robots' LIBERO adapter consumes these directly; a regression here " + "would silently break the matrix's mujoco baseline for any of the affected robots." + ) diff --git a/strands_robots_sim/isaac/tests/test_procedural_g1_dof.py b/strands_robots_sim/isaac/tests/test_procedural_g1_dof.py index b5d1a4b..ca71e98 100644 --- a/strands_robots_sim/isaac/tests/test_procedural_g1_dof.py +++ b/strands_robots_sim/isaac/tests/test_procedural_g1_dof.py @@ -11,10 +11,13 @@ ``test_phase1_doc_banner.py`` (lands in PR-5 alongside the docs file itself). -Note: this pin does NOT cover the kinematic-tree topology defect on the -G1 joint graph (duplicate ``(parent_body, child_body)`` edges); that is -a Phase 2 fix that needs intermediate massless link bodies, and is -documented in ``procedural.py`` line 165 as deferred. +The kinematic-tree topology invariant (each non-root link has exactly +one inbound joint; G1 splits its 2-DOF compound joints through six +intermediate massless link bodies) is pinned in this same PR's companion +file ``test_procedural_kinematic_guard.py``. Split by concern: this +file pins doc/comment honesty against a stale literal value, that file +pins the invariant the literal value is asserting against. Both run on +every CI invocation; neither is deferred. """ from __future__ import annotations diff --git a/strands_robots_sim/isaac/tests/test_procedural_kinematic_guard.py b/strands_robots_sim/isaac/tests/test_procedural_kinematic_guard.py index 692c15b..8bb9e3e 100644 --- a/strands_robots_sim/isaac/tests/test_procedural_kinematic_guard.py +++ b/strands_robots_sim/isaac/tests/test_procedural_kinematic_guard.py @@ -1,100 +1,114 @@ -"""Phase-1 fail-fast guard pin: ``_validate_kinematic_tree``. - -cagataycali's review on PR #46 (procedural USD builders) flagged that -the documented duplicate-edge defect on G1 is silently returned by -``_build_unitree_g1`` and would only surface as a cryptic articulation -error when Phase 2 wires up USD instantiation. The guard is opt-in via -``STRANDS_ISAAC_VALIDATE_KINEMATICS`` so it does not break Phase-1 -callers (which never instantiate the articulation), but it MUST fire on -the known defect when Phase-2 development flips the env var on. - -This test pins three properties: - -1. Default behaviour: ``get_procedural_robot("unitree_g1")`` returns a - robot regardless of the topology defect (the env var defaults off). -2. When ``STRANDS_ISAAC_VALIDATE_KINEMATICS=1`` is set, building G1 - raises ``ValueError`` mentioning duplicate edges + the offending - joint names (so Phase 2 sees the defect at builder time with full - diagnostic context). -3. SO-100 and Panda still build cleanly under the same env var -- - their topologies are already valid trees, so the guard must be a - no-op on them. - -The guard intentionally lives in ``procedural.py``; pinning the -behaviour here means a future refactor that drops the guard or relaxes -its trigger logic will fail this test rather than silently regressing. +"""Fail-first kinematic-tree pin: ``_validate_kinematic_tree``. + +cagataycali's review on PR #46 first asked for a guard against the documented +duplicate-edge defect on G1 (which would have surfaced as a cryptic USD / +MuJoCo articulation error two layers down). The first round of that fix added +the guard but kept it opt-in via ``STRANDS_ISAAC_VALIDATE_KINEMATICS=1`` to +preserve the Phase-1-callers-don't-instantiate fiction. cagataycali's +follow-up review on PR #51 pushed back: shipping a robot we know cannot +instantiate has no good use case in this package, and the default should +fail first rather than silently producing a broken robot. + +This pin enforces the fail-first contract: + +1. ``_validate_kinematic_tree`` runs unconditionally on every procedural + builder. There is no env-var escape hatch. +2. All three shipped procedural robots (``so100``, ``panda``, ``unitree_g1``) + build cleanly under that contract -- so the G1 builder must keep its + topology a valid tree (intermediate massless link bodies between the + 2-DOF compound joints), not just punt the check off behind a flag. +3. A topology defect injected at builder time still surfaces with the body + indices + joint names so the offender is obvious from the traceback alone. + +A future refactor that re-introduces a duplicate ``(parent, child)`` edge +or relaxes the guard will fail this pin rather than silently regressing. """ from __future__ import annotations -import importlib -import os - import pytest - -def _reload_procedural(): - """Re-import ``procedural`` with whatever env state is currently set. - - The module's lazy registry caches built robots on first call, so - we reload it between tests to make the env-var check authoritative - on each build. - """ - import strands_robots_sim.isaac.procedural as procedural - - return importlib.reload(procedural) - - -class TestKinematicGuard: - """Pin: ``_validate_kinematic_tree`` opt-in semantics + G1 trigger.""" - - def test_g1_builds_by_default_when_guard_disabled(self, monkeypatch: pytest.MonkeyPatch) -> None: - """Without the env var, G1 builds fine despite the documented defect.""" - monkeypatch.delenv("STRANDS_ISAAC_VALIDATE_KINEMATICS", raising=False) - procedural = _reload_procedural() - robot = procedural.get_procedural_robot("unitree_g1") +from strands_robots_sim.isaac import procedural +from strands_robots_sim.isaac.procedural import ( + BodyDef, + JointDef, + ProceduralRobot, + _build_panda, + _build_so100, + _build_unitree_g1, + _validate_kinematic_tree, + get_procedural_robot, +) + + +class TestKinematicGuardFailFirst: + """Pin: ``_validate_kinematic_tree`` validates by default; no env-var gating.""" + + def test_g1_builds_cleanly_by_default(self) -> None: + """G1 must build without raising under the default fail-first contract.""" + robot = get_procedural_robot("unitree_g1") assert robot is not None assert robot.name == "unitree_g1" + # Six 2-DOF compound joints (hips, ankles, shoulder-yaw/elbow on each arm) + # are split through massless intermediate ``*_link`` bodies, so the + # actuated joint count stays 21. assert robot.num_joints == 21 - @pytest.mark.parametrize("env_value", ["1", "true", "yes", "TRUE", "Yes"]) - def test_g1_raises_when_guard_enabled(self, monkeypatch: pytest.MonkeyPatch, env_value: str) -> None: - """With the env var on, building G1 raises with the duplicate edges named.""" - monkeypatch.setenv("STRANDS_ISAAC_VALIDATE_KINEMATICS", env_value) - procedural = _reload_procedural() - with pytest.raises(ValueError) as excinfo: - procedural._build_unitree_g1() - msg = str(excinfo.value) - assert "unitree_g1" in msg - assert "duplicate parent->child body edges" in msg - # At least one of the documented duplicate-edge joint pairs must - # be named in the diagnostic so Phase-2 callers know which joints - # to split. - assert "l_hip_roll" in msg or "l_hip_pitch" in msg - - @pytest.mark.parametrize("env_value", ["", "0", "false", "no", "off"]) - def test_g1_guard_off_for_non_truthy_env_values(self, monkeypatch: pytest.MonkeyPatch, env_value: str) -> None: - """Only ``1``/``true``/``yes`` enable the guard; everything else is a no-op.""" - monkeypatch.setenv("STRANDS_ISAAC_VALIDATE_KINEMATICS", env_value) - procedural = _reload_procedural() - # Should not raise. - robot = procedural._build_unitree_g1() - assert robot.name == "unitree_g1" - - def test_so100_and_panda_pass_guard(self, monkeypatch: pytest.MonkeyPatch) -> None: - """SO-100 and Panda are valid trees; guard must accept them when enabled.""" - monkeypatch.setenv("STRANDS_ISAAC_VALIDATE_KINEMATICS", "1") - procedural = _reload_procedural() - # Both must build without raising; the guard is wired into the end of - # each ``_build_*`` so a regression that tightens the check too far - # would surface here. - so100 = procedural._build_so100() - panda = procedural._build_panda() + def test_g1_topology_has_no_duplicate_edges(self) -> None: + """The shipped G1 kinematic graph must already be a valid tree.""" + robot = _build_unitree_g1() + edges = [(j.parent_body, j.child_body) for j in robot.joints] + assert len(edges) == len(set(edges)), ( + f"G1 kinematic graph has duplicate (parent, child) edges: " + f"{[e for e in edges if edges.count(e) > 1]}. The builder must " + f"insert intermediate massless link bodies for compound joints." + ) + + def test_so100_and_panda_build_cleanly(self) -> None: + """SO-100 and Panda are valid trees under the default guard.""" + so100 = _build_so100() + panda = _build_panda() assert so100.name == "so100" assert panda.name == "panda" - -def teardown_module(module: object) -> None: - """Restore the module to its default state for downstream tests.""" - os.environ.pop("STRANDS_ISAAC_VALIDATE_KINEMATICS", None) - _reload_procedural() + def test_guard_raises_on_injected_duplicate_edge(self) -> None: + """A robot with a duplicate edge must raise with diagnostic context.""" + bad = ProceduralRobot( + name="broken_robot", + bodies=[ + BodyDef(name="root"), + BodyDef(name="child"), + ], + joints=[ + JointDef(name="joint_a", parent_body=0, child_body=1, axis=(1, 0, 0)), + JointDef(name="joint_b", parent_body=0, child_body=1, axis=(0, 1, 0)), + ], + ) + with pytest.raises(ValueError) as excinfo: + _validate_kinematic_tree(bad) + msg = str(excinfo.value) + assert "broken_robot" in msg + assert "duplicate parent->child body edges" in msg + # Both offending joint names must be named so the caller knows which + # joints to split through an intermediate massless link body. + assert "joint_a" in msg + assert "joint_b" in msg + + def test_guard_has_no_env_var_escape_hatch(self) -> None: + """The fail-first guard must not be gateable by an env var. + + Pin ensures ``procedural.py`` never re-introduces an opt-in switch: + a knowingly-broken robot has no good use case in this package, so + the validation is unconditional. A future refactor that adds an env + var here will fail this test. + """ + import inspect + + source = inspect.getsource(procedural) + assert "STRANDS_ISAAC_VALIDATE_KINEMATICS" not in source, ( + "procedural.py re-introduced the STRANDS_ISAAC_VALIDATE_KINEMATICS " + "env-var gate; the fail-first guard must run unconditionally." + ) + assert "os.environ" not in source, ( + "procedural.py reads from os.environ; the kinematic-tree guard " "must not be conditioned on env state." + )