|
| 1 | +# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). |
| 2 | +# All rights reserved. |
| 3 | +# |
| 4 | +# SPDX-License-Identifier: BSD-3-Clause |
| 5 | + |
| 6 | +"""Shared multi-mesh ray-caster target-tracker prep. |
| 7 | +
|
| 8 | +Both backends need: the covering :class:`~isaaclab.cloner.ClonePlan`, one rigid-body |
| 9 | +ancestor + body→mesh offset per prototype, and the per-env prototype assignment from |
| 10 | +``clone_mask``. Backends differ only in what they do with the result (PhysX: |
| 11 | +``RigidObjectView``; Newton: sites). |
| 12 | +""" |
| 13 | + |
| 14 | +from __future__ import annotations |
| 15 | + |
| 16 | +import re |
| 17 | + |
| 18 | +from pxr import UsdPhysics |
| 19 | + |
| 20 | +import isaaclab.sim as sim_utils |
| 21 | +from isaaclab.cloner import ClonePlan |
| 22 | + |
| 23 | +_ENV_NAMESPACE_RE = re.compile(r"^/World/envs/env_(\d+)/(.+)$") |
| 24 | + |
| 25 | + |
| 26 | +def resolve_rigid_body_anchor(prim) -> tuple[object, list[float]] | None: |
| 27 | + """Return ``(rigid_body_ancestor, [px,py,pz,qx,qy,qz,qw])`` or ``None``. |
| 28 | +
|
| 29 | + Offset is ``prim`` resolved relative to ``ancestor`` (the constant body→prim |
| 30 | + transform). Sensor body tracker uses ``None`` as a static-parent signal; target |
| 31 | + tracker treats ``None`` as a contract violation (tracked targets need rigid-body |
| 32 | + anchors). |
| 33 | + """ |
| 34 | + ancestor = sim_utils.get_first_matching_ancestor_prim( |
| 35 | + prim.GetPath(), predicate=lambda p: p.HasAPI(UsdPhysics.RigidBodyAPI) |
| 36 | + ) |
| 37 | + if ancestor is None: |
| 38 | + return None |
| 39 | + pos, quat = sim_utils.resolve_prim_pose(prim, ancestor) |
| 40 | + return ancestor, [*pos, *quat] |
| 41 | + |
| 42 | + |
| 43 | +def split_env_path(path: str) -> tuple[int | None, str]: |
| 44 | + """Split ``/World/envs/env_<N>/<rest>`` → ``(N, <rest>)``; ``(None, path)`` for globals. |
| 45 | +
|
| 46 | + PhysX rebuilds the full env path (``f"/World/envs/env_{e}/{rest}"``) for the |
| 47 | + ``RigidObjectView``; Newton uses ``rest`` as the proto-local body label for |
| 48 | + :meth:`~isaaclab_newton.physics.NewtonManager.cl_register_site`. |
| 49 | + """ |
| 50 | + m = _ENV_NAMESPACE_RE.match(path) |
| 51 | + return (int(m.group(1)), m.group(2)) if m else (None, path) |
| 52 | + |
| 53 | + |
| 54 | +def walk_target_prototypes( |
| 55 | + target_prim_expr: str, |
| 56 | + plans: list[ClonePlan], |
| 57 | + num_envs: int, |
| 58 | + ctx_path: str, |
| 59 | +) -> tuple[list[list[tuple[str, list[float]]]], list[int]]: |
| 60 | + """Walk one cloned env per prototype; return ``(per_proto_entries, env_proto_idx)``. |
| 61 | +
|
| 62 | + ``per_proto_entries[p]`` is a list of ``(body_full_path_in_first_env, [px..qw])`` |
| 63 | + for prototype ``p`` (empty if unused). ``env_proto_idx[e]`` is the prototype index |
| 64 | + env ``e`` uses. Plan resolution: global targets (no ``/env_`` token) need no plan; |
| 65 | + env-replicated targets must be covered (raises otherwise — env-0 fallback would |
| 66 | + silently mis-track heterogeneous scenes). |
| 67 | + """ |
| 68 | + plan = _find_covering_plan(target_prim_expr, plans, ctx_path) |
| 69 | + |
| 70 | + if plan is not None: |
| 71 | + num_protos = plan.clone_mask.size(0) |
| 72 | + first_env_per_proto = [int(plan.clone_mask[p].nonzero(as_tuple=False)[0].item()) for p in range(num_protos)] |
| 73 | + env_proto_idx = [int(plan.clone_mask[:, e].nonzero(as_tuple=False)[0].item()) for e in range(num_envs)] |
| 74 | + else: |
| 75 | + first_env_per_proto = [0] |
| 76 | + env_proto_idx = [0] * num_envs |
| 77 | + |
| 78 | + per_proto_entries: list[list[tuple[str, list[float]]]] = [[] for _ in range(len(first_env_per_proto))] |
| 79 | + used_proto_indices = set(env_proto_idx) |
| 80 | + for proto_idx, first_env in enumerate(first_env_per_proto): |
| 81 | + if proto_idx not in used_proto_indices: |
| 82 | + continue |
| 83 | + env_pattern = target_prim_expr.replace("/env_.*/", f"/env_{first_env}/") |
| 84 | + prims = sim_utils.find_matching_prims(env_pattern) |
| 85 | + if not prims and env_pattern == target_prim_expr: |
| 86 | + prims = sim_utils.find_matching_prims(target_prim_expr) # global, no env tag |
| 87 | + for prim in prims: |
| 88 | + anchor = resolve_rigid_body_anchor(prim) |
| 89 | + if anchor is None: |
| 90 | + raise RuntimeError( |
| 91 | + f"MultiMeshRayCaster '{ctx_path}': tracked target prim '{prim.GetPath()}'" |
| 92 | + " has no rigid-body ancestor. Disable track_mesh_transforms if static." |
| 93 | + ) |
| 94 | + ancestor, offset = anchor |
| 95 | + per_proto_entries[proto_idx].append((str(ancestor.GetPath()), offset)) |
| 96 | + return per_proto_entries, env_proto_idx |
| 97 | + |
| 98 | + |
| 99 | +def _find_covering_plan(prim_expr: str, plans: list[ClonePlan], ctx_path: str) -> ClonePlan | None: |
| 100 | + """Most-specific plan (longest ``dest_template`` prefix) covering ``prim_expr``, or ``None``. |
| 101 | +
|
| 102 | + ``None`` for globals (no ``/env_`` token, no plan needed). Raises for env-replicated |
| 103 | + targets with no covering plan — silent fallback would mis-track heterogeneous scenes. |
| 104 | + """ |
| 105 | + if "/env_" not in prim_expr: |
| 106 | + return None |
| 107 | + best: ClonePlan | None = None |
| 108 | + best_len = -1 |
| 109 | + for plan in plans: |
| 110 | + prefix = plan.dest_template.split("{}")[0] |
| 111 | + if prim_expr.startswith(prefix) and len(prefix) > best_len: |
| 112 | + best, best_len = plan, len(prefix) |
| 113 | + if best is None: |
| 114 | + raise RuntimeError( |
| 115 | + f"MultiMeshRayCaster '{ctx_path}': tracked target '{prim_expr}' is env-replicated" |
| 116 | + " (contains '/env_') but no ClonePlan covers it. Tracked env-replicated targets" |
| 117 | + " must be produced by the cloner." |
| 118 | + ) |
| 119 | + return best |
0 commit comments