Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions autonav_ws/src/autonav_odometry/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
cmake_minimum_required(VERSION 3.8)
project(autonav_odometry)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake_python REQUIRED)
find_package(rclpy REQUIRED)
find_package(autonav_shared REQUIRED)
Comment thread
dorukayhan marked this conversation as resolved.

# Install Python modules
ament_python_install_package(${PROJECT_NAME})

# Install Python programs
install(PROGRAMS
# enumerate every file in src by hand here like a bozo because globs don't just work i guess
src/main.py
src/particlefilter.py

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

particlefilter.py just creates a class that the node uses. It doesn't need to be enumerated here since it's not a node.

DESTINATION lib/${PROJECT_NAME}
)

ament_package()
Empty file.
18 changes: 18 additions & 0 deletions autonav_ws/src/autonav_odometry/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>autonav_odometry</name>
<version>0.1.0</version>
Comment thread
dorukayhan marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2025.x.y, follow example in autonav_example_py

<description>Thingies to track the robot's position</description>
<maintainer email="dorukayhan@gmail.com">Doruk Ayhan</maintainer>
<license>MIT</license>

<buildtool_depend>ament_cmake_python</buildtool_depend>

<depend>rclpy</depend>
<depend>autonav_shared</depend>
Comment thread
dorukayhan marked this conversation as resolved.

<export>
<build_type>ament_cmake</build_type>
</export>
</package>
Empty file.
113 changes: 113 additions & 0 deletions autonav_ws/src/autonav_odometry/src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
# filters.py from last year for the most part

import json
from types import SimpleNamespace
from autonav_msgs.msg import MotorFeedback, GPSFeedback, Position
from autonav_shared.types import DeviceState, SystemState, LogLevel # i am goig to beat pylance with a rock
Comment thread
dorukayhan marked this conversation as resolved.
from particlefilter import ParticleFilter
# from scr_msgs.msg import SystemState
Comment thread
dorukayhan marked this conversation as resolved.
from autonav_shared.node import Node
from enum import IntEnum
import rclpy

class FilterType(IntEnum):
PARTICLE_FILTER = 1
# ADD NEW FILTERS HERE

class FiltersNodeConfig:
def __init__(self):
self.filter_type = FilterType.PARTICLE_FILTER

class FiltersNode(Node):
def __init__(self):
super().__init__("autonav_odometry")
# init gps and position
self.first_gps = None
self.last_gps = None
self.latitude_length = self.declare_parameter("latitude_length", 111086.2).get_parameter_value().double_value
self.longitude_length = self.declare_parameter("longitude_length", 81978.2).get_parameter_value().double_value

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might be more of an in-person conversation (maybe probably with Tony) but I think lat and long length should be part of the config instead of being launch file parameters.

# init filters
# IF WE IMPLEMENT MORE FILTERS ADD THEM HERE (AND ANYWHERE THAT COMES UP WHEN YOU CTRL-F "NEW FILTERS")!
self.pf = ParticleFilter(self.latitude_length, self.longitude_length)
self.config = FiltersNodeConfig()
self.onReset()

def config_updated(self, jsonObject):
Comment thread
dorukayhan marked this conversation as resolved.
self.config = json.loads(self.jdump(jsonObject), object_hook=lambda d: SimpleNamespace(**d))
# reinit filters after getting new config
self.onReset()

def init(self):
# [melee announcer voice] ready...
self.create_subscription(GPSFeedback, "/autonav/gps", self.onGPSReceived, 20)
self.create_subscription(MotorFeedback, "/autonav/MotorFeedback", self.onMotorFeedbackReceived, 20)
Comment thread
dorukayhan marked this conversation as resolved.
self.position_publisher = self.create_publisher(Position, "/autonav/position", 20)
# go!
self.set_device_state(DeviceState.OPERATING)

