1- import abc
21import typing
3- from abc import abstractmethod
42from dataclasses import dataclass
5- from enum import Enum
63
74import numpy as np
85from scipy .linalg import solve_discrete_are
96
7+ from biped_walking_controller .state_machine import WalkingState , Foot
8+
109
1110def linear_interpolation (t : np .ndarray , pos_begin : np .ndarray , pos_end : np .ndarray ) -> np .ndarray :
1211 return (1 - t )[:, None ] * pos_begin + t [:, None ] * pos_end
@@ -15,6 +14,13 @@ def linear_interpolation(t: np.ndarray, pos_begin: np.ndarray, pos_end: np.ndarr
1514def cubic_spline_interpolation (
1615 t : np .ndarray , pos_begin : np .ndarray , pos_end : np .ndarray
1716) -> np .ndarray :
17+ if isinstance (t , float ):
18+ return (
19+ pos_begin
20+ + 3.0 * (pos_end - pos_begin ) * np .square (t )
21+ - 2.0 * (pos_end - pos_begin ) * np .pow (t , 3 )
22+ )
23+
1824 return (
1925 pos_begin
2026 + 3.0 * (pos_end - pos_begin ) * np .square (t )[:, None ]
@@ -255,3 +261,243 @@ def update_control(ctrl_mat: PreviewControllerMatrices, current_zmp, zmp_ref, x,
255261 y_next [1 :] = ctrl_mat .A @ y [1 :] + ctrl_mat .B .ravel () * u [1 ]
256262
257263 return u , x_next , y_next
264+
265+
266+ def build_zmp_horizon (
267+ com_initial_target ,
268+ t_horizon : float ,
269+ t_state : float ,
270+ state : WalkingState ,
271+ delta_t : float ,
272+ current_step_idx : int ,
273+ steps_sequence : np .ndarray ,
274+ steps_foot : typing .List [Foot ],
275+ ss_t : float ,
276+ ds_t : float ,
277+ t_init : float ,
278+ t_end : float ,
279+ interp_fn = cubic_spline_interpolation ,
280+ ) -> tuple [np .ndarray , np .ndarray ]:
281+ """
282+ Build a ZMP reference over a preview horizon based on the current walking state.
283+
284+ Parameters
285+ ----------
286+ t_horizon : float
287+ Length of the preview horizon [s].
288+ t_state : float
289+ Time elapsed in the current state [s].
290+ state : WalkingState
291+ Current walking state (INIT, DS, SS_LEFT, SS_RIGHT, END).
292+ delta_t : float
293+ Sampling period of the preview horizon [s].
294+ current_step_idx : int
295+ Index of the current step in `steps_sequence`. Interpreted as:
296+ - In SS: index of the current support foot.
297+ - In DS: index of the *target* foot (previous is current_step_idx - 1).
298+ steps_sequence : np.ndarray
299+ Sequence of footsteps. Shape (N, 2) or (N, >=2).
300+ Only the (x, y) components are used as ZMP targets.
301+ ss_t : float
302+ Duration of a single-support phase [s].
303+ ds_t : float
304+ Duration of a double-support phase [s].
305+ t_init : float
306+ Duration of the INIT phase [s].
307+ t_end : float
308+ Duration of the END phase [s].
309+ interp_fn : callable
310+ Interpolation function used in double support.
311+ Expected signature: interp_fn(alpha, p0, p1)
312+ where alpha in [0, 1], p0, p1 are 2D numpy arrays.
313+
314+ Returns
315+ -------
316+ t_samples : np.ndarray, shape (N,)
317+ Relative time samples over the horizon, starting at 0.
318+ zmp_horizon : np.ndarray, shape (N, 2)
319+ ZMP reference (x, y) at each time sample.
320+
321+ Notes
322+ -----
323+ - This function does NOT enforce continuity with the *previous* ZMP reference.
324+ At each call, the horizon is built from scratch from the current state.
325+ - State-transition logic is a simple cyclic model:
326+ INIT -> DS -> SS -> DS -> SS -> ... -> END
327+ and the step index increments when leaving an SS state.
328+ """
329+ # Sanity checks and normalization
330+ steps = np .asarray (steps_sequence , dtype = float )
331+ if steps .ndim != 2 or steps .shape [0 ] == 0 :
332+ raise ValueError ("steps_sequence must be a non-empty (N, D) array" )
333+
334+ # Use only x, y
335+ steps_xy = steps [:, :2 ]
336+ n_steps = steps_xy .shape [0 ]
337+
338+ step_idx = int (np .clip (current_step_idx , 0 , n_steps - 1 ))
339+
340+ def state_duration (s : WalkingState ) -> float :
341+ if s == WalkingState .INIT :
342+ return t_init
343+ if s == WalkingState .DS :
344+ return ds_t
345+ if s in (WalkingState .SS_LEFT , WalkingState .SS_RIGHT ):
346+ return ss_t
347+ if s == WalkingState .END :
348+ return t_end
349+ raise ValueError (f"Unknown state: { s } " )
350+
351+ def next_state_and_step (
352+ s : WalkingState , idx : int , steps_foot : typing .List [Foot ]
353+ ) -> tuple [WalkingState , int ]:
354+ """Simple progression model over footsteps.
355+
356+ INIT -> DS(step 0)
357+ DS(k) -> SS(k)
358+ SS(k) -> DS(k+1) (clamped to last step)
359+ END stays END
360+ """
361+ if s == WalkingState .DS or s == WalkingState .INIT :
362+ # Land on target step
363+ if idx < n_steps - 1 :
364+ return (
365+ WalkingState .SS_LEFT if steps_foot [idx ] is Foot .LEFT else WalkingState .SS_RIGHT
366+ ), idx
367+ else :
368+ return WalkingState .END , idx
369+
370+ if s in (WalkingState .SS_LEFT , WalkingState .SS_RIGHT ):
371+ # Move to next DS between current and next step
372+ if idx < n_steps - 1 :
373+ return WalkingState .DS , idx + 1
374+
375+ if s == WalkingState .END :
376+ return WalkingState .END , idx
377+
378+ raise ValueError (f"Unknown state: { s } " )
379+
380+ def zmp_for_state (s : WalkingState , idx : int , t_in_state : float ) -> np .ndarray :
381+ """Return ZMP position (x, y) for a given state and time in state."""
382+ # Clamp index
383+ idx = int (np .clip (idx , 0 , n_steps - 1 ))
384+
385+ if s == WalkingState .INIT :
386+ # Keep ZMP on the first support
387+ p0 = com_initial_target
388+ p1 = steps_xy [0 ]
389+
390+ alpha = np .clip (t_in_state / t_init , 0.0 , 1.0 )
391+ return interp_fn (alpha , p0 , p1 )
392+
393+ if s in (WalkingState .SS_LEFT , WalkingState .SS_RIGHT ):
394+ # ZMP fixed at current support foot
395+ return steps_xy [idx ]
396+
397+ if s == WalkingState .DS :
398+ # ZMP moves between previous and current step
399+ if idx <= 0 :
400+ p0 = com_initial_target
401+ p1 = steps_xy [0 ]
402+ else :
403+ p0 = steps_xy [idx - 1 ]
404+ p1 = steps_xy [idx ]
405+
406+ alpha = np .clip (t_in_state / max (ds_t , 1e-6 ), 0.0 , 1.0 )
407+ return interp_fn (alpha , p0 , p1 )
408+
409+ if s == WalkingState .END :
410+ # Keep ZMP on last support
411+ return (steps_xy [- 2 ] + steps_xy [- 1 ]) / 2.0
412+
413+ raise ValueError (f"Unknown state: { s } " )
414+
415+ # Allocate horizon
416+ n_samples = int (np .floor (t_horizon / delta_t ))
417+ t_samples = np .arange (n_samples , dtype = float ) * delta_t
418+ zmp_horizon = np .zeros ((n_samples , 2 ), dtype = float )
419+
420+ # Simulated state over the horizon
421+ sim_state = state
422+ time_in_state = t_state
423+ sim_step_idx = step_idx
424+
425+ for k in range (n_samples ):
426+ # Advance state machine if needed (except END which just saturates)
427+ while sim_state != WalkingState .END and time_in_state >= state_duration (sim_state ):
428+ time_in_state -= state_duration (sim_state )
429+ sim_state , sim_step_idx = next_state_and_step (sim_state , sim_step_idx , steps_foot )
430+
431+ # Compute ZMP for current simulated state
432+ zmp_horizon [k ] = zmp_for_state (sim_state , sim_step_idx , time_in_state )
433+
434+ # Advance local time
435+ time_in_state += delta_t
436+
437+ return t_samples , zmp_horizon
438+
439+
440+ class CentroidalPlanner :
441+ def __init__ (
442+ self ,
443+ dt : float ,
444+ com_initial_target : np .ndarray ,
445+ params : PreviewControllerParams ,
446+ ):
447+ self .params = params
448+ self .dt = dt
449+ self .ctrler_mat = compute_preview_control_matrices (params , dt )
450+ self .x = np .array ([0.0 , com_initial_target [0 ], 0.0 , 0.0 ], dtype = float )
451+ self .y = np .array ([0.0 , com_initial_target [1 ], 0.0 , 0.0 ], dtype = float )
452+ self .steps_sequence = None
453+ self .steps_foot = None
454+ self .step_idx = 0
455+ self .zmp_ref_horizon = None
456+
457+ def set_steps_sequence (self , steps_sequence : np .ndarray , steps_foot : typing .List [Foot ]):
458+ # We assign the new sequence and reset the step counter
459+ self .steps_sequence = steps_sequence
460+ self .steps_foot = steps_foot
461+ self .step_idx = 0
462+
463+ def update (
464+ self ,
465+ com_initial_target ,
466+ state : WalkingState ,
467+ step_idx : int ,
468+ t_state : float ,
469+ t_init : float ,
470+ t_end : float ,
471+ t_ss : float ,
472+ t_ds : float ,
473+ ):
474+ # Update control
475+ _ , self .zmp_ref_horizon = build_zmp_horizon (
476+ com_initial_target ,
477+ t_horizon = (self .params .n_preview_steps - 1 ) * self .dt ,
478+ t_state = t_state ,
479+ state = state ,
480+ delta_t = self .dt ,
481+ current_step_idx = step_idx ,
482+ steps_sequence = self .steps_sequence ,
483+ steps_foot = self .steps_foot ,
484+ ss_t = t_ss ,
485+ ds_t = t_ds ,
486+ t_init = t_init ,
487+ t_end = t_end ,
488+ interp_fn = cubic_spline_interpolation ,
489+ )
490+
491+ _ , self .x , self .y = update_control (
492+ self .ctrler_mat ,
493+ self .zmp_ref_horizon [0 ],
494+ self .zmp_ref_horizon ,
495+ self .x .copy (),
496+ self .y .copy (),
497+ )
498+
499+ def get_com_pos (self ) -> typing .Tuple [float , float ]:
500+ return self .x [1 ], self .y [1 ]
501+
502+ def get_ref_horizon (self ):
503+ return self .zmp_ref_horizon
0 commit comments