Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions t4_devkit/rosbag/pandar_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def __post_init__(self) -> None:
object.__setattr__(self, "cos_el", np.cos(el_rad))
object.__setattr__(self, "sin_el", np.sin(el_rad))
object.__setattr__(
self, "azimuth_offset_rad",
self,
"azimuth_offset_rad",
np.radians(np.array(self.azimuth_offset_deg, dtype=np.float32)),
)

Expand Down Expand Up @@ -137,11 +138,13 @@ def __post_init__(self) -> None:
# Model lookup by sensor type name.
HESAI_MODELS: dict[str, _HesaiModelConfig] = {
"XT32": _HesaiModelConfig(
name="XT32", elevation_deg=_XT32_ELEVATION_DEG,
name="XT32",
elevation_deg=_XT32_ELEVATION_DEG,
azimuth_offset_deg=_XT32_AZIMUTH_OFFSET_DEG,
),
"OT128": _HesaiModelConfig(
name="OT128", elevation_deg=_OT128_ELEVATION_DEG,
name="OT128",
elevation_deg=_OT128_ELEVATION_DEG,
azimuth_offset_deg=_OT128_AZIMUTH_OFFSET_DEG,
),
}
Expand Down
23 changes: 16 additions & 7 deletions t4_devkit/rosbag/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@

def _quat_to_matrix(x: float, y: float, z: float, w: float) -> np.ndarray:
"""Convert quaternion (x, y, z, w) to a 3x3 rotation matrix."""
return np.array([
[1 - 2 * (y * y + z * z), 2 * (x * y - w * z), 2 * (x * z + w * y)],
[2 * (x * y + w * z), 1 - 2 * (x * x + z * z), 2 * (y * z - w * x)],
[2 * (x * z - w * y), 2 * (y * z + w * x), 1 - 2 * (x * x + y * y)],
])
return np.array(
[
[1 - 2 * (y * y + z * z), 2 * (x * y - w * z), 2 * (x * z + w * y)],
[2 * (x * y + w * z), 1 - 2 * (x * x + z * z), 2 * (y * z - w * x)],
[2 * (x * z - w * y), 2 * (y * z + w * x), 1 - 2 * (x * x + y * y)],
]
)


def _make_transform(R: np.ndarray, t: np.ndarray) -> np.ndarray:
Expand Down Expand Up @@ -223,7 +225,8 @@ def __init__(
continue
try:
self._channel_sensor2ego[m.channel] = _resolve_chain(
self._tf_tree, m.frame_id,
self._tf_tree,
m.frame_id,
)
except ValueError:
logger.warning(
Expand Down Expand Up @@ -366,9 +369,15 @@ def get_pointcloud(
raise ValueError(f"No connections found for topic '{topic}' (channel '{channel}')")

with self._lock:
# Workaround for a rosbags MCAP off-by-one (storage_mcap.py:591):
# `start < x.message_end_time` excludes the chunk when start equals
# the chunk's last-message ts (which is inclusive), causing 1ns-window
# queries at chunk boundaries to return empty. Widening start by 1ns
# selects the correct chunk; the [start, stop) interval still uniquely
# identifies the target packet.
for conn, ts_ns, rawdata in self._reader.messages(
connections=conns_for_topic,
start=target_ns,
start=target_ns - 1,
stop=target_ns + 1,
):
msg = self._typestore.deserialize_cdr(rawdata, conn.msgtype)
Expand Down
26 changes: 26 additions & 0 deletions tests/rosbag/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,32 @@ def test_context_manager(self, bag_with_pointclouds: Path) -> None:
with Rosbag2Reader(str(bag_with_pointclouds)) as reader:
assert len(reader.channels) > 0

def test_get_pointcloud_start_widened(
self,
bag_with_pointclouds: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Regression: start is widened by 1ns to dodge rosbags MCAP chunk-filter off-by-one."""
mapping = [TopicMapping(channel="LIDAR_TOP", topic="/sensing/lidar/top/pointcloud")]
with Rosbag2Reader(str(bag_with_pointclouds), topic_mapping=mapping) as reader:
captured_starts: list[int] = []
original_messages = reader._reader.messages

def patched_messages(*args, **kwargs):
if "start" in kwargs:
captured_starts.append(kwargs["start"])
return original_messages(*args, **kwargs)

monkeypatch.setattr(reader._reader, "messages", patched_messages)

target_ts_us = 1_704_067_200_000_000
target_ts_ns = target_ts_us * 1_000
pc = reader.get_pointcloud("LIDAR_TOP", target_ts_us)

assert pc.points.shape == (4, 3)
assert len(captured_starts) == 1
assert captured_starts[0] == target_ts_ns - 1

def test_multiple_timestamps(self, bag_with_pointclouds: Path) -> None:
mapping = [TopicMapping(channel="LIDAR_TOP", topic="/sensing/lidar/top/pointcloud")]
with Rosbag2Reader(str(bag_with_pointclouds), topic_mapping=mapping) as reader:
Expand Down