|
| 1 | +import math |
| 2 | +from abc import ABC, abstractmethod |
| 3 | +from typing import Optional |
| 4 | + |
| 5 | +import numpy as np |
| 6 | +import numpy.typing as npt |
| 7 | +import soccer_vision_3d_msgs.msg as sv3dm |
| 8 | +import tf2_ros as tf2 |
| 9 | +from bitbots_rust_nav import RoundObstacle |
| 10 | +from geometry_msgs.msg import Pose, PoseStamped |
| 11 | +from nav_msgs.msg import Path |
| 12 | +from rclpy.time import Time |
| 13 | +from ros2_numpy import numpify |
| 14 | +from tf2_geometry_msgs import PointStamped, PoseWithCovarianceStamped |
| 15 | +from tf_transformations import euler_from_quaternion |
| 16 | + |
| 17 | +from bitbots_path_planning import NodeWithConfig |
| 18 | + |
| 19 | + |
| 20 | +class FootstepPlanner(ABC): |
| 21 | + @abstractmethod |
| 22 | + def set_goal(self, pose: PoseStamped) -> None: |
| 23 | + pass |
| 24 | + |
| 25 | + @abstractmethod |
| 26 | + def cancel_goal(self) -> None: |
| 27 | + pass |
| 28 | + |
| 29 | + @abstractmethod |
| 30 | + def set_robots(self, robots: sv3dm.RobotArray) -> None: |
| 31 | + pass |
| 32 | + |
| 33 | + @abstractmethod |
| 34 | + def set_ball(self, ball: PoseWithCovarianceStamped) -> None: |
| 35 | + pass |
| 36 | + |
| 37 | + @abstractmethod |
| 38 | + def avoid_ball(self, state: bool) -> None: |
| 39 | + pass |
| 40 | + |
| 41 | + @abstractmethod |
| 42 | + def step(self) -> Path: |
| 43 | + pass |
| 44 | + |
| 45 | + |
| 46 | +class VisibilityFinalstepPlanner(FootstepPlanner): |
| 47 | + def __init__(self, node: NodeWithConfig, buffer: tf2.BufferInterface) -> None: |
| 48 | + self.node = node |
| 49 | + self.buffer = buffer |
| 50 | + self.robots: list[RoundObstacle] = [] |
| 51 | + self.ball: Optional[RoundObstacle] = None |
| 52 | + self.goal: Optional[PoseStamped] = None |
| 53 | + self.base_footprint_frame: str = self.node.config.base_footprint_frame |
| 54 | + self.ball_obstacle_active: bool = True |
| 55 | + self.frame: str = self.node.config.map.planning_frame |
| 56 | + self.ready_for_final_step: bool = False |
| 57 | + |
| 58 | + # Roboter erstmal außenvor gelassen |
| 59 | + def set_robots(self, robots: sv3dm.RobotArray): |
| 60 | + new_buffer: list[RoundObstacle] = [] |
| 61 | + for robot in robots.robots: |
| 62 | + point = PointStamped() |
| 63 | + point.header.frame_id = robots.header.frame_id |
| 64 | + point.point = robot.bb.center.position |
| 65 | + # Use the maximum dimension of the bounding box as the radius |
| 66 | + radius = max(numpify(robot.bb.size)[:2]) / 2 |
| 67 | + try: |
| 68 | + # Transform the point to the planning frame |
| 69 | + position = self.buffer.transform(point, self.frame).point |
| 70 | + # Add the robot to the buffer if the transformation was successful |
| 71 | + new_buffer.append(RoundObstacle(center=(position.x, position.y), radius=radius)) |
| 72 | + except (tf2.ConnectivityException, tf2.LookupException, tf2.ExtrapolationException) as e: |
| 73 | + self.node.get_logger().warn(str(e)) |
| 74 | + self.robots = new_buffer |
| 75 | + |
| 76 | + def set_ball(self, ball: PoseWithCovarianceStamped) -> None: |
| 77 | + point = PointStamped() |
| 78 | + point.header.frame_id = ball.header.frame_id |
| 79 | + point.point = ball.pose.pose.position |
| 80 | + try: |
| 81 | + # Transform the point to the planning frame |
| 82 | + tf_point = self.buffer.transform(point, self.frame).point |
| 83 | + # Create a new ball obstacle |
| 84 | + self.ball = RoundObstacle(center=(tf_point.x, tf_point.y), radius=self.node.config.map.ball_diameter / 2.0) |
| 85 | + except (tf2.ConnectivityException, tf2.LookupException, tf2.ExtrapolationException) as e: |
| 86 | + self.ball = None |
| 87 | + self.node.get_logger().warn(str(e)) |
| 88 | + |
| 89 | + def set_goal(self, pose: PoseStamped) -> None: |
| 90 | + """ |
| 91 | + Updates the goal pose |
| 92 | + """ |
| 93 | + pose.header.stamp = Time(clock_type=self.node.get_clock().clock_type).to_msg() |
| 94 | + self.goal = pose |
| 95 | + |
| 96 | + def avoid_ball(self, state: bool) -> None: |
| 97 | + """ |
| 98 | + Activates or deactivates the ball obstacle |
| 99 | + """ |
| 100 | + self.ball_obstacle_active = state |
| 101 | + |
| 102 | + def cancel_goal(self) -> None: |
| 103 | + """ |
| 104 | + Removes the current goal |
| 105 | + """ |
| 106 | + self.goal = None |
| 107 | + self.path = None |
| 108 | + |
| 109 | + def step(self, published_steps: int) -> npt.NDArray[np.float64]: |
| 110 | + """ |
| 111 | + Computes the next step to the goal |
| 112 | + """ |
| 113 | + assert self.goal is not None, "No goal set" |
| 114 | + |
| 115 | + offset_vec = self.rotate_vector_2d(0.06, -0.02, self._get_yaw(self.goal.pose)) |
| 116 | + # Define goal |
| 117 | + goal = ( |
| 118 | + self.goal.pose.position.x - offset_vec[0], |
| 119 | + self.goal.pose.position.y - offset_vec[1], |
| 120 | + self.goal.pose.orientation.z, |
| 121 | + self.goal.pose.orientation.w, |
| 122 | + ) |
| 123 | + |
| 124 | + # Create an default pose in the origin of our base footprint |
| 125 | + pose_geometry_msgs = PoseStamped() |
| 126 | + pose_geometry_msgs.header.frame_id = self.node.config.base_footprint_frame |
| 127 | + |
| 128 | + # We get our pose in respect to the map frame (also frame of the path message) |
| 129 | + # by transforming the pose above into this frame |
| 130 | + try: |
| 131 | + current_pose: Pose = self.buffer.transform(pose_geometry_msgs, self.frame).pose |
| 132 | + except (tf2.ConnectivityException, tf2.LookupException, tf2.ExtrapolationException) as e: |
| 133 | + self.node.get_logger().warn("Failed to perform controller step: " + str(e)) |
| 134 | + return np.array([0, 0, 0.0, 0.0]) |
| 135 | + |
| 136 | + start = (current_pose.position.x, current_pose.position.y) |
| 137 | + |
| 138 | + robot_angle = self._get_yaw(current_pose) |
| 139 | + |
| 140 | + rot_vec = self.rotate_vector_2d(goal[0] - start[0], goal[1] - start[1], -robot_angle) |
| 141 | + |
| 142 | + if published_steps < 2: |
| 143 | + self.node.get_logger().info( |
| 144 | + "final_step published: nr: " |
| 145 | + + str(published_steps) |
| 146 | + + " x,y: " |
| 147 | + + str(rot_vec[0]) |
| 148 | + + " , " |
| 149 | + + str(rot_vec[1]) |
| 150 | + ) |
| 151 | + return np.array([rot_vec[0] * 0.5, (rot_vec[1] * 0.5), 0.0, 0.0]) |
| 152 | + elif published_steps < 3: |
| 153 | + self.node.get_logger().info( |
| 154 | + "final_step published: nr: " |
| 155 | + + str(published_steps) |
| 156 | + + " x,y: " |
| 157 | + + str(rot_vec[0]) |
| 158 | + + " , " |
| 159 | + + str(rot_vec[1]) |
| 160 | + ) |
| 161 | + return np.array([rot_vec[0], (rot_vec[1]), 0.0, 0.0]) |
| 162 | + elif published_steps < 4: |
| 163 | + self.node.get_logger().info( |
| 164 | + "final_step published: nr: " |
| 165 | + + str(published_steps) |
| 166 | + + " x,y: " |
| 167 | + + str(rot_vec[0]) |
| 168 | + + " , " |
| 169 | + + str(rot_vec[1]) |
| 170 | + ) |
| 171 | + return np.array([rot_vec[0], (rot_vec[1]), 0.0, 0.0]) |
| 172 | + else: |
| 173 | + if abs(rot_vec[1]) > 0.05: |
| 174 | + if abs(rot_vec[0]) > 0.05: |
| 175 | + self.node.get_logger().info( |
| 176 | + "final_step published: overhead" + " x,y: " + str(0.1) + " , " + str(0.1) |
| 177 | + ) |
| 178 | + return np.array([0.1, 0.1, 0.0, 0.0]) |
| 179 | + else: |
| 180 | + self.node.get_logger().info( |
| 181 | + "final_step published: overhead" + " x,y: " + str(0.0) + " , " + str(0.1) |
| 182 | + ) |
| 183 | + return np.array([0.0, 0.1, 0.0, 0.0]) |
| 184 | + elif abs(rot_vec[0]) > 0.05: |
| 185 | + self.node.get_logger().info("final_step published: overhead" + " x,y: " + str(0.1) + " , " + str(0.0)) |
| 186 | + return np.array([0.1, 0.0, 0.0, 0.0]) |
| 187 | + else: |
| 188 | + self.node.get_logger().info("final_step published: overhead" + " x,y: " + str(0.0) + " , " + str(0.0)) |
| 189 | + return np.array([0.0, 0.0, 0.0, 0.0]) |
| 190 | + |
| 191 | + def rotate_vector_2d(self, x, y, a) -> npt.NDArray[np.float64]: |
| 192 | + return np.array([x * math.cos(a) - y * math.sin(a), x * math.sin(a) + y * math.cos(a)]) |
| 193 | + |
| 194 | + def _get_yaw(self, pose: Pose) -> float: |
| 195 | + """ |
| 196 | + Returns the yaw angle of a given pose |
| 197 | + """ |
| 198 | + return euler_from_quaternion(numpify(pose.orientation))[2] |
0 commit comments