1- """
2- @file: circular_agent.py
3- @breif: CircularAgent class for the environment.
4- @author: Wu Maojia
5- @update: 2025.9.20
6- """
71from typing import List , Dict , Tuple , Optional
8-
92import numpy as np
103
4+ from python_motion_planning .common .utils .geometry import Geometry
5+
116
127class BaseRobot :
138 """
14- Agent 基类:只包含物理参数和状态容器,负责定义观测格式。
15- - dim: 空间维度(N)
16- - mass: 质量
17- - radius: 形状半径(用于碰撞)
18- - pos, vel: 当前状态(numpy arrays)
19- - acc: 当前瞬时加速度(set by controller before env.step 使用)
9+ Base class for robots.
10+
11+ Args:
12+ dim: Space dimension
13+ mass: Mass of the robot
14+ pose: Current pose (position + orientation) (world frame)
15+ vel: Current velocity (linear + angular) (world frame)
16+ max_lin_speed: Maximum linear speed
17+ max_ang_speed: Maximum angular speed
18+ action_min: Minimum action bounds (robot frame)
19+ action_max: Maximum action bounds (robot frame)
2020 """
21- def __init__ (self , dim : int = 2 , mass : float = 1.0 , radius : float = 0.1 ,
22- pos : Optional [np .ndarray ] = None , vel : Optional [np .ndarray ] = None ,
21+ def __init__ (self , dim : int = 2 , mass : float = 1.0 ,
22+ pose : Optional [np .ndarray ] = None , vel : Optional [np .ndarray ] = None ,
23+ max_lin_speed : float = np .inf , max_ang_speed : float = np .inf ,
2324 action_min : Optional [np .ndarray ] = None , action_max : Optional [np .ndarray ] = None ):
2425 self .dim = dim
26+ if dim not in (2 , 3 ):
27+ raise NotImplementedError (f"Only 2D and 3D are supported, got { dim } D" )
28+
29+ self .pose_dim = 3 if dim == 2 else 6
30+
2531 self .mass = float (mass )
26- self .radius = float (radius )
27- self .pos = np .zeros (dim ) if pos is None else np .array (pos , dtype = float )
28- self .vel = np .zeros (dim ) if vel is None else np .array (vel , dtype = float )
32+
33+ # Pose: position + orientation
34+ # 2D: [x, y, theta] where theta is angle in radians
35+ # 3D: [x, y, z, roll, pitch, yaw]
36+ self ._pose = np .zeros (self .pose_dim ) if pose is None else np .array (pose , dtype = float )
37+ self ._vel = np .zeros (self .pose_dim ) if vel is None else np .array (vel , dtype = float )
38+
39+ if len (self ._pose ) != self .pose_dim :
40+ raise ValueError (f"len(pose) must be { self .pose_dim } if dim=={ self .dim } , got { len (pose )} " )
41+
42+ if len (self ._vel ) != self .pose_dim :
43+ raise ValueError (f"len(vel) must be { self .pose_dim } if dim=={ self .dim } , got { len (vel )} " )
44+
45+ self .max_lin_speed = max_lin_speed
46+ self .max_ang_speed = max_ang_speed
47+
2948 # acceleration is set externally by controller each step
30- self .acc = np .zeros (dim )
49+ # 2D actions: [longitudinal_acc, lateral_acc, angular_acc]
50+ # 3D actions: [x_acc, y_acc, z_acc, roll_acc, pitch_acc, yaw_acc]
51+ self .acc = np .zeros (self .pose_dim )
52+
3153 # action bounds per-dim (controller output bounds)
3254 if action_min is None :
33- action_min = - np .ones (dim ) * 1.0
55+ action_min = - np .ones (self . pose_dim ) * 1.0
3456 if action_max is None :
35- action_max = np .ones (dim ) * 1.0
57+ action_max = np .ones (self . pose_dim ) * 1.0
3658 self .action_min = np .array (action_min , dtype = float )
3759 self .action_max = np .array (action_max , dtype = float )
3860
61+ @property
62+ def pose (self ):
63+ """Get position + orientation"""
64+ return self ._pose
65+
66+ @pose .setter
67+ def pose (self , value : np .ndarray ) -> None :
68+ """Set position + orientation"""
69+ self ._pose [:self .dim ] = value [:self .dim ]
70+ self ._pose [self .dim :] = Geometry .regularize_orient (value [self .dim :])
71+
72+ @property
73+ def vel (self ):
74+ """Get linear + angular velocity"""
75+ return self ._vel
76+
77+ @vel .setter
78+ def vel (self , value : np .ndarray ) -> None :
79+ """Set linear + angular velocity"""
80+ self ._vel = value
81+
82+ @property
83+ def pos (self ):
84+ """Get position from pose"""
85+ return self ._pose [:self .dim ]
86+
87+ @pos .setter
88+ def pos (self , value : np .ndarray ):
89+ """Set position in pose"""
90+ self ._pose [:self .dim ] = value
91+
92+ @property
93+ def orient (self ):
94+ """Get orientation from pose"""
95+ return self ._pose [self .dim :]
96+
97+ @orient .setter
98+ def orient (self , value : np .ndarray ):
99+ """Set orientation in pose"""
100+ self ._pose [self .dim :] = Geometry .regularize_orient (value )
101+
102+ @property
103+ def lin_vel (self ):
104+ """Get linear velocity"""
105+ return self ._vel [:self .dim ]
106+
107+ @lin_vel .setter
108+ def lin_vel (self , value : np .ndarray ):
109+ """Set linear velocity"""
110+ self ._vel [:self .dim ] = value
111+
112+ @property
113+ def ang_vel (self ):
114+ """Get angular velocity"""
115+ return self ._vel [self .dim :]
116+
117+ @ang_vel .setter
118+ def ang_vel (self , value : np .ndarray ):
119+ """Set angular velocity"""
120+ self ._vel [self .dim :] = value
121+
39122 def observation_size (self , env ) -> int :
40123 """
41- 默认观测:自身 pos, vel (2*dim) + 所有其他 agent 的相对位置 ( (n-1)*dim )
42- 你可以重载该函数改变观测结构。
124+ Default observation space: [pos, orientation, vel, ang_vel]
43125 """
44- n_agents = len (env .agents )
45- return 2 * self .dim + (n_agents - 1 ) * self .dim
126+ # Pose (Position (dim) + orientation (1 if dim==2 or 3 if dim==3)) +
127+ # velocity (linear velocity (dim) + angular velocity (1 if dim==2 or 3 if dim==3))
128+ if self .dim == 2 :
129+ orient_dim = 1
130+ elif self .dim == 3 :
131+ orient_dim = 3
132+ else :
133+ raise ValueError ("Invalid dimension" )
134+
135+ return 2 * self .dim + 2 * orient_dim
46136
47137 def get_observation (self , env ) -> np .ndarray :
48138 """
49- 返回观测向量(1D numpy array)。
50- 默认格式: [pos, vel, rel_pos_agent1, rel_pos_agent2, ...]
51- 相对位置按照 env.agents 列表顺序(跳过 self)。
139+ Get observation vector for this robot including orientation.
52140 """
53141 obs = []
54- obs .extend (self .pos .tolist ())
55- obs .extend (self .vel .tolist ())
56- for a in env .agents :
57- if a is self :
58- continue
59- rel = (a .pos - self .pos )
60- obs .extend (rel .tolist ())
142+ obs .extend (self .pos .tolist ()) # Position
143+ obs .extend (self .orient .tolist ()) # Orientation
144+ obs .extend (self .lin_vel .tolist ()) # Linear velocity
145+ obs .extend (self .ang_vel .tolist ()) # Angular velocity
146+
61147 return np .array (obs , dtype = float )
62148
149+ def kinematic_model (self , pose : np .ndarray , vel : np .ndarray , acc : np .ndarray , env_acc : np .ndarray , dt : float ) -> Tuple [np .ndarray , np .ndarray , dict ]:
150+ """
151+ Kinematic model (used to simulate the robot motion without updating the robot state)
152+
153+ Args:
154+ pose: robot pose (world frame)
155+ vel: robot velocity (world frame)
156+ acc: robot acceleration action (world frame)
157+ env_acc: environment acceleration (world frame)
158+ dt: time step length
159+
160+ Returns:
161+ pose: new robot pose (world frame)
162+ vel: new robot velocity (world frame)
163+ info: auxiliary information
164+ """
165+ net_acc = acc + env_acc # acc is clipped. env_acc no need to clip.
166+
167+ # semi-implicit Euler integration
168+ vel = vel + net_acc * dt
169+
170+ # clip linear and angular velocity
171+ vel = self .clip_velocity (vel )
172+
173+ # update pose
174+ pose = pose + vel * dt
175+
176+ return pose , vel , {}
177+
178+ def step (self , env_acc : np .ndarray , dt : float ) -> None :
179+ """
180+ Take a step in simulation using differential drive kinematics.
181+ self.acc and self.vel are in world frame. You have to transform them into robot frame if needed.
182+
183+ Args:
184+ env_acc: acceleration vector from environment
185+ dt: time step size
186+ """
187+ self .pose , self .vel , info = self .kinematic_model (self .pose , self .vel , self .acc , env_acc , dt )
188+
189+ def clip_linear_velocity (self , lv : np .ndarray ) -> np .ndarray :
190+ """Clip linear velocity to maximum allowed value."""
191+ return lv if np .linalg .norm (lv ) <= self .max_lin_speed else lv / np .linalg .norm (lv ) * self .max_lin_speed
192+
193+ def clip_angular_velocity (self , av : float or np .ndarray ) -> float or np .ndarray :
194+ """Clip angular velocity to maximum allowed value."""
195+ if self .dim == 2 :
196+ return av if abs (av ) <= self .max_ang_speed else np .sign (av ) * self .max_ang_speed
197+ else : # 3D
198+ norm = np .linalg .norm (av )
199+ return av if norm <= self .max_ang_speed else av / norm * self .max_ang_speed
200+
201+ def clip_velocity (self , v : np .ndarray ) -> np .ndarray :
202+ """Clip linear and angular velocity to maximum allowed value."""
203+ lv = v [:self .dim ]
204+ av = v [self .dim :self .pose_dim ]
205+ return np .concatenate ([self .clip_linear_velocity (lv ), self .clip_angular_velocity (av )])
206+
63207 def clip_action (self , a : np .ndarray ) -> np .ndarray :
64- return np .minimum (np .maximum (a , self .action_min ), self .action_max )
208+ """Clip action to action bounds."""
209+ return np .minimum (np .maximum (a , self .action_min ), self .action_max )
0 commit comments