diff --git a/autonav_ws/src/autonav_odometry/CMakeLists.txt b/autonav_ws/src/autonav_odometry/CMakeLists.txt new file mode 100644 index 0000000..7764169 --- /dev/null +++ b/autonav_ws/src/autonav_odometry/CMakeLists.txt @@ -0,0 +1,24 @@ +cmake_minimum_required(VERSION 3.8) +project(autonav_odometry) + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +# find dependencies +find_package(ament_cmake_python REQUIRED) +find_package(rclpy REQUIRED) +find_package(autonav_shared REQUIRED) + +# Install Python modules +ament_python_install_package(${PROJECT_NAME}) + +# Install Python programs +install(PROGRAMS + # enumerate every file in src by hand here like a bozo because globs don't just work i guess + src/main.py + src/particlefilter.py + DESTINATION lib/${PROJECT_NAME} +) + +ament_package() diff --git a/autonav_ws/src/autonav_odometry/autonav_odometry/__init__.py b/autonav_ws/src/autonav_odometry/autonav_odometry/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/autonav_ws/src/autonav_odometry/package.xml b/autonav_ws/src/autonav_odometry/package.xml new file mode 100644 index 0000000..0f7ed09 --- /dev/null +++ b/autonav_ws/src/autonav_odometry/package.xml @@ -0,0 +1,18 @@ + + + + autonav_odometry + 0.1.0 + Thingies to track the robot's position + Doruk Ayhan + MIT + + ament_cmake_python + + rclpy + autonav_shared + + + ament_cmake + + \ No newline at end of file diff --git a/autonav_ws/src/autonav_odometry/resource/autonav_odometry b/autonav_ws/src/autonav_odometry/resource/autonav_odometry new file mode 100644 index 0000000..e69de29 diff --git a/autonav_ws/src/autonav_odometry/src/main.py b/autonav_ws/src/autonav_odometry/src/main.py new file mode 100644 index 0000000..5b5c972 --- /dev/null +++ b/autonav_ws/src/autonav_odometry/src/main.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +# filters.py from last year for the most part + +import json +from types import SimpleNamespace +from autonav_msgs.msg import MotorFeedback, GPSFeedback, Position +from autonav_shared.types import DeviceState, SystemState, LogLevel # i am goig to beat pylance with a rock +from particlefilter import ParticleFilter +# from scr_msgs.msg import SystemState +from autonav_shared.node import Node +from enum import IntEnum +import rclpy + +class FilterType(IntEnum): + PARTICLE_FILTER = 1 + # ADD NEW FILTERS HERE + +class FiltersNodeConfig: + def __init__(self): + self.filter_type = FilterType.PARTICLE_FILTER + +class FiltersNode(Node): + def __init__(self): + super().__init__("autonav_odometry") + # init gps and position + self.first_gps = None + self.last_gps = None + self.latitude_length = self.declare_parameter("latitude_length", 111086.2).get_parameter_value().double_value + self.longitude_length = self.declare_parameter("longitude_length", 81978.2).get_parameter_value().double_value + # init filters + # IF WE IMPLEMENT MORE FILTERS ADD THEM HERE (AND ANYWHERE THAT COMES UP WHEN YOU CTRL-F "NEW FILTERS")! + self.pf = ParticleFilter(self.latitude_length, self.longitude_length) + self.config = FiltersNodeConfig() + self.onReset() + + def config_updated(self, jsonObject): + self.config = json.loads(self.jdump(jsonObject), object_hook=lambda d: SimpleNamespace(**d)) + # reinit filters after getting new config + self.onReset() + + def init(self): + # [melee announcer voice] ready... + self.create_subscription(GPSFeedback, "/autonav/gps", self.onGPSReceived, 20) + self.create_subscription(MotorFeedback, "/autonav/MotorFeedback", self.onMotorFeedbackReceived, 20) + self.position_publisher = self.create_publisher(Position, "/autonav/position", 20) + # go! + self.set_device_state(DeviceState.OPERATING) + + def onReset(self): + self.first_gps = None + self.last_gps = None + self.pf.init_particles() + # INIT NEW FILTERS HERE + + def system_state_transition(self, old: SystemState, updated: SystemState): + # reinit filters when robot changes modes + if old.state != SystemState.AUTONOMOUS and updated.state == SystemState.AUTONOMOUS: + self.onReset() + + if old.state != SystemState.MANUAL and updated.state == SystemState.MANUAL: + self.onReset() + # or gets going + if old.mobility == False and updated.mobility == True: + self.onReset() + + def onGPSReceived(self, msg: GPSFeedback): + if msg.gps_fix == 0 and msg.is_locked == False: + return + + if self.first_gps is None: + self.first_gps = msg + + self.last_gps = msg + # USE NEW FILTERS HERE + if self.config.filter_type == FilterType.PARTICLE_FILTER: + gps_x, gps_y = self.pf.gps(msg) + self.log(f"PF back from GPS ping w/ X={gps_x}, Y={gps_y}", LogLevel.DEBUG) + else: + self.log(f"{self.config.filter_type} isn't a valid FilterType! Did you implement a new filter and forget to use it?", LogLevel.FATAL) + + def onMotorFeedbackReceived(self, msg: MotorFeedback): + averages = None + # USE NEW FILTERS HERE + if self.config.filter_type == FilterType.PARTICLE_FILTER: + avg_x, avg_y, avg_theta = self.pf.feedback(msg) + self.log(f"PF back from motor odom w/ X={avg_x}, Y={avg_y}, θ={avg_theta}", LogLevel.DEBUG) + else: + self.log(f"{self.config.filter_type} isn't a valid FilterType! Did you implement a new filter and forget to use it?", LogLevel.FATAL) + + if averages is None: + return + # unpack what the filter just returned + position = Position() + position.x = averages[0] + position.y = averages[1] + position.theta = averages[2] + + if self.first_gps is not None: + gps_x = self.first_gps.latitude + position.x / self.latitude_length + gps_y = self.first_gps.longitude - position.y / self.longitude_length + position.latitude = gps_x + position.longitude = gps_y + + self.position_publisher.publish(position) + +def main(): + rclpy.init() + node = FiltersNode() + rclpy.spin(node) + rclpy.shutdown() + +if __name__ == "__main__": + main() diff --git a/autonav_ws/src/autonav_odometry/src/particlefilter.py b/autonav_ws/src/autonav_odometry/src/particlefilter.py new file mode 100644 index 0000000..2013f48 --- /dev/null +++ b/autonav_ws/src/autonav_odometry/src/particlefilter.py @@ -0,0 +1,102 @@ +### particlefilter.py from last year ### +# TODO could all the for-each-particle math in this code be done ✨🚀faster🚀✨ with numpy than vanilla python? +from autonav_msgs.msg import MotorFeedback, GPSFeedback, Position +import numpy as np +import math +import random + + +class Particle: + def __init__(self, x=0, y=0, theta=0, weight=1) -> None: + self.x = x + self.y = y + self.theta = theta + self.weight = weight + + +class ParticleFilter: + def __init__(self, latitudeLength, longitudeLength) -> None: + self.num_particles = 750 + self.gps_noise = 0.45 + self.odom_noise = (0, 0, 0.1) + # TODO should these parameters be in a config file in autonav_odometry/resource? + self.init_particles() + # self.first_gps = None + + self.latitude_length = latitudeLength + self.longitude_length = longitudeLength + + def init_particles(self, seedHeading: float = 0.0, useSeedHeading: bool = False): + self.first_gps = None + if useSeedHeading: # particles generally point in an initial direction + self.particles = [Particle(0, 0, seedHeading + np.random.normal(0, 0.1)) for i in range(self.num_particles)] + else: # particles point literally everywhere + self.particles = [Particle(0, 0, i / self.num_particles * 2 * math.pi) for i in range(self.num_particles)] + + def feedback(self, feedback: MotorFeedback) -> tuple[float, float, float]: + sum_x = 0 + sum_y = 0 + sum_theta_x = 0 + sum_theta_y = 0 + sum_weight = 0 + + for particle in self.particles: + # update particles + # MotorFeedback gives robot-relative data (x is forward, y is left (???)) so we convert it to track-relative with trig magic + # TODO why are we scaling down delta_x and delta_theta again? + particle.x += feedback.delta_x * 0.95 * math.cos(particle.theta) + feedback.delta_y * math.sin(particle.theta) + particle.y += feedback.delta_x * 0.95 * math.sin(particle.theta) + feedback.delta_y * math.cos(particle.theta) + particle.theta += feedback.delta_theta * 0.8 + particle.theta = particle.theta % (2 * math.pi) + # weighted sum of particles + weight = particle.weight ** 2 + sum_x += particle.x * weight + sum_y += particle.y * weight + sum_theta_x += math.cos(particle.theta) * weight + sum_theta_y += math.sin(particle.theta) * weight + sum_weight += weight + + if sum_weight < 0.000001: + sum_weight = 0.000001 + + # now weighted average + avg_x = sum_x / sum_weight + avg_y = sum_y / sum_weight + avg_theta = math.atan2(sum_theta_y / sum_weight, sum_theta_x / sum_weight) % (2 * math.pi) + + return (avg_x, avg_y, avg_theta) + + def gps(self, gps: GPSFeedback) -> tuple[float, float]: + if self.first_gps is None: + self.first_gps = gps + + # convert latlon to distance offset from first gps reading + gps_x = (gps.latitude - self.first_gps.latitude) * self.latitude_length + gps_y = (self.first_gps.longitude - gps.longitude) * self.longitude_length + + # adjust particle weights - closer to gps reading is better + for particle in self.particles: + dist_sqrt = np.sqrt((particle.x - gps_x) ** 2 + (particle.y - gps_y) ** 2) # root mean square error + particle.weight = math.exp(-dist_sqrt / (2 * self.gps_noise ** 2)) # less accurate gps = weights decrease smoother further from reading + if particle.weight <= 0.00001 / len(self.particles): # prevent resample() from blowing up if bot teleports 100 km away + particle.weight = 0.00001 / len(self.particles) + self.resample() + return (gps_x, gps_y) + + def resample(self) -> None: + weights = [particle.weight for particle in self.particles] + weights_sum = sum(weights) + if weights_sum <= 0.00001: + weights_sum = 0.00001 + weights = [weight / weights_sum for weight in weights] + + new_particles = random.choices(self.particles, weights, k=self.num_particles) + self.particles = [] + + for particle in new_particles: + rand_x = np.random.normal(0, self.odom_noise[0]) + rand_y = np.random.normal(0, self.odom_noise[1]) + x = particle.x + rand_x * math.cos(particle.theta) + rand_y * math.sin(particle.theta) + y = particle.y + rand_x * math.sin(particle.theta) + rand_y * math.cos(particle.theta) + theta = np.random.normal(particle.theta, self.odom_noise[2]) % (2 * math.pi) + self.particles.append(Particle(x, y, theta, particle.weight)) \ No newline at end of file