def onReset(self):
self.first_gps = None
self.last_gps = None
self.pf.init_particles()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use the seed heading parameter of init_particles, or at least make whether we use it or not controlled by the config. this would've been handy last year, as we only ever started facing one direction, whereas in 2023 you could start facing north or south.

# INIT NEW FILTERS HERE

def system_state_transition(self, old: SystemState, updated: SystemState):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't currently exist this year in the Core but this is probably a good argument for implementing it this year so leave this here for now, and if it breaks when building because this function doesn't exist/get called then nudge Dylan or I and we can add it

# reinit filters when robot changes modes
if old.state != SystemState.AUTONOMOUS and updated.state == SystemState.AUTONOMOUS:
self.onReset()

if old.state != SystemState.MANUAL and updated.state == SystemState.MANUAL:
self.onReset()
# or gets going
if old.mobility == False and updated.mobility == True:
self.onReset()

def onGPSReceived(self, msg: GPSFeedback):
if msg.gps_fix == 0 and msg.is_locked == False:
Comment thread
dorukayhan marked this conversation as resolved.
return

if self.first_gps is None:
self.first_gps = msg

self.last_gps = msg
# USE NEW FILTERS HERE
if self.config.filter_type == FilterType.PARTICLE_FILTER:
gps_x, gps_y = self.pf.gps(msg)
self.log(f"PF back from GPS ping w/ X={gps_x}, Y={gps_y}", LogLevel.DEBUG)
else:
self.log(f"{self.config.filter_type} isn't a valid FilterType! Did you implement a new filter and forget to use it?", LogLevel.FATAL)

def onMotorFeedbackReceived(self, msg: MotorFeedback):
averages = None
# USE NEW FILTERS HERE
if self.config.filter_type == FilterType.PARTICLE_FILTER:
avg_x, avg_y, avg_theta = self.pf.feedback(msg)
self.log(f"PF back from motor odom w/ X={avg_x}, Y={avg_y}, θ={avg_theta}", LogLevel.DEBUG)
else:
self.log(f"{self.config.filter_type} isn't a valid FilterType! Did you implement a new filter and forget to use it?", LogLevel.FATAL)

if averages is None:
return
# unpack what the filter just returned
position = Position()
position.x = averages[0]
position.y = averages[1]
position.theta = averages[2]
Comment thread
dorukayhan marked this conversation as resolved.

if self.first_gps is not None:
gps_x = self.first_gps.latitude + position.x / self.latitude_length
gps_y = self.first_gps.longitude - position.y / self.longitude_length
position.latitude = gps_x
position.longitude = gps_y

self.position_publisher.publish(position)

def main():
rclpy.init()
node = FiltersNode()
rclpy.spin(node)
rclpy.shutdown()

if __name__ == "__main__":
main()
102 changes: 102 additions & 0 deletions autonav_ws/src/autonav_odometry/src/particlefilter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
### particlefilter.py from last year ###
# TODO could all the for-each-particle math in this code be done ✨🚀faster🚀✨ with numpy than vanilla python?
from autonav_msgs.msg import MotorFeedback, GPSFeedback, Position
import numpy as np
import math
import random


class Particle:
def __init__(self, x=0, y=0, theta=0, weight=1) -> None:
self.x = x
self.y = y
self.theta = theta
self.weight = weight


class ParticleFilter:
def __init__(self, latitudeLength, longitudeLength) -> None:
self.num_particles = 750
self.gps_noise = 0.45
self.odom_noise = (0, 0, 0.1)
# TODO should these parameters be in a config file in autonav_odometry/resource?
self.init_particles()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit of a redundant call (as you call onReset() in init for FiltersNode line 34) but it's not that big of a deal.
what's a bigger deal is using the seed heading, which I think. . . actually, nevermind, because init_particles is called in FiltersNode after initialization when we would actually be able to send it a seed heading, so that's a problem for FiltersNode not ParticleFilter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noise values should absolutely be in the config

# self.first_gps = None

