-
Notifications
You must be signed in to change notification settings - Fork 2
Feat/odometry #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Feat/odometry #16
Changes from all commits
0323517
aa944bc
7bb5aab
60ac2bf
2ca1921
06c2344
b2f2ab4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| cmake_minimum_required(VERSION 3.8) | ||
| project(autonav_odometry) | ||
|
|
||
| if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") | ||
| add_compile_options(-Wall -Wextra -Wpedantic) | ||
| endif() | ||
|
|
||
| # find dependencies | ||
| find_package(ament_cmake_python REQUIRED) | ||
| find_package(rclpy REQUIRED) | ||
| find_package(autonav_shared REQUIRED) | ||
|
|
||
| # Install Python modules | ||
| ament_python_install_package(${PROJECT_NAME}) | ||
|
|
||
| # Install Python programs | ||
| install(PROGRAMS | ||
| # enumerate every file in src by hand here like a bozo because globs don't just work i guess | ||
| src/main.py | ||
| src/particlefilter.py | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. particlefilter.py just creates a class that the node uses. It doesn't need to be enumerated here since it's not a node. |
||
| DESTINATION lib/${PROJECT_NAME} | ||
| ) | ||
|
|
||
| ament_package() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| <?xml version="1.0"?> | ||
| <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> | ||
| <package format="3"> | ||
| <name>autonav_odometry</name> | ||
| <version>0.1.0</version> | ||
|
dorukayhan marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2025.x.y, follow example in autonav_example_py |
||
| <description>Thingies to track the robot's position</description> | ||
| <maintainer email="dorukayhan@gmail.com">Doruk Ayhan</maintainer> | ||
| <license>MIT</license> | ||
|
|
||
| <buildtool_depend>ament_cmake_python</buildtool_depend> | ||
|
|
||
| <depend>rclpy</depend> | ||
| <depend>autonav_shared</depend> | ||
|
dorukayhan marked this conversation as resolved.
|
||
|
|
||
| <export> | ||
| <build_type>ament_cmake</build_type> | ||
| </export> | ||
| </package> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| #!/usr/bin/env python3 | ||
| # filters.py from last year for the most part | ||
|
|
||
| import json | ||
| from types import SimpleNamespace | ||
| from autonav_msgs.msg import MotorFeedback, GPSFeedback, Position | ||
| from autonav_shared.types import DeviceState, SystemState, LogLevel # i am goig to beat pylance with a rock | ||
|
dorukayhan marked this conversation as resolved.
|
||
| from particlefilter import ParticleFilter | ||
| # from scr_msgs.msg import SystemState | ||
|
dorukayhan marked this conversation as resolved.
|
||
| from autonav_shared.node import Node | ||
| from enum import IntEnum | ||
| import rclpy | ||
|
|
||
| class FilterType(IntEnum): | ||
| PARTICLE_FILTER = 1 | ||
| # ADD NEW FILTERS HERE | ||
|
|
||
| class FiltersNodeConfig: | ||
| def __init__(self): | ||
| self.filter_type = FilterType.PARTICLE_FILTER | ||
|
|
||
| class FiltersNode(Node): | ||
| def __init__(self): | ||
| super().__init__("autonav_odometry") | ||
| # init gps and position | ||
| self.first_gps = None | ||
| self.last_gps = None | ||
| self.latitude_length = self.declare_parameter("latitude_length", 111086.2).get_parameter_value().double_value | ||
| self.longitude_length = self.declare_parameter("longitude_length", 81978.2).get_parameter_value().double_value | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this might be more of an in-person conversation (maybe probably with Tony) but I think lat and long length should be part of the config instead of being launch file parameters. |
||
| # init filters | ||
| # IF WE IMPLEMENT MORE FILTERS ADD THEM HERE (AND ANYWHERE THAT COMES UP WHEN YOU CTRL-F "NEW FILTERS")! | ||
| self.pf = ParticleFilter(self.latitude_length, self.longitude_length) | ||
| self.config = FiltersNodeConfig() | ||
| self.onReset() | ||
|
|
||
| def config_updated(self, jsonObject): | ||
|
dorukayhan marked this conversation as resolved.
|
||
| self.config = json.loads(self.jdump(jsonObject), object_hook=lambda d: SimpleNamespace(**d)) | ||
| # reinit filters after getting new config | ||
| self.onReset() | ||
|
|
||
| def init(self): | ||
| # [melee announcer voice] ready... | ||
| self.create_subscription(GPSFeedback, "/autonav/gps", self.onGPSReceived, 20) | ||
| self.create_subscription(MotorFeedback, "/autonav/MotorFeedback", self.onMotorFeedbackReceived, 20) | ||
|
dorukayhan marked this conversation as resolved.
|
||
| self.position_publisher = self.create_publisher(Position, "/autonav/position", 20) | ||
| # go! | ||
| self.set_device_state(DeviceState.OPERATING) | ||
|
|
||
| def onReset(self): | ||
| self.first_gps = None | ||
| self.last_gps = None | ||
| self.pf.init_particles() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should use the seed heading parameter of init_particles, or at least make whether we use it or not controlled by the config. this would've been handy last year, as we only ever started facing one direction, whereas in 2023 you could start facing north or south. |
||
| # INIT NEW FILTERS HERE | ||
|
|
||
| def system_state_transition(self, old: SystemState, updated: SystemState): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doesn't currently exist this year in the Core but this is probably a good argument for implementing it this year so leave this here for now, and if it breaks when building because this function doesn't exist/get called then nudge Dylan or I and we can add it |
||
| # reinit filters when robot changes modes | ||
| if old.state != SystemState.AUTONOMOUS and updated.state == SystemState.AUTONOMOUS: | ||
| self.onReset() | ||
|
|
||
| if old.state != SystemState.MANUAL and updated.state == SystemState.MANUAL: | ||
| self.onReset() | ||
| # or gets going | ||
| if old.mobility == False and updated.mobility == True: | ||
| self.onReset() | ||
|
|
||
| def onGPSReceived(self, msg: GPSFeedback): | ||
| if msg.gps_fix == 0 and msg.is_locked == False: | ||
|
dorukayhan marked this conversation as resolved.
|
||
| return | ||
|
|
||
| if self.first_gps is None: | ||
| self.first_gps = msg | ||
|
|
||
| self.last_gps = msg | ||
| # USE NEW FILTERS HERE | ||
| if self.config.filter_type == FilterType.PARTICLE_FILTER: | ||
| gps_x, gps_y = self.pf.gps(msg) | ||
| self.log(f"PF back from GPS ping w/ X={gps_x}, Y={gps_y}", LogLevel.DEBUG) | ||
| else: | ||
| self.log(f"{self.config.filter_type} isn't a valid FilterType! Did you implement a new filter and forget to use it?", LogLevel.FATAL) | ||
|
|
||
| def onMotorFeedbackReceived(self, msg: MotorFeedback): | ||
| averages = None | ||
| # USE NEW FILTERS HERE | ||
| if self.config.filter_type == FilterType.PARTICLE_FILTER: | ||
| avg_x, avg_y, avg_theta = self.pf.feedback(msg) | ||
| self.log(f"PF back from motor odom w/ X={avg_x}, Y={avg_y}, θ={avg_theta}", LogLevel.DEBUG) | ||
| else: | ||
| self.log(f"{self.config.filter_type} isn't a valid FilterType! Did you implement a new filter and forget to use it?", LogLevel.FATAL) | ||
|
|
||
| if averages is None: | ||
| return | ||
| # unpack what the filter just returned | ||
| position = Position() | ||
| position.x = averages[0] | ||
| position.y = averages[1] | ||
| position.theta = averages[2] | ||
|
dorukayhan marked this conversation as resolved.
|
||
|
|
||
| if self.first_gps is not None: | ||
| gps_x = self.first_gps.latitude + position.x / self.latitude_length | ||
| gps_y = self.first_gps.longitude - position.y / self.longitude_length | ||
| position.latitude = gps_x | ||
| position.longitude = gps_y | ||
|
|
||
| self.position_publisher.publish(position) | ||
|
|
||
| def main(): | ||
| rclpy.init() | ||
| node = FiltersNode() | ||
| rclpy.spin(node) | ||
| rclpy.shutdown() | ||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| ### particlefilter.py from last year ### | ||
| # TODO could all the for-each-particle math in this code be done ✨🚀faster🚀✨ with numpy than vanilla python? | ||
| from autonav_msgs.msg import MotorFeedback, GPSFeedback, Position | ||
| import numpy as np | ||
| import math | ||
| import random | ||
|
|
||
|
|
||
| class Particle: | ||
| def __init__(self, x=0, y=0, theta=0, weight=1) -> None: | ||
| self.x = x | ||
| self.y = y | ||
| self.theta = theta | ||
| self.weight = weight | ||
|
|
||
|
|
||
| class ParticleFilter: | ||
| def __init__(self, latitudeLength, longitudeLength) -> None: | ||
| self.num_particles = 750 | ||
| self.gps_noise = 0.45 | ||
| self.odom_noise = (0, 0, 0.1) | ||
| # TODO should these parameters be in a config file in autonav_odometry/resource? | ||
| self.init_particles() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. bit of a redundant call (as you call onReset() in init for FiltersNode line 34) but it's not that big of a deal.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. noise values should absolutely be in the config |
||
| # self.first_gps = None | ||
|
|
||
| self.latitude_length = latitudeLength | ||
| self.longitude_length = longitudeLength | ||
|
|
||
| def init_particles(self, seedHeading: float = 0.0, useSeedHeading: bool = False): | ||
| self.first_gps = None | ||
| if useSeedHeading: # particles generally point in an initial direction | ||
| self.particles = [Particle(0, 0, seedHeading + np.random.normal(0, 0.1)) for i in range(self.num_particles)] | ||
| else: # particles point literally everywhere | ||
| self.particles = [Particle(0, 0, i / self.num_particles * 2 * math.pi) for i in range(self.num_particles)] | ||
|
|
||
| def feedback(self, feedback: MotorFeedback) -> tuple[float, float, float]: | ||
| sum_x = 0 | ||
| sum_y = 0 | ||
| sum_theta_x = 0 | ||
| sum_theta_y = 0 | ||
| sum_weight = 0 | ||
|
|
||
| for particle in self.particles: | ||
| # update particles | ||
| # MotorFeedback gives robot-relative data (x is forward, y is left (???)) so we convert it to track-relative with trig magic | ||
| # TODO why are we scaling down delta_x and delta_theta again? | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that would be a Tony question but basically because magic number
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are fudge factors and they should probably go in the config. Last year I used 1.2 for example. Just depends on the robot/ simulator. |
||
| particle.x += feedback.delta_x * 0.95 * math.cos(particle.theta) + feedback.delta_y * math.sin(particle.theta) | ||
| particle.y += feedback.delta_x * 0.95 * math.sin(particle.theta) + feedback.delta_y * math.cos(particle.theta) | ||
| particle.theta += feedback.delta_theta * 0.8 | ||
| particle.theta = particle.theta % (2 * math.pi) | ||
| # weighted sum of particles | ||
| weight = particle.weight ** 2 | ||
| sum_x += particle.x * weight | ||
| sum_y += particle.y * weight | ||
| sum_theta_x += math.cos(particle.theta) * weight | ||
| sum_theta_y += math.sin(particle.theta) * weight | ||
| sum_weight += weight | ||
|
|
||
| if sum_weight < 0.000001: | ||
| sum_weight = 0.000001 | ||
|
|
||
| # now weighted average | ||
| avg_x = sum_x / sum_weight | ||
| avg_y = sum_y / sum_weight | ||
| avg_theta = math.atan2(sum_theta_y / sum_weight, sum_theta_x / sum_weight) % (2 * math.pi) | ||
|
|
||
| return (avg_x, avg_y, avg_theta) | ||
|
|
||
| def gps(self, gps: GPSFeedback) -> tuple[float, float]: | ||
| if self.first_gps is None: | ||
| self.first_gps = gps | ||
|
|
||
| # convert latlon to distance offset from first gps reading | ||
| gps_x = (gps.latitude - self.first_gps.latitude) * self.latitude_length | ||
| gps_y = (self.first_gps.longitude - gps.longitude) * self.longitude_length | ||
|
|
||
| # adjust particle weights - closer to gps reading is better | ||
| for particle in self.particles: | ||
| dist_sqrt = np.sqrt((particle.x - gps_x) ** 2 + (particle.y - gps_y) ** 2) # root mean square error | ||
| particle.weight = math.exp(-dist_sqrt / (2 * self.gps_noise ** 2)) # less accurate gps = weights decrease smoother further from reading | ||
| if particle.weight <= 0.00001 / len(self.particles): # prevent resample() from blowing up if bot teleports 100 km away | ||
| particle.weight = 0.00001 / len(self.particles) | ||
| self.resample() | ||
| return (gps_x, gps_y) | ||
|
|
||
| def resample(self) -> None: | ||
| weights = [particle.weight for particle in self.particles] | ||
| weights_sum = sum(weights) | ||
| if weights_sum <= 0.00001: | ||
| weights_sum = 0.00001 | ||
| weights = [weight / weights_sum for weight in weights] | ||
|
|
||
| new_particles = random.choices(self.particles, weights, k=self.num_particles) | ||
| self.particles = [] | ||
|
|
||
| for particle in new_particles: | ||
| rand_x = np.random.normal(0, self.odom_noise[0]) | ||
| rand_y = np.random.normal(0, self.odom_noise[1]) | ||
| x = particle.x + rand_x * math.cos(particle.theta) + rand_y * math.sin(particle.theta) | ||
| y = particle.y + rand_x * math.sin(particle.theta) + rand_y * math.cos(particle.theta) | ||
| theta = np.random.normal(particle.theta, self.odom_noise[2]) % (2 * math.pi) | ||
| self.particles.append(Particle(x, y, theta, particle.weight)) | ||
Uh oh!
There was an error while loading. Please reload this page.