11from __future__ import annotations
22
3+ from t4_devkit .common .io import load_json
34import struct
45from abc import abstractmethod
56from typing import TYPE_CHECKING , ClassVar , TypeVar
1819 "RadarPointCloud" ,
1920 "SegmentationPointCloud" ,
2021 "PointCloudLike" ,
22+ "PointCloudMetainfo" ,
23+ "PointcloudSourceInfo" ,
24+ "Stamp" ,
2125]
2226
2327
28+ @define
29+ class Stamp :
30+ """A dataclass to represent timestamp.
31+
32+ Attributes:
33+ sec (int): Seconds.
34+ nanosec (int): Nanoseconds.
35+ """
36+
37+ sec : int
38+ nanosec : int
39+
40+
41+ @define
42+ class PointcloudSourceInfo :
43+ """A dataclass to represent pointcloud source information.
44+
45+ Attributes:
46+ id (str): source identifier.
47+ idx_begin (int): Begin index of points for the source in the concatenated pointcloud structure.
48+ length (int): Length of points for the source in the concatenated pointcloud structure.
49+ stamp (Stamp): Timestamp.
50+ """
51+
52+ id : str
53+ idx_begin : int
54+ length : int
55+ stamp : Stamp = field (converter = lambda x : Stamp (** x ) if isinstance (x , dict ) else x )
56+
57+
58+ @define
59+ class PointCloudMetainfo :
60+ """A dataclass to represent pointcloud metadata.
61+
62+ Attributes:
63+ stamp (Stamp): Timestamp.
64+ sources (list[PointcloudSourceInfo]): List of source information.
65+ """
66+
67+ stamp : Stamp = field (converter = lambda x : Stamp (** x ) if isinstance (x , dict ) else x )
68+ sources : list [PointcloudSourceInfo ] = field (factory = list )
69+
70+ @classmethod
71+ def from_file (cls , filepath : str ) -> Self :
72+ """Create an instance from a JSON file.
73+
74+ Args:
75+ filepath (str): Path to the JSON file containing metadata.
76+
77+ Returns:
78+ Self: PointCloudMetainfo instance.
79+ """
80+ data = load_json (filepath )
81+ stamp = Stamp (** data ["stamp" ])
82+ sources = []
83+ for source_data in data .get ("sources" , []):
84+ sources .append (PointcloudSourceInfo (** source_data ))
85+ return cls (stamp = stamp , sources = sources )
86+
87+ @property
88+ def source_ids (self ) -> list [str ]:
89+ """Get the list of source sensor IDs.
90+
91+ Returns:
92+ list[str]: List of sensor names.
93+ """
94+ return [source .id for source in self .sources ]
95+
96+
2497@define
2598class PointCloud :
2699 """Abstract base dataclass for pointcloud data."""
27100
28101 points : NDArrayFloat = field (converter = np .array )
102+ metainfo : PointCloudMetainfo | None = field (default = None )
29103
30104 @points .validator
31105 def _check_dims (self , attribute , value ) -> None :
@@ -34,6 +108,68 @@ def _check_dims(self, attribute, value) -> None:
34108 f"Expected point dimension is { self .num_dims ()} , but got { value .shape [0 ]} "
35109 )
36110
111+ @metainfo .validator
112+ def _validate_metainfo (self , attribute , value ) -> None :
113+ """Validate that sources in metainfo form non-overlapping parts covering all points.
114+
115+ This validator ensures backward compatibility by allowing None metainfo.
116+ """
117+ if value is None :
118+ # Backward compatibility: metainfo is optional
119+ return
120+
121+ if not value .sources :
122+ # No sources to validate
123+ return
124+
125+ num_points = self .num_points ()
126+
127+ # Collect all intervals defined by sources
128+ intervals = []
129+ for source_info in value .sources :
130+ source_id = source_info .id
131+ idx_begin = source_info .idx_begin
132+ length = source_info .length
133+ idx_end = idx_begin + length
134+
135+ # Check bounds
136+ if idx_begin < 0 :
137+ raise ValueError (f"Source '{ source_id } ' has negative idx_begin: { idx_begin } " )
138+ if length < 0 :
139+ raise ValueError (f"Source '{ source_id } ' has negative length: { length } " )
140+ if idx_end > num_points :
141+ raise ValueError (
142+ f"Source '{ source_id } ' exceeds point cloud size: "
143+ f"idx_begin={ idx_begin } , length={ length } , but num_points={ num_points } "
144+ )
145+
146+ intervals .append ((idx_begin , idx_end , source_id ))
147+
148+ # Sort intervals by start index
149+ intervals .sort (key = lambda x : x [0 ])
150+
151+ # Check for non-overlapping and complete coverage
152+ expected_start = 0
153+ for idx_begin , idx_end , source_id in intervals :
154+ if idx_begin != expected_start :
155+ if idx_begin > expected_start :
156+ raise ValueError (
157+ f"Gap detected: points [{ expected_start } :{ idx_begin } ) are not covered by any source"
158+ )
159+ else :
160+ raise ValueError (
161+ f"Overlap detected: source '{ source_id } ' starts at { idx_begin } , "
162+ f"but previous source ends at { expected_start } "
163+ )
164+ expected_start = idx_end
165+
166+ # Check if all points are covered
167+ if expected_start != num_points :
168+ raise ValueError (
169+ f"Incomplete coverage: sources cover up to index { expected_start } , "
170+ f"but num_points={ num_points } "
171+ )
172+
37173 @staticmethod
38174 @abstractmethod
39175 def num_dims () -> int :
@@ -91,12 +227,19 @@ def num_dims() -> int:
91227 return 4
92228
93229 @classmethod
94- def from_file (cls , filepath : str ) -> Self :
230+ def from_file (cls , filepath : str , metainfo_filepath : str | None = None ) -> Self :
95231 assert filepath .endswith (".bin" ), f"Unexpected filetype: { filepath } "
96232
97233 scan = np .fromfile (filepath , dtype = np .float32 )
98234 points = scan .reshape ((- 1 , 5 ))[:, : cls .num_dims ()]
99- return cls (points .T )
235+
236+ metainfo = (
237+ PointCloudMetainfo .from_file (metainfo_filepath )
238+ if metainfo_filepath is not None
239+ else None
240+ )
241+
242+ return cls (points .T , metainfo = metainfo )
100243
101244
102245@define
@@ -123,6 +266,7 @@ def from_file(
123266 invalid_states : list [int ] | None = None ,
124267 dynprop_states : list [int ] | None = None ,
125268 ambig_states : list [int ] | None = None ,
269+ metainfo_filepath : str | None = None ,
126270 ) -> Self :
127271 assert filepath .endswith (".pcd" ), f"Unexpected filetype: { filepath } "
128272
@@ -177,7 +321,12 @@ def from_file(
177321 # A NaN in the first point indicates an empty pointcloud.
178322 point = np .array (points [0 ])
179323 if np .any (np .isnan (point )):
180- return cls (np .zeros ((feature_count , 0 )))
324+ metainfo = (
325+ PointCloudMetainfo .from_file (metainfo_filepath )
326+ if metainfo_filepath is not None
327+ else None
328+ )
329+ return cls (np .zeros ((feature_count , 0 )), metainfo = metainfo )
181330
182331 # Convert to numpy matrix.
183332 points = np .array (points ).transpose ()
@@ -199,7 +348,12 @@ def from_file(
199348 valid = [p in ambig_states for p in points [11 , :]]
200349 points = points [:, valid ]
201350
202- return cls (points )
351+ metainfo = (
352+ PointCloudMetainfo .from_file (metainfo_filepath )
353+ if metainfo_filepath is not None
354+ else None
355+ )
356+ return cls (points , metainfo = metainfo )
203357
204358
205359@define
@@ -211,18 +365,25 @@ class SegmentationPointCloud(PointCloud):
211365 labels (NDArrayU8): Label matrix.
212366 """
213367
214- labels : NDArrayU8 = field (converter = lambda x : np .array (x , dtype = np .uint8 ))
368+ labels : NDArrayU8 = field (converter = lambda x : np .array (x , dtype = np .uint8 ), kw_only = True )
215369
216370 @staticmethod
217371 def num_dims () -> int :
218372 return 4
219373
220374 @classmethod
221- def from_file (cls , point_filepath : str , label_filepath : str ) -> Self :
375+ def from_file (
376+ cls , point_filepath : str , label_filepath : str , metainfo_filepath : str | None = None
377+ ) -> Self :
222378 scan = np .fromfile (point_filepath , dtype = np .float32 )
223379 points = scan .reshape ((- 1 , 5 ))[:, : cls .num_dims ()]
224380 labels = np .fromfile (label_filepath , dtype = np .uint8 )
225- return cls (points .T , labels )
381+ metainfo = (
382+ PointCloudMetainfo .from_file (metainfo_filepath )
383+ if metainfo_filepath is not None
384+ else None
385+ )
386+ return cls (points .T , labels = labels , metainfo = metainfo )
226387
227388
228389PointCloudLike = TypeVar ("PointCloudLike" , bound = PointCloud )
0 commit comments