1+ """
2+ robust_least_squares_detector.py
3+
4+ Author: Shreyansh Shethia
5+ """
6+
7+ from asyncio .log import logger
8+ import sys
9+ import copy
10+ import itertools
11+ import numpy as np
12+ from collections import deque
13+ from pathlib import Path
14+ from math import pi , sin , cos
15+ from scipy .optimize import least_squares
16+ from math import sin , cos , atan2
17+ import matplotlib .pyplot as plt
18+ abs_dir_path = str (Path (__file__ ).absolute ().parent )
19+ sys .path .append (abs_dir_path + "/../../search/kd_tree" )
20+
21+ from l_shape_fitting_detector import LShapeFittingDetector
22+ from kd_tree import KdTree
23+ from rectangle import Rectangle
24+ import numpy as np
25+ from scipy .optimize import least_squares
26+ from math import sin , cos , atan2
27+ from scipy .spatial .distance import pdist , squareform
28+
29+ class FittingDataTracker :
30+ """
31+ Telemetry and performance tracking class for OBB fitting analysis.
32+
33+ This class captures frame-by-frame metrics from both the geometric
34+ baseline and the optimized refinement stage. It facilitates the
35+ calculation of Root Mean Square Error (RMSE) and tracks the
36+ evolution of spatial parameters (cx, cy, theta) over time.
37+ """
38+ def __init__ (self ):
39+ """
40+ Initialize the tracker with Ground Truth placeholders and data storage.
41+ """
42+ self .gt_cx = 20.0 # replace with
43+ self .gt_cy = 0.0 # Example ground truth center y
44+ self .data = {
45+ "time" : [],
46+ "rmse_geo" : [],
47+ "rmse_opt" : [],
48+ "theta_geo" : [],
49+ "theta_opt" : [],
50+ "cx_geo" : [],
51+ "cy_geo" : [],
52+ "cx_opt" : [],
53+ "cy_opt" : []
54+ }
55+
56+ def log (self , t , r_geo , r_opt , p_geo , p_opt ):
57+ """
58+ Record performance metrics for the current simulation frame.
59+
60+ Calculates the instantaneous RMSE for both methods and logs the
61+ predicted pose and dimensions.
62+
63+ Args:
64+ t (float/int): Current frame index or simulation time.
65+ r_geo (numpy.ndarray): Residual vector from the geometric fit.
66+ r_opt (numpy.ndarray): Residual vector from the optimized fit.
67+ p_geo (list/tuple): Pose parameters from geometric fitting [cx, cy, theta, ...].
68+ p_opt (list/tuple): Pose parameters from optimized fitting [cx, cy, theta, ...].
69+ """
70+ self .data ["time" ].append (t )
71+ self .data ["rmse_geo" ].append (np .sqrt (np .mean (r_geo ** 2 )))
72+ self .data ["rmse_opt" ].append (np .sqrt (np .mean (r_opt ** 2 )))
73+ self .data ["theta_geo" ].append (np .degrees (p_geo [2 ]))
74+ self .data ["theta_opt" ].append (np .degrees (p_opt [2 ]))
75+ self .data ["cx_geo" ].append (p_geo [0 ])
76+ self .data ["cy_geo" ].append (p_geo [1 ])
77+ self .data ["cx_opt" ].append (p_opt [0 ])
78+ self .data ["cy_opt" ].append (p_opt [1 ])
79+
80+ def plot (self ):
81+ """
82+ Generate a multi-panel performance report using Matplotlib.
83+
84+ Produces:
85+ 1. An RMSE comparison plot to visualize fitting precision.
86+ 2. A two-panel plot comparing predicted Center-X and Center-Y
87+ against baseline geometric results.
88+ """
89+ if not self .data ["time" ]:
90+ print ("No data collected to plot." )
91+ return
92+
93+ plt .figure (figsize = (10 , 5 ))
94+ plt .plot (self .data ["time" ], self .data ["rmse_geo" ], 'b--' , label = 'Geometric RMSE' )
95+ plt .plot (self .data ["time" ], self .data ["rmse_opt" ], 'r-' , label = 'Optimized RMSE' )
96+ plt .ylabel ("RMSE [m]" ); plt .legend (); plt .grid (True )
97+
98+ # Create a 2x1 plot specifically for Center Accuracy
99+ fig , (ax1 , ax2 ) = plt .subplots (2 , 1 , figsize = (10 , 10 ))
100+
101+ # Plot 1: CX Accuracy
102+ ax1 .plot (self .data ["time" ], self .data ["cx_geo" ], 'b--' , label = 'Geometric CX' , alpha = 0.5 )
103+ ax1 .plot (self .data ["time" ], self .data ["cx_opt" ], 'r-' , label = 'Optimized CX' )
104+ #ax1.axhline(y=self.gt_cx, color='g', linestyle='-', linewidth=2, label='Ground Truth CX')
105+ ax1 .set_ylabel ("X Position [m]" )
106+ ax1 .set_title ("Center X: Ground Truth vs. Predictions" )
107+ ax1 .legend (); ax1 .grid (True )
108+
109+ # Plot 2: CY Accuracy
110+ ax2 .plot (self .data ["time" ], self .data ["cy_geo" ], 'b--' , label = 'Geometric CY' , alpha = 0.5 )
111+ ax2 .plot (self .data ["time" ], self .data ["cy_opt" ], 'r-' , label = 'Optimized CY' )
112+ #ax2.axhline(y=self.gt_cy, color='g', linestyle='-', linewidth=2, label='Ground Truth CY')
113+ ax2 .set_ylabel ("Y Position [m]" )
114+ ax2 .set_xlabel ("Frames" )
115+ ax2 .set_title ("Center Y: Ground Truth vs. Predictions" )
116+ ax2 .legend (); ax2 .grid (True )
117+
118+ plt .tight_layout ()
119+ plt .show ()
120+
121+ # Create the specific instance
122+ tracker = FittingDataTracker ()
123+
124+ class OptimizedLShapeDetector (LShapeFittingDetector ):
125+ def __init__ (self , min_rng_th_m = 3.0 , rng_th_rate = 0.1 , change_angle_deg = 1.0 , lossfunc = 'huber' , show_logger_debug = True ):
126+ """
127+ Initialize the Robust L-Shape detector with Least-Squares optimization.
128+
129+ Args:
130+ min_rng_th_m (float): Minimum range threshold for clustering.
131+ rng_th_rate (float): Range threshold rate.
132+ change_angle_deg (float): Step size for the geometric orientation sweep.
133+ lossfunc (str): Robust M-estimator to use ('huber', 'cauchy', etc.).
134+ show_logger_debug (bool): Flag to print optimization results to console.
135+ """
136+ super ().__init__ (min_rng_th_m , rng_th_rate , change_angle_deg )
137+ self .f_scale = 0.05 # Tight tolerance for the final polish
138+ self .prev_rects = {}
139+ self .lossfunc = lossfunc # Can be Cauchy, Huber, soft_l1, linear
140+ self .show_logger_debug = show_logger_debug
141+
142+ def _calculate_residuals (self , params , points , weights ):
143+ """
144+ Calculate the Signed Distance Field (SDF) residuals for the optimizer.
145+
146+ This function transforms points into the local frame of the candidate box
147+ and calculates the distance of each point to the nearest rectangle edge.
148+
149+ Args:
150+ params (list): [cx, cy, theta, w, l]
151+ points (numpy.ndarray): (2, N) array of LiDAR points in world frame.
152+ weights (numpy.ndarray): (N,) array of weights for each point.
153+
154+ Returns:
155+ numpy.ndarray: Flattened residual vector scaled by weights.
156+ """
157+ cx , cy , theta , w , l = params
158+ cos_t , sin_t = np .cos (theta ), np .sin (theta )
159+
160+ # Transform points to box-local frame
161+ dx = cos_t * (points [0 ,:] - cx ) + sin_t * (points [1 ,:] - cy )
162+ dy = - sin_t * (points [0 ,:] - cx ) + cos_t * (points [1 ,:] - cy )
163+
164+ # Distance to box edges
165+ err_x = np .abs (dx ) - (w / 2.0 )
166+ err_y = np .abs (dy ) - (l / 2.0 )
167+
168+ # Signed Distance Field (SDF) logic
169+ res = np .sqrt (np .maximum (err_x , 0 )** 2 + np .maximum (err_y , 0 )** 2 ) + \
170+ np .minimum (np .maximum (err_x , err_y ), 0 )
171+
172+ return np .concatenate ([res * np .sqrt (weights )])
173+
174+ def _get_initial_guess (self , points_array ):
175+ """
176+ Generate a high-quality initial guess using a hybrid geometric approach.
177+
178+ 1. Uses Yano's variance criterion to find the global orientation.
179+ 2. Uses the midpoint of the farthest-point pair for the spatial center.
180+ 3. Projects points onto the chosen axes to estimate initial dimensions.
181+
182+ Args:
183+ points_array (numpy.ndarray): (2, N) array of LiDAR points.
184+
185+ Returns:
186+ list: [cx, cy, theta_init, w_init, l_init] as an initial guess for the solver.
187+ """
188+ # points_array shape is (2, N)
189+ points_t = points_array .T # (N, 2)
190+
191+ # 1. Find Best Angle via Variance Criterion -- Global Heading Search
192+ min_cost_angle = (- float ("inf" ), None )
193+ for angle_rad in np .arange (0 , np .pi / 2 - self .CHANGE_ANGLE_RAD , self .CHANGE_ANGLE_RAD ):
194+ rotated_points = self ._rotate_points (points_array , angle_rad )
195+ cost = self ._calculate_variance_criterion (rotated_points )
196+ if min_cost_angle [0 ] < cost :
197+ min_cost_angle = (cost , angle_rad )
198+
199+ theta_init = min_cost_angle [1 ]
200+
201+ # 2. Find the two farthest points (the diameter of the point cloud)
202+ # and estimate center (cx, cy)
203+
204+ if points_t .shape [0 ] > 2 :
205+ # Finding the absolute farthest pair
206+ dists = pdist (points_t )
207+ dist_matrix = squareform (dists )
208+ i , j = np .unravel_index (np .argmax (dist_matrix ), dist_matrix .shape )
209+ p1 , p2 = points_t [i ], points_t [j ]
210+
211+ # Use the midpoint of these two as the center guess
212+ cx , cy = (p1 + p2 ) / 2.0
213+ else :
214+ cx , cy = np .mean (points_array , axis = 1 )
215+
216+ # 3. Box Dimensions (w, l) Estimation
217+ cos_t , sin_t = np .cos (theta_init ), np .sin (theta_init )
218+
219+ # Local coordinates relative to the chosen center and angle
220+ dx = cos_t * (points_array [0 ,:] - cx ) + sin_t * (points_array [1 ,:] - cy )
221+ dy = - sin_t * (points_array [0 ,:] - cx ) + cos_t * (points_array [1 ,:] - cy )
222+
223+ # Now w and l are guaranteed to be aligned with the best angle
224+ w = np .max (dx ) - np .min (dx )
225+ l = np .max (dy ) - np .min (dy )
226+
227+ return [cx , cy , theta_init , max (0.5 , w ), max (0.5 , l )] # Ensure minimum size to avoid degenerate boxes
228+
229+
230+ def _calculate_rectangle (self , points_array ):
231+ """
232+ Execute the full fitting pipeline: Initialization -> Optimization -> Logging.
233+
234+ This is the main entry point for the detector, providing a polished OBB
235+ by refining the geometric guess with a non-linear Huber-weighted solver.
236+
237+ Args:
238+ points_array (numpy.ndarray): (2, N) array of clustered LiDAR points.
239+
240+ Returns:
241+ Rectangle: A specialized rectangle object containing the refined OBB parameters.
242+ """
243+ self .prev_rects = {}
244+
245+ # --- STAGE 1: Get Initial Guess from L-Shape Fitting (Your Farthest-Point Logic) ---
246+ init_guess = self ._get_initial_guess (points_array )
247+
248+ # --- STAGE 2: Domain for Optimization ---
249+ theta_window = np .radians (2.0 ) # Small window is fine now because theta is good
250+ lower = np .array ([- np .inf , - np .inf , init_guess [2 ] - theta_window , init_guess [3 ]* 0.9 , init_guess [4 ]* 0.9 ])
251+ upper = np .array ([np .inf , np .inf , init_guess [2 ] + theta_window , init_guess [3 ]* 1.1 , init_guess [4 ]* 1.1 ])
252+
253+ # Simple uniform weights
254+ # TODO: Implement a New method for weights based on point distribution
255+ # This is because points closer to Lidar will have less noise and more denser distribution
256+ weights = np .ones (points_array .shape [1 ])
257+
258+ # Calculate initial cost
259+ res_before = self ._calculate_residuals (init_guess , points_array , weights )
260+ cost_before = 0.5 * np .sum (res_before ** 2 ) # Least squares cost is 0.5 * sum(r^2)
261+
262+ try :
263+ res = least_squares (
264+ self ._calculate_residuals , init_guess ,
265+ args = (points_array , weights ),
266+ bounds = (lower , upper ),
267+ loss = self .lossfunc , f_scale = .02
268+ )
269+ cx , cy , theta , w , l = res .x
270+ self .prev_rects = res .x
271+
272+ except :
273+ # Fallback to Initial Guess if optimization fails
274+ cx , cy , theta , w , l = init_guess
275+ self .prev_rects = init_guess
276+
277+ # --- STAGE 3: After Optimization: Logging for Analysis ---
278+ if 'res' in locals ():
279+ # Calculate residuals after optimization
280+
281+ final_params = res .x
282+ res_after = self ._calculate_residuals (final_params , points_array , weights )
283+ cost_after = 0.5 * np .sum (res_after ** 2 )
284+ improvement = (cost_before - cost_after ) / (cost_before + 1e-6 ) * 100
285+ theta_diff = np .rad2deg (final_params [2 ] - init_guess [2 ])
286+ tracker .log (len (tracker .data ["time" ]), res_before , res_after , init_guess , res .x )
287+
288+ # --- PRINT REPORT ---
289+ if self .show_logger_debug :
290+ print ("-" * 30 )
291+ print (f"Optimization Report (Status: { res .message } )" )
292+ print (f" Cost: { cost_before :.4f} -> { cost_after :.4f} ({ improvement :.1f} % improvement)" )
293+ print (f" Theta: { np .rad2deg (init_guess [2 ]):.2f} ° -> { np .rad2deg (final_params [2 ]):.2f} ° (Δ: { theta_diff :.2f} °)" )
294+ print ("-" * 30 )
295+
296+ else :
297+ print ("Optimization failed; using Initial Guess." )
298+
299+ # --- STAGE 4: Create Final Rectangle ---
300+ cos_t , sin_t = cos (theta ), sin (theta )
301+ c1_mid = cx * cos_t + cy * sin_t
302+ c2_mid = - cx * sin_t + cy * cos_t
303+
304+ return Rectangle (a = [cos_t , - sin_t , cos_t , - sin_t ],
305+ b = [sin_t , cos_t , sin_t , cos_t ],
306+ c = [c1_mid - w / 2.0 , c2_mid - l / 2.0 , c1_mid + w / 2.0 , c2_mid + l / 2.0 ])
307+
308+
309+ class DualLShapeDetector :
310+ """
311+ A comparative detector class that runs both geometric and optimized L-shape fitting.
312+
313+ This class facilitates side-by-side visualization and performance analysis by
314+ encapsulating a standard LShapeFittingDetector and an OptimizedLShapeDetector.
315+ It is primarily used for validating the accuracy gains of non-linear optimization
316+ over traditional geometric search methods.
317+ """
318+ def __init__ (self , show_logger_debug = True ):
319+ """
320+ Initialize the dual detector with both fitting pipelines.
321+
322+ Args:
323+ show_logger_debug (bool): If True, enables console output for the
324+ optimized fitting stage.
325+ """
326+ self .geometric_detector = LShapeFittingDetector ()
327+ self .optimized_detector = OptimizedLShapeDetector (show_logger_debug = show_logger_debug )
328+ self .latest_rectangles_list = []
329+
330+ def update (self , point_cloud ):
331+ """
332+ Update both detection pipelines with the latest LiDAR point cloud data.
333+
334+ Processes the raw cluster through the geometric sweep first, followed
335+ by the optimized Huber-weighted polish. Results are aggregated into
336+ a single list for state management and visualization.
337+
338+ Args:
339+ point_cloud (numpy.ndarray): The raw (2, N) LiDAR point cloud cluster.
340+ """
341+ # 1. Update both detectors
342+ self .geometric_detector .update (point_cloud )
343+ self .optimized_detector .update (point_cloud )
344+
345+ # 2. Combine results for the vehicle state if needed
346+ # We show both in the list so they can both be drawn
347+ self .latest_rectangles_list = (self .geometric_detector .latest_rectangles_list +
348+ self .optimized_detector .latest_rectangles_list )
349+
350+ def draw (self , axes , elems , x_m , y_m , yaw_rad ):
351+ """
352+ Visualize the OBB results from both detectors on a 2D plot.
353+
354+ Renders the geometric fit as a thin blue line and the optimized fit
355+ as a thick red line to clearly distinguish the refinement polish.
356+
357+ Args:
358+ axes (matplotlib.axes.Axes): The axes object for the current plot.
359+ elems (list): A list of plot elements to be updated/cleared.
360+ x_m (float): Current vehicle X position in world frame.
361+ y_m (float): Current vehicle Y position in world frame.
362+ yaw_rad (float): Current vehicle heading in radians.
363+ """
364+ def custom_draw (rect_obj , color_code , label_prefix , lw = 2 ):
365+ """Internal helper to handle homogeneous transformation and plotting."""
366+ transformed_contour = rect_obj .contour .homogeneous_transformation (x_m , y_m , yaw_rad )
367+ rect_plot , = axes .plot (transformed_contour .get_x_data (),
368+ transformed_contour .get_y_data (),
369+ color = color_code , ls = '-' , linewidth = lw , label = label_prefix )
370+ elems .append (rect_plot )
371+
372+ transformed_center = rect_obj .center_xy .homogeneous_transformation (x_m , y_m , yaw_rad )
373+ center_plot , = axes .plot (transformed_center .get_x_data (),
374+ transformed_center .get_y_data (),
375+ marker = '.' , color = color_code )
376+ elems .append (center_plot )
377+
378+ # Draw Geometric (Blue) - Thin line
379+ for rect in self .geometric_detector .latest_rectangles_list :
380+ custom_draw (rect , 'b' , 'Geometric' , lw = 1.5 )
381+
382+ # Draw Optimized (Red) - Thick line
383+ for rect in self .optimized_detector .latest_rectangles_list :
384+ custom_draw (rect , 'r' , 'Optimized' , lw = 3.0 )
0 commit comments