diff --git a/README.md b/README.md index fdc6773..44c5efb 100644 --- a/README.md +++ b/README.md @@ -62,5 +62,5 @@ source .venv/bin/activate | | Image Segmentation | ✅ | | | Raw Image | ✅ | | | Raw PointCloud on Image | ✅ | -| Map | Vector Map | | +| Map | Vector Map | ✅ | | | Ego Position on Street View | ✅ | diff --git a/docs/assets/render_map.png b/docs/assets/render_map.png new file mode 100644 index 0000000..046db53 Binary files /dev/null and b/docs/assets/render_map.png differ diff --git a/docs/index.md b/docs/index.md index f50a573..5818990 100644 --- a/docs/index.md +++ b/docs/index.md @@ -15,11 +15,11 @@ | 3D | 3D Boxes | ✅ | | | PointCloud Segmentation | | | | Raw PointCloud | ✅ | -| | 3D Trajectories | | +| | 3D Trajectories | ✅ | | | TF Links | ✅ | | 2D | 2D Boxes | ✅ | | | Image Segmentation | ✅ | | | Raw Image | ✅ | | | Raw PointCloud on Image | ✅ | -| Map | Vector Map | | +| Map | Vector Map | ✅ | | | Ego Position on Street View | ✅ | diff --git a/docs/tutorials/render.md b/docs/tutorials/render.md index 5f935b6..98599a1 100644 --- a/docs/tutorials/render.md +++ b/docs/tutorials/render.md @@ -60,7 +60,9 @@ When you specify `save_dir`, viewer will not be spawned on your screen. ## Rendering with `RerunViewer` -If you want to visualize your components, such as boxes that your ML-model estimated, `RerunViewer` allows you to visualize these components. +### Rendering boxes + +If you want to visualize your components, such as boxes that your ML-model estimated, `RerunViewer` allows you to visualize these components. For details, please refer to the API references. ```python @@ -81,3 +83,14 @@ It allows you to render boxes by specifying elements of boxes directly. # Rendering 2D boxes >>> viewer.render_box2ds(seconds, rois, class_ids) ``` + +### Rendering lanelet map + +![Render Lanelet Map](../assets/render_map.png) + +You can also render lanelet map by specifying `lanelet_path`: + +```python +# Rendering lanelet map +>>> viewer.render_map(lanelet_path) +``` diff --git a/t4_devkit/helper/rendering.py b/t4_devkit/helper/rendering.py index e9b3bda..2787e59 100644 --- a/t4_devkit/helper/rendering.py +++ b/t4_devkit/helper/rendering.py @@ -3,6 +3,7 @@ import concurrent import concurrent.futures import os.path as osp +import warnings from concurrent.futures import Future from typing import TYPE_CHECKING, Sequence @@ -139,6 +140,8 @@ def render_scene( save_dir=save_dir, ) + self._render_map(viewer) + scene: Scene = self._t4.scene[0] first_sample: Sample = self._t4.get("sample", scene.first_sample_token) max_timestamp_us = first_sample.timestamp + sec2us(max_time_seconds) @@ -241,6 +244,8 @@ def render_instance( save_dir=save_dir, ) + self._render_map(viewer) + concurrent.futures.wait( self._render_lidar_and_ego( viewer=viewer, @@ -296,6 +301,8 @@ def render_pointcloud( app_id = f"pointcloud@{self._t4.dataset_id}" viewer = self._init_viewer(app_id, render_ann=False, save_dir=save_dir) + self._render_map(viewer) + # search first lidar sample data token first_lidar_token: str | None = None for sensor in self._t4.sensor: @@ -323,6 +330,14 @@ def render_pointcloud( ), ) + def _render_map(self, viewer: RerunViewer) -> None: + lanelet_path = osp.join(self._t4.map_dir, "lanelet2_map.osm") + if not osp.exists(lanelet_path): + warnings.warn(f"Lanelet map not found at {lanelet_path}") + return + + viewer.render_map(lanelet_path) + def _render_sensor_calibration(self, viewer: RerunViewer, sample_data_token: str) -> None: sample_data: SampleData = self._t4.get("sample_data", sample_data_token) calibration: CalibratedSensor = self._t4.get( diff --git a/t4_devkit/lanelet/__init__.py b/t4_devkit/lanelet/__init__.py new file mode 100644 index 0000000..592c9ac --- /dev/null +++ b/t4_devkit/lanelet/__init__.py @@ -0,0 +1,3 @@ +from __future__ import annotations + +from .parser import * # noqa diff --git a/t4_devkit/lanelet/parser.py b/t4_devkit/lanelet/parser.py new file mode 100644 index 0000000..7e20c6c --- /dev/null +++ b/t4_devkit/lanelet/parser.py @@ -0,0 +1,243 @@ +from __future__ import annotations + +import xml.etree.ElementTree as ET +from collections import defaultdict +from typing import Final + +from attrs import define, field + +from t4_devkit.typing import Vector3 + +__all__ = ["LaneletParser"] + + +@define +class Node: + """Represents an OSM node (point) with coordinates and attributes.""" + + id: str + lat: float + lon: float + local_x: float | None = None + local_y: float | None = None + ele: float | None = None + tags: dict[str, str] = field(factory=dict) + + +@define +class Way: + """Represents an OSM way (line/polyline) with node references and attributes.""" + + id: str + node_refs: list[str] = field(factory=list) + tags: dict[str, str] = field(factory=dict) + + +@define +class Relation: + """Represents an OSM relation with member references and attributes.""" + + id: str + members: list[Member] = field(factory=list) + tags: dict[str, str] = field(factory=dict) + + +@define +class Member: + """Represents an OSM relation member.""" + + type: str + ref: str + role: str + + +class LaneletParser: + """Parses an OSM XML file into a dictionary of nodes, ways, and relations.""" + + elevation_scale: float = 1.0 + default_elevation: float = 0.0 + + def __init__(self, filepath: str, *, verbose: bool = False): + """Initializes the parser with the given file path. + + Args: + filepath (str): The path to the OSM XML file to parse. + verbose (bool, optional): Whether to print basic statistics about the parsed OSM data. + """ + + tree = ET.parse(filepath) + root = tree.getroot() + + self._nodes = _parse_nodes(root) + self._ways = _parse_ways(root) + self._relations = _parse_relations(root) + + if verbose: + self._print_statistics() + + def _print_statistics(self) -> None: + """Print basic statistics about the parsed OSM data.""" + num_lines: Final[int] = 50 + + print("\n" + "=" * num_lines) + print("OSM MAP STATISTICS") + print("=" * num_lines) + print(f"Nodes: {len(self.nodes)}") + print(f"Ways: {len(self.ways)}") + print(f"Relations: {len(self.relations)}") + + # Analyze way types + way_types = defaultdict(int) + for way in self.ways.values(): + way_type = way.tags.get("type", "unknown") + subtype = way.tags.get("subtype", "") + key = f"{way_type}:{subtype}" if subtype else way_type + way_types[key] += 1 + + print("\nWay Types:") + for way_type, count in sorted(way_types.items()): + print(f" {way_type}: {count}") + + # Analyze relation types + relation_types = defaultdict(int) + for relation in self.relations.values(): + rel_type = relation.tags.get("type", "unknown") + subtype = relation.tags.get("subtype", "") + key = f"{rel_type}:{subtype}" if subtype else rel_type + relation_types[key] += 1 + + print("\nRelation Types:") + for rel_type, count in sorted(relation_types.items()): + print(f" {rel_type}: {count}") + + # Coordinate system info + local_coord_nodes = sum( + 1 + for node in self.nodes.values() + if node.local_x is not None and node.local_y is not None + ) + print(f"\nNodes with local coordinates: {local_coord_nodes}/{len(self.nodes)}") + print("=" * num_lines) + + @property + def nodes(self) -> dict[str, Node]: + return self._nodes + + @property + def ways(self) -> dict[str, Way]: + return self._ways + + @property + def relations(self) -> dict[str, Relation]: + return self._relations + + def coordinates(self, node: Node, *, as_geographic: bool = False) -> Vector3: + """Return coordinates of a node, preferring local coordinates if available. + + Args: + node (Node): The node to get coordinates for. + as_geographic (bool): Whether to return coordinates in geographic (lat, lon, elevation) format. + + Returns: + A Vector3 coordinate for the node. + """ + if node.local_x is not None and node.local_y is not None and not as_geographic: + x, y = node.local_x, node.local_y + else: + # Convert lat/lon to a simple projection (not accurate for large areas) + x, y = node.lat, node.lon + + z = node.ele * self.elevation_scale if node.ele is not None else self.default_elevation + + return Vector3(x, y, z) + + def way_coordinates(self, way: Way, *, as_geographic: bool = False) -> list[Vector3]: + """Return coordinates of a way, preferring local coordinates if available. + + Args: + way (Way): The way to get coordinates for. + as_geographic: Whether to return coordinates in geographic (lat, lon, elevation) format. + + Returns: + A list of Vector3 coordinates for the way. + """ + return [ + self.coordinates(self.nodes[node_ref], as_geographic=as_geographic) + for node_ref in way.node_refs + if node_ref in self.nodes + ] + + +def _parse_nodes(root: ET.Element) -> dict[str, Node]: + output: dict[str, Node] = {} + for node_elem in root.findall("node"): + node_id = node_elem.get("id") + node_lat = float(node_elem.get("lat")) + node_lon = float(node_elem.get("lon")) + + tags: dict[str, str] = {} + local_x = None + local_y = None + ele = None + for tag_elem in node_elem.findall("tag"): + key = tag_elem.get("k") + value = tag_elem.get("v") + tags[key] = value + + # extract local coordinates if available + if key == "local_x": + local_x = float(value) + elif key == "local_y": + local_y = float(value) + elif key == "ele": + ele = float(value) + + output[node_id] = Node( + id=node_id, + lat=node_lat, + lon=node_lon, + tags=tags, + local_x=local_x, + local_y=local_y, + ele=ele, + ) + + return output + + +def _parse_ways(root: ET.Element) -> dict[str, Way]: + output: dict[str, Way] = {} + for way_elem in root.findall("way"): + way_id = way_elem.get("id") + node_refs = [nd.get("ref") for nd in way_elem.findall("nd")] + + tags: dict[str, str] = { + tag_elem.get("k"): tag_elem.get("v") for tag_elem in way_elem.findall("tag") + } + + output[way_id] = Way(id=way_id, node_refs=node_refs, tags=tags) + + return output + + +def _parse_relations(root: ET.Element) -> dict[str, Relation]: + output: dict[str, Relation] = {} + for relation_elem in root.findall("relation"): + relation_id = relation_elem.get("id") + + members = [ + Member( + type=member_elem.get("type"), + ref=member_elem.get("ref"), + role=member_elem.get("role", ""), + ) + for member_elem in relation_elem.findall("member") + ] + + tags: dict[str, str] = { + tag_elem.get("k"): tag_elem.get("v") for tag_elem in relation_elem.findall("tag") + } + + output[relation_id] = Relation(id=relation_id, members=members, tags=tags) + + return output diff --git a/t4_devkit/tier4.py b/t4_devkit/tier4.py index 698d66b..dcb0c70 100644 --- a/t4_devkit/tier4.py +++ b/t4_devkit/tier4.py @@ -204,6 +204,16 @@ def version(self) -> str | None: """Return the dataset version, or None if it is failed to lookup.""" return self._metadata.version + @property + def annotation_dir(self) -> str: + """Return the path to annotation directory.""" + return osp.join(self.data_root, "annotation") + + @property + def map_dir(self) -> str: + """Return the path to map directory.""" + return osp.join(self.data_root, "map") + def __load_table__(self, schema: SchemaName) -> list[SchemaTable]: """Load schema table from a json file. diff --git a/t4_devkit/viewer/lanelet.py b/t4_devkit/viewer/lanelet.py new file mode 100644 index 0000000..ea2525f --- /dev/null +++ b/t4_devkit/viewer/lanelet.py @@ -0,0 +1,204 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np +import rerun as rr + +if TYPE_CHECKING: + from t4_devkit.lanelet import LaneletParser + +LANELET_COLORS = { + # Road markings + "solid_line": [1.0, 1.0, 1.0, 0.9], # White + "dashed_line": [1.0, 1.0, 0.0, 0.8], # Yellow + "virtual_line": [0.7, 0.7, 0.7, 0.6], # Gray + # Lanelets + "lanelet_road": [0.3, 0.6, 1.0, 0.4], # Light blue + "lanelet_crosswalk": [1.0, 1.0, 0.0, 0.5], # Yellow + "lanelet_shoulder": [0.8, 0.6, 1.0, 0.4], # Purple + # Infrastructure + "road_border": [0.2, 0.2, 0.2, 0.9], # Dark gray + "curbstone": [0.5, 0.5, 0.5, 0.9], # Gray + "traffic_sign": [1.0, 0.2, 0.2, 0.9], # Red + "traffic_light": [1.0, 0.6, 0.0, 0.9], # Orange + "vegetation": [0.2, 0.8, 0.2, 0.6], # Green + "crosswalk": [1.0, 1.0, 0.0, 0.7], # Yellow + # Elevation visualization + "elevation_low": [0.0, 0.0, 1.0, 0.8], # Blue + "elevation_high": [1.0, 0.0, 0.0, 0.8], # Red +} + + +def render_lanelets(parser: LaneletParser, root_entity: str) -> None: + """Render lanelet polygons based on relations. + + Args: + parser (LaneletParser): The LaneletParser instance. + root_entity (str): The root entity to render. + """ + for relation in parser.relations.values(): + if relation.tags.get("type") != "lanelet": + continue + + left_bound = None + right_bound = None + for member in relation.members: + if member.type == "way" and member.role == "left": + left_bound = parser.ways.get(member.ref) + elif member.type == "way" and member.role == "right": + right_bound = parser.ways.get(member.ref) + + if left_bound and right_bound: + left_coords = parser.way_coordinates(left_bound) + right_coords = parser.way_coordinates(right_bound) + + vertices = np.array(left_coords + right_coords[::-1]) + triangles = np.array([(0, i, i + 1) for i in range(1, len(vertices) - 1)]) + subtype = relation.tags.get("subtype", "road") + if subtype == "road": + color = LANELET_COLORS["lanelet_road"] + element_type = "road" + elif subtype == "crosswalk": + color = LANELET_COLORS["lanelet_crosswalk"] + element_type = "crosswalk" + else: + color = LANELET_COLORS["lanelet_shoulder"] + element_type = "shoulder" + + entity_path = f"{root_entity}/lanelet/{element_type}/{relation.id}" + rr.log( + entity_path, + rr.Mesh3D( + vertex_positions=vertices, + triangle_indices=triangles, + vertex_colors=[color] * len(vertices), + ), + static=True, + ) + + +def render_traffic_elements(parser: LaneletParser, root_entity: str) -> None: + """Render traffic signs, lights, and other regulatory elements. + + Args: + parser (LaneletParser): The lanelet parser. + root_entity (str): The root entity to render. + """ + for relation in parser.relations.values(): + if relation.tags.get("type") != "regulatory_element": + continue + + subtype = relation.tags.get("subtype", "") + for member in relation.members: + if member.type == "way" and member.role in ["ref_line", "refers"]: + way = parser.ways.get(member.ref) + if not way: + continue + coords = parser.way_coordinates(way) + if "sign" in subtype: + color = LANELET_COLORS["traffic_sign"] + size = [0.8, 0.8, 0.8] + element_type = "sign" + elif "light" in subtype: + color = LANELET_COLORS["traffic_light"] + size = [0.6, 1.2, 0.6] + element_type = "light" + else: + color = [0.8, 0.0, 0.8, 0.9] # Purple + size = [0.5, 0.5, 0.5] + element_type = "other" + + for i, center in enumerate(coords): + entity_path = f"{root_entity}/traffic_elements/{element_type}/{relation.id}_{i}" + + rr.log( + entity_path, + rr.Boxes3D(sizes=[size], centers=[center], colors=[color]), + static=True, + ) + + +def render_ways(parser: LaneletParser, root_entity: str) -> None: + """Render lanelet ways. + + Args: + parser (LaneletParser): The lanelet parser. + root_entity (str): The root entity to render. + """ + for way in parser.ways.values(): + way_type = way.tags.get("type", "") + subtype = way.tags.get("subtype", "") + + if not ( + "line_thin" in way_type + or "line_thick" in way_type + or "curbstone" in way_type + or "virtual" == way_type + or "road_border" == subtype + ): + continue + + coords = parser.way_coordinates(way) + if len(coords) < 2: + continue + + if "solid" in subtype: + color = LANELET_COLORS["solid_line"] + element_type = "road_marking/solid" + elif "dashed" in subtype: + color = LANELET_COLORS["dashed_line"] + element_type = "road_marking/dashed" + elif "virtual" == way_type: + color = LANELET_COLORS["virtual_line"] + element_type = "road_marking/virtual" + elif "curbstone" == way_type: + color = LANELET_COLORS["curbstone"] + element_type = "road_border/curbstone" + elif "road_border" == subtype: + color = LANELET_COLORS["road_border"] + element_type = "road_border/road_border" + else: + color = LANELET_COLORS["solid_line"] + element_type = "road_marking/other" + + entity_path = f"{root_entity}/{element_type}/{way.id}" + + rr.log( + entity_path, + rr.LineStrips3D( + strips=[np.array(coords)], + colors=[color], + radii=[0.1 if "thin" in way_type else 0.2], + ), + static=True, + ) + + +def render_geographic_borders(parser: LaneletParser, root_entity: str) -> None: + """Render road borders on geographical space. + + Args: + parser (LaneletParser): The LaneletParser object. + root_entity (str): The root entity path. + """ + for way in parser.ways.values(): + way_type = way.tags.get("type", "") + subtype = way.tags.get("subtype", "") + + if not ("curbstone" == way_type or "road_border" == subtype): + continue + + coords = parser.way_coordinates(way, as_geographic=True) + lat_lon = np.array([c[:2] for c in coords]) + if len(coords) < 2: + continue + + color = LANELET_COLORS["road_border"] + entity_path = f"{root_entity}/{way.id}" + + rr.log( + entity_path, + rr.GeoLineStrings(lat_lon=[lat_lon], colors=[color], radii=[2.0]), + static=True, + ) diff --git a/t4_devkit/viewer/viewer.py b/t4_devkit/viewer/viewer.py index fc6443b..ec0e6ae 100644 --- a/t4_devkit/viewer/viewer.py +++ b/t4_devkit/viewer/viewer.py @@ -10,11 +10,18 @@ from typing_extensions import Self from t4_devkit.common.timestamp import us2sec +from t4_devkit.lanelet import LaneletParser from t4_devkit.schema import SensorModality from t4_devkit.typing import Quaternion, Roi, Vector3 from .color import distance_color from .geography import calculate_geodetic_point +from .lanelet import ( + render_geographic_borders, + render_lanelets, + render_traffic_elements, + render_ways, +) from .record import BatchBox2D, BatchBox3D, BatchSegmentation2D if TYPE_CHECKING: @@ -639,3 +646,18 @@ def _render_calibration_without_schema( rr.Pinhole(image_from_camera=camera_intrinsic), static=True, ) + + def render_map(self, filepath: str) -> None: + """Render vector map. + + Args: + filepath (str): Path to OSM file. + """ + parser = LaneletParser(filepath, verbose=False) + + root_entity = format_entity(self.map_entity, "vector_map") + render_lanelets(parser, root_entity) + render_traffic_elements(parser, root_entity) + render_ways(parser, root_entity) + + render_geographic_borders(parser, f"{self.geocoordinate_entity}/vector_map") diff --git a/tests/lanelet/test_parser.py b/tests/lanelet/test_parser.py new file mode 100644 index 0000000..20eceee --- /dev/null +++ b/tests/lanelet/test_parser.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +from t4_devkit.lanelet import LaneletParser + + +def test_lanelet_parser() -> None: + """Test `LaneletParser`.""" + lanelet_path = "tests/sample/map/lanelet2_map.osm" + _ = LaneletParser(lanelet_path) diff --git a/tests/sample/map/lanelet2_map.osm b/tests/sample/map/lanelet2_map.osm new file mode 100644 index 0000000..6f7149a --- /dev/null +++ b/tests/sample/map/lanelet2_map.osm @@ -0,0 +1,195 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/viewer/test_viewer.py b/tests/viewer/test_viewer.py index aca098f..0fd0f72 100644 --- a/tests/viewer/test_viewer.py +++ b/tests/viewer/test_viewer.py @@ -133,3 +133,9 @@ def test_render_calibration(dummy_viewer, dummy_camera_calibration) -> None: camera_distortion=[0, 0, 0, 0, 0], ) dummy_viewer.render_calibration(sensor, calibration) + + +def test_render_map(dummy_viewer) -> None: + """Test rendering map with `RerunViewer`.""" + lanelet_path = "tests/sample/map/lanelet2_map.osm" + dummy_viewer.render_map(lanelet_path)