2929_PANDARSCAN_MSGTYPE = "pandar_msgs/msg/PandarScan"
3030_TF_STATIC_TOPIC = "/tf_static"
3131
32+ _DEFAULT_TARGET_FRAME = "base_link"
3233_SUPPORTED_MSGTYPES = {_POINTCLOUD2_MSGTYPE , _PANDARSCAN_MSGTYPE }
3334
3435logger = logging .getLogger (__name__ )
@@ -43,20 +44,27 @@ def _quat_to_matrix(x: float, y: float, z: float, w: float) -> np.ndarray:
4344 ])
4445
4546
46- def _build_tf_to_base (typestore : object , reader : Reader ) -> dict [str , tuple [np .ndarray , np .ndarray ]]:
47- """Read /tf_static and compute transforms from each frame to base_link.
47+ def _make_transform (R : np .ndarray , t : np .ndarray ) -> np .ndarray :
48+ """Build a 4x4 homogeneous transform from a 3x3 rotation and 3-vector."""
49+ T = np .eye (4 )
50+ T [:3 , :3 ] = R
51+ T [:3 , 3 ] = t
52+ return T
53+
54+
55+ def _read_tf_static (typestore : object , reader : Reader ) -> dict [str , tuple [str , np .ndarray ]]:
56+ """Read ``/tf_static`` and return per-edge transforms.
4857
4958 Returns:
50- Dict mapping ``frame_id `` to ``(R, t )`` where ``R`` is a 3x3 rotation
51- matrix and ``t `` is a 3-element translation vector, representing the
52- transform from that frame to ``base_link ``.
59+ Dict mapping ``child_frame_id `` to ``(parent_frame_id, T_4x4 )``
60+ where ``T_4x4 `` is the 4x4 homogeneous matrix satisfying
61+ ``p_parent = T_4x4 @ p_child ``.
5362 """
5463 tf_conns = [c for c in reader .connections if c .topic == _TF_STATIC_TOPIC ]
5564 if not tf_conns :
5665 return {}
5766
58- # Parse the TF tree: child_frame -> (parent_frame, R, t)
59- tree : dict [str , tuple [str , np .ndarray , np .ndarray ]] = {}
67+ tree : dict [str , tuple [str , np .ndarray ]] = {}
6068 for conn , _ts , rawdata in reader .messages (connections = tf_conns ):
6169 msg = typestore .deserialize_cdr (rawdata , conn .msgtype )
6270 for tf in msg .transforms :
@@ -66,32 +74,52 @@ def _build_tf_to_base(typestore: object, reader: Reader) -> dict[str, tuple[np.n
6674 rot = tf .transform .rotation
6775 R = _quat_to_matrix (rot .x , rot .y , rot .z , rot .w )
6876 t = np .array ([tr .x , tr .y , tr .z ])
69- tree [child ] = (parent , R , t )
70-
71- # Compose transforms from each frame to base_link
72- result : dict [str , tuple [np .ndarray , np .ndarray ]] = {}
73- result ["base_link" ] = (np .eye (3 ), np .zeros (3 ))
74-
75- def _resolve (frame : str ) -> tuple [np .ndarray , np .ndarray ] | None :
76- if frame in result :
77- return result [frame ]
78- if frame not in tree :
79- return None
80- parent , R_child , t_child = tree [frame ]
81- parent_tf = _resolve (parent )
82- if parent_tf is None :
83- return None
84- R_parent , t_parent = parent_tf
85- # T_base = T_parent * T_child: p_base = R_parent*(R_child*p + t_child) + t_parent
86- R = R_parent @ R_child
87- t = R_parent @ t_child + t_parent
88- result [frame ] = (R , t )
89- return result [frame ]
90-
91- for frame in tree :
92- _resolve (frame )
93-
94- return result
77+ tree [child ] = (parent , _make_transform (R , t ))
78+
79+ return tree
80+
81+
82+ def _resolve_chain (
83+ tree : dict [str , tuple [str , np .ndarray ]],
84+ frame_id : str ,
85+ target_frame : str = _DEFAULT_TARGET_FRAME ,
86+ ) -> np .ndarray :
87+ """Walk the TF tree from *frame_id* up to *target_frame* and compose transforms.
88+
89+ Args:
90+ tree: Per-edge TF dict from :func:`_read_tf_static`.
91+ frame_id: Source sensor frame (e.g. ``"hesai_top"``).
92+ target_frame: Destination frame. Defaults to ``"base_link"``.
93+
94+ Returns:
95+ 4x4 homogeneous matrix ``sensor2ego`` such that
96+ ``p_target = sensor2ego @ p_sensor``.
97+
98+ Raises:
99+ ValueError: If no chain exists from *frame_id* to *target_frame*.
100+ """
101+ if frame_id == target_frame :
102+ return np .eye (4 )
103+
104+ visited : set [str ] = set ()
105+ T = np .eye (4 )
106+ current = frame_id
107+ while current != target_frame :
108+ if current in visited :
109+ break
110+ visited .add (current )
111+ if current not in tree :
112+ break
113+ parent , T_edge = tree [current ]
114+ T = T_edge @ T
115+ current = parent
116+ else :
117+ return T
118+
119+ raise ValueError (
120+ f"No TF chain from '{ frame_id } ' to '{ target_frame } '. "
121+ f"Available frames: { sorted (tree .keys ())} "
122+ )
95123
96124
97125class Rosbag2Reader :
@@ -184,18 +212,20 @@ def __init__(
184212 for conn in self ._connections :
185213 self ._topic_connections .setdefault (conn .topic , []).append (conn )
186214
187- # Read /tf_static and build transforms to base_link
188- self ._tf_to_base = _build_tf_to_base (self ._typestore , self ._reader )
215+ # Read /tf_static and build per-edge TF tree
216+ self ._tf_tree = _read_tf_static (self ._typestore , self ._reader )
189217
190- # For PandarScan topics with frame_id specified, look up TF transform
191- self ._channel_tf : dict [str , tuple [ np .ndarray , np . ndarray ] ] = {}
218+ # Pre-compute sensor2ego for channels with frame_id
219+ self ._channel_sensor2ego : dict [str , np .ndarray ] = {}
192220 if topic_mapping is not None :
193221 for m in topic_mapping :
194222 if m .frame_id is None :
195223 continue
196- if m .frame_id in self ._tf_to_base :
197- self ._channel_tf [m .channel ] = self ._tf_to_base [m .frame_id ]
198- else :
224+ try :
225+ self ._channel_sensor2ego [m .channel ] = _resolve_chain (
226+ self ._tf_tree , m .frame_id ,
227+ )
228+ except ValueError :
199229 logger .warning (
200230 "Channel '%s': frame_id='%s' not found in /tf_static. "
201231 "Points will be in sensor frame." ,
@@ -248,6 +278,34 @@ def has_channel(self, channel: str) -> bool:
248278 """
249279 return channel in self ._timestamp_ns
250280
281+ def get_sensor2ego (
282+ self ,
283+ frame_id : str ,
284+ target_frame : str = _DEFAULT_TARGET_FRAME ,
285+ ) -> np .ndarray :
286+ """Return the 4x4 homogeneous transform from *frame_id* to *target_frame*.
287+
288+ Resolves the ``/tf_static`` chain from the sensor frame to the target
289+ frame (usually ``base_link``). The returned matrix satisfies
290+ ``p_target = sensor2ego @ p_sensor``.
291+
292+ Args:
293+ frame_id: Source sensor frame (e.g. ``"hesai_top"``).
294+ target_frame: Destination frame. Defaults to ``"base_link"``.
295+
296+ Returns:
297+ 4x4 ``np.ndarray`` (float64) homogeneous transformation matrix.
298+
299+ Raises:
300+ ValueError: If ``/tf_static`` is unavailable or no chain exists.
301+ """
302+ if not self ._tf_tree :
303+ raise ValueError (
304+ "No /tf_static data available in the rosbag. "
305+ "Cannot compute sensor2ego transform."
306+ )
307+ return _resolve_chain (self ._tf_tree , frame_id , target_frame )
308+
251309 def get_pointcloud (
252310 self ,
253311 channel : str ,
@@ -259,6 +317,10 @@ def get_pointcloud(
259317 Automatically dispatches to the correct decoder based on the topic's
260318 message type (PointCloud2 or PandarScan).
261319
320+ When the channel has a ``frame_id`` in its ``TopicMapping`` and the
321+ corresponding ``/tf_static`` chain exists, decoded PandarScan points
322+ are automatically transformed to ``base_link``.
323+
262324 Args:
263325 channel: Sensor channel name.
264326 timestamp_us: Target timestamp in microseconds (T4 format).
@@ -312,11 +374,11 @@ def get_pointcloud(
312374 msg = self ._typestore .deserialize_cdr (rawdata , conn .msgtype )
313375 if conn .msgtype == _PANDARSCAN_MSGTYPE :
314376 pc = pandarscan_to_lidar (msg , self ._channel_to_sensor_type [channel ])
315- tf = self ._channel_tf .get (channel )
316- if tf is not None :
317- R , t = tf
318- xyz = pc . points [:3 , : ]
319- pc .points [:3 , :] = R @ xyz + t [:, np .newaxis ]
377+ sensor2ego = self ._channel_sensor2ego .get (channel )
378+ if sensor2ego is not None :
379+ R = sensor2ego [: 3 , : 3 ]
380+ t = sensor2ego [:3 , 3 ]
381+ pc .points [:3 , :] = R @ pc . points [: 3 , :] + t [:, np .newaxis ]
320382 return pc
321383 return pointcloud2_to_lidar (msg )
322384
0 commit comments