self.latitude_length = latitudeLength
self.longitude_length = longitudeLength

def init_particles(self, seedHeading: float = 0.0, useSeedHeading: bool = False):
self.first_gps = None
if useSeedHeading: # particles generally point in an initial direction
self.particles = [Particle(0, 0, seedHeading + np.random.normal(0, 0.1)) for i in range(self.num_particles)]
else: # particles point literally everywhere
self.particles = [Particle(0, 0, i / self.num_particles * 2 * math.pi) for i in range(self.num_particles)]

def feedback(self, feedback: MotorFeedback) -> tuple[float, float, float]:
sum_x = 0
sum_y = 0
sum_theta_x = 0
sum_theta_y = 0
sum_weight = 0

for particle in self.particles:
# update particles
# MotorFeedback gives robot-relative data (x is forward, y is left (???)) so we convert it to track-relative with trig magic
# TODO why are we scaling down delta_x and delta_theta again?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be a Tony question but basically because magic number
we might not need it with the swerve drive (or we might need a different magic number), that remains to be seen

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are fudge factors and they should probably go in the config. Last year I used 1.2 for example. Just depends on the robot/ simulator.

particle.x += feedback.delta_x * 0.95 * math.cos(particle.theta) + feedback.delta_y * math.sin(particle.theta)
particle.y += feedback.delta_x * 0.95 * math.sin(particle.theta) + feedback.delta_y * math.cos(particle.theta)
particle.theta += feedback.delta_theta * 0.8
particle.theta = particle.theta % (2 * math.pi)
# weighted sum of particles
weight = particle.weight ** 2
sum_x += particle.x * weight
sum_y += particle.y * weight
sum_theta_x += math.cos(particle.theta) * weight
sum_theta_y += math.sin(particle.theta) * weight
sum_weight += weight

if sum_weight < 0.000001:
sum_weight = 0.000001

# now weighted average
avg_x = sum_x / sum_weight
avg_y = sum_y / sum_weight
avg_theta = math.atan2(sum_theta_y / sum_weight, sum_theta_x / sum_weight) % (2 * math.pi)

return (avg_x, avg_y, avg_theta)

def gps(self, gps: GPSFeedback) -> tuple[float, float]:
if self.first_gps is None:
self.first_gps = gps

# convert latlon to distance offset from first gps reading
gps_x = (gps.latitude - self.first_gps.latitude) * self.latitude_length
gps_y = (self.first_gps.longitude - gps.longitude) * self.longitude_length

# adjust particle weights - closer to gps reading is better
for particle in self.particles:
dist_sqrt = np.sqrt((particle.x - gps_x) ** 2 + (particle.y - gps_y) ** 2) # root mean square error
particle.weight = math.exp(-dist_sqrt / (2 * self.gps_noise ** 2)) # less accurate gps = weights decrease smoother further from reading
if particle.weight <= 0.00001 / len(self.particles): # prevent resample() from blowing up if bot teleports 100 km away
particle.weight = 0.00001 / len(self.particles)
self.resample()
return (gps_x, gps_y)

def resample(self) -> None:
weights = [particle.weight for particle in self.particles]
weights_sum = sum(weights)
if weights_sum <= 0.00001:
weights_sum = 0.00001
weights = [weight / weights_sum for weight in weights]

new_particles = random.choices(self.particles, weights, k=self.num_particles)
self.particles = []

for particle in new_particles:
rand_x = np.random.normal(0, self.odom_noise[0])
rand_y = np.random.normal(0, self.odom_noise[1])
x = particle.x + rand_x * math.cos(particle.theta) + rand_y * math.sin(particle.theta)
y = particle.y + rand_x * math.sin(particle.theta) + rand_y * math.cos(particle.theta)
theta = np.random.normal(particle.theta, self.odom_noise[2]) % (2 * math.pi)
self.particles.append(Particle(x, y, theta, particle.weight))