Skip to content

Commit a17ad7b

Browse files
author
Cornelius Krupp
committed
Add joint_states publisher.
1 parent 2107064 commit a17ad7b

3 files changed

Lines changed: 128 additions & 89 deletions

File tree

src/bitbots_lowlevel/bitbots_x02_joint_control/src/example_server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl ArmService for DummyArmService {
4141
Ok(Response::new(Empty::default()))
4242
}
4343

44-
async fn set_arm_command_stream(&self, mut request: Request<tonic::Streaming<DroidCommandRequest>>) -> Result<Response<Empty>, Status> {
44+
async fn set_arm_command_stream(&self, request: Request<tonic::Streaming<DroidCommandRequest>>) -> Result<Response<Empty>, Status> {
4545
let mut stream = request.into_inner();
4646
while let Some(_cmd) = stream.next().await {
4747
// Echo/Process logic here
@@ -101,7 +101,7 @@ impl LegService for DummyLegService {
101101
Ok(Response::new(Empty::default()))
102102
}
103103

104-
async fn set_leg_command_stream(&self, mut request: Request<tonic::Streaming<DroidCommandRequest>>) -> Result<Response<Empty>, Status> {
104+
async fn set_leg_command_stream(&self, request: Request<tonic::Streaming<DroidCommandRequest>>) -> Result<Response<Empty>, Status> {
105105
let mut stream = request.into_inner();
106106
while let Some(Ok(_cmd)) = stream.next().await {
107107
// Echo logic could be added here for processing
@@ -138,8 +138,8 @@ impl LegService for DummyLegService {
138138
#[tokio::main]
139139
async fn main() -> Result<(), Box<dyn std::error::Error>> {
140140
// 1. Define the addresses
141-
let leg_addr = "[::1]:50051".parse()?;
142-
let arm_addr = "[::1]:50052".parse()?;
141+
let leg_addr = "127.0.0.1:50051".parse()?;
142+
let arm_addr = "127.0.0.1:50052".parse()?;
143143

144144
println!("LegService starting on {}", leg_addr);
145145
println!("ArmService starting on {}", arm_addr);
Lines changed: 123 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,140 @@
1-
use std::sync::{Arc, Mutex as StdMutex};
2-
use tokio::sync::Mutex;
3-
4-
use futures::StreamExt;
5-
use r2r::bitbots_msgs::msg::Workload;
6-
use r2r::bitbots_msgs::srv::Leds;
7-
use r2r::std_msgs::msg::Empty;
8-
use r2r::RosParams;
9-
10-
#[derive(RosParams, Default, Debug)]
11-
struct Params {
12-
/// Parameter description (Yes this comment will appear in the parameter description)
13-
par1: f64,
14-
/// Dummy parameter [m/s]
15-
par2: i32,
16-
nested: NestedParams,
1+
use r2r;
2+
use std::collections::HashMap;
3+
use std::env;
4+
use std::sync::{Arc, Mutex};
5+
use std::time::Duration;
6+
use tokio::time::interval;
7+
8+
pub mod droidgrpc {
9+
tonic::include_proto!("droidgrpc");
1710
}
1811

19-
#[derive(RosParams, Default, Debug)]
20-
struct NestedParams {
21-
par3: String,
22-
par4: u16,
12+
use droidgrpc::{Empty};
13+
// Note: Ensure these matches your generated code exactly
14+
use droidgrpc::arm_service_client::ArmServiceClient;
15+
use droidgrpc::leg_service_client::LegServiceClient;
16+
17+
// This struct holds the latest known positions for all joints
18+
#[derive(Default)]
19+
struct RobotState {
20+
// Key: ROS joint name, Value: position
21+
joint_data: HashMap<String, f64>,
2322
}
2423

2524
#[tokio::main]
2625
async fn main() -> Result<(), Box<dyn std::error::Error>> {
27-
let ctx = r2r::Context::create()?;
26+
let args: Vec<String> = env::args().collect();
27+
let ip = args.get(1).map(|s| s.as_str()).unwrap_or("127.0.0.1");
2828

29-
// Create a node
30-
let mut node = r2r::Node::create(ctx, "test_node", "")?;
31-
32-
// Get a logger name for easier logging
33-
let nl = node.logger().to_string();
34-
35-
// Create subscribers, publishers, services, timers, etc.
36-
let mut sub = node
37-
.subscribe::<Empty>("/hello_world", r2r::QosProfile::default())?
38-
.fuse();
39-
let mut timer = node.create_timer(std::time::Duration::from_secs_f32(0.05))?;
40-
let mut timer1 = node.create_timer(std::time::Duration::from_secs_f32(0.1))?;
41-
let publ = node.create_publisher::<Empty>("/out", r2r::QosProfile::default())?;
42-
let mut srv = node
43-
.create_service::<Leds::Service>("/led", r2r::QosProfile::default())?
44-
.fuse();
45-
46-
// Parameter handling
47-
let params = Arc::new(StdMutex::new(Params::default()));
48-
let (paramater_handler, _) = node.make_derived_parameter_handler(params.clone())?;
49-
tokio::task::spawn(paramater_handler);
50-
51-
println!(
52-
"Sim time is set to: {}",
53-
node.get_parameter::<bool>("use_sim_time").unwrap()
54-
);
55-
56-
// Share the node between tasks / threads
29+
let ctx = r2r::Context::create()?;
30+
let mut node = r2r::Node::create(ctx, "x02_joint_control", "")?;
31+
let state_pub = node.create_publisher::<r2r::sensor_msgs::msg::JointState>("/joint_states", r2r::QosProfile::default())?;
5732
let node = Arc::new(Mutex::new(node));
58-
let spin_node = node.clone();
5933

60-
tokio::task::spawn(async move {
61-
// Define a thread-local state
62-
let mut state = Workload::default();
34+
// --- 1. Mapping & Virtual Joints ---
35+
let mut name_map = HashMap::new();
36+
let mappings = [
37+
("muT", "LHipYaw"), ("msL", "LHipRoll"), ("mhL", "LHipPitch"), ("mkL", "LKnee"), ("maL", "LAnklePitch"),
38+
("mbT", "RHipYaw"), ("msR", "RHipRoll"), ("mhR", "RHipPitch"), ("mkR", "RKnee"), ("maR", "RAnklePitch"),
39+
("UL1", "LShoulderPitch"), ("UL2", "LShoulderRoll"), ("UL3", "LShoulderYaw"), ("UL4", "LElbow"),
40+
("UR1", "RShoulderPitch"), ("UR2", "RShoulderRoll"), ("UR3", "RShoulderYaw"), ("UR4", "RElbow"),
41+
];
42+
for (hw, ros) in mappings { name_map.insert(hw.to_string(), ros.to_string()); }
43+
44+
let virtual_joints = [];
45+
46+
// --- 2. Shared State Initialization ---
47+
let mut initial_state = RobotState::default();
48+
mappings.iter().map(|(_, ros)| ros).chain(virtual_joints.iter())
49+
.for_each(|&name| { initial_state.joint_data.insert(name.to_string(), 0.0); });
50+
let shared_state = Arc::new(Mutex::new(initial_state));
51+
52+
// --- 3. gRPC Setup & Config ---
53+
let mut leg_client = LegServiceClient::connect(format!("http://{}:50051", ip)).await?;
54+
let mut arm_client = ArmServiceClient::connect(format!("http://{}:50052", ip)).await?;
55+
56+
let leg_joint_names = leg_client.get_leg_config(Empty {}).await?.into_inner().joint_name;
57+
let arm_joint_names = arm_client.get_arm_config(Empty {}).await?.into_inner().joint_name;
58+
59+
// --- 4. Writer Tasks: Update shared state when data arrives ---
60+
61+
// Leg Poller
62+
let mut l_client = leg_client.clone();
63+
let l_state = shared_state.clone();
64+
let l_names = leg_joint_names.clone();
65+
let l_map = name_map.clone();
66+
tokio::spawn(async move {
67+
let mut ticker = interval(Duration::from_millis(20)); // Poll at 50Hz
68+
loop {
69+
ticker.tick().await;
70+
if let Ok(resp) = l_client.get_leg_state(Empty {}).await {
71+
let positions = resp.into_inner().position;
72+
let mut state = l_state.lock().unwrap();
73+
for (i, hw_id) in l_names.iter().enumerate() {
74+
if let Some(ros_name) = l_map.get(hw_id) {
75+
state.joint_data.insert(ros_name.clone(), positions.get(i).cloned().unwrap_or(0.0) as f64);
76+
}
77+
}
78+
}
79+
}
80+
});
81+
82+
// Arm Poller
83+
let mut a_client = arm_client.clone();
84+
let a_state = shared_state.clone();
85+
let a_names = arm_joint_names.clone();
86+
let a_map = name_map.clone();
87+
tokio::spawn(async move {
88+
let mut ticker = interval(Duration::from_millis(20));
89+
loop {
90+
ticker.tick().await;
91+
if let Ok(resp) = a_client.get_arm_state(Empty {}).await {
92+
let positions = resp.into_inner().position;
93+
let mut state = a_state.lock().unwrap();
94+
for (i, hw_id) in a_names.iter().enumerate() {
95+
if let Some(ros_name) = a_map.get(hw_id) {
96+
state.joint_data.insert(ros_name.clone(), positions.get(i).cloned().unwrap_or(0.0) as f64);
97+
}
98+
}
99+
}
100+
}
101+
});
63102

64-
// Handle events
103+
// --- 5. Reader Task: Publish at Fixed ROS Rate ---
104+
let pub_state = shared_state.clone();
105+
tokio::spawn(async move {
106+
let mut ticker = interval(Duration::from_millis(10)); // Publish at 100Hz
65107
loop {
66-
tokio::select! {
67-
req = srv.next() => {
68-
let req = req.unwrap();
69-
r2r::log_info!(&nl, "leds: {:?}", req.message);
70-
// Flush the stdout buffer to ensure the print appears in the logs
71-
let resp = Leds::Response::default();
72-
req.respond(resp).unwrap();
73-
},
74-
msg = sub.next() => {
75-
r2r::log_info!(&nl, "message: {:?}", msg);
76-
// Flush the stdout buffer to ensure the print appears in the logs
77-
state.memory_used += 1;
78-
},
79-
_ = timer.tick() => {
80-
publ.publish(&Empty {}).inspect_err(|e| {
81-
r2r::log_error!(&nl, "Error publishing message: {}", e);
82-
}).ok();
83-
r2r::log_info!(&nl, "timer tick {}", state.memory_used);
84-
},
85-
_ = timer1.tick() => {
86-
let my_param = node.lock().await.get_parameter::<f64>("par1").unwrap();
87-
r2r::log_info!(&nl, "Parameter par1: {}", my_param);
88-
r2r::log_info!(&nl, "timer1 tick {}", state.memory_used);
89-
},
108+
ticker.tick().await;
109+
let mut ros_msg = r2r::sensor_msgs::msg::JointState::default();
110+
111+
// Lock briefly to copy data
112+
{
113+
let state = pub_state.lock().unwrap();
114+
for (name, pos) in &state.joint_data {
115+
ros_msg.name.push(name.clone());
116+
ros_msg.position.push(*pos);
117+
}
118+
}
119+
120+
// Always add virtual joints (ensures consistency even before 1st gRPC response)
121+
for v_joint in virtual_joints {
122+
if !ros_msg.name.contains(&v_joint.to_string()) {
123+
ros_msg.name.push(v_joint.to_string());
124+
ros_msg.position.push(0.0);
125+
}
126+
}
127+
128+
if !ros_msg.name.is_empty() {
129+
let _ = state_pub.publish(&ros_msg);
90130
}
91131
}
92132
});
93133

94-
// Spin the underlying rcl node object
134+
let spin_node = node.clone();
135+
136+
// --- 6. ROS Spin ---
95137
loop {
96-
spin_node
97-
.lock()
98-
.await
99-
.spin_once(std::time::Duration::from_millis(100));
138+
spin_node.lock().expect("Node died").spin_once(std::time::Duration::from_millis(100));
100139
}
101-
}
140+
}

src/bitbots_lowlevel/bitbots_x02_joint_control/src/test_proto_comm.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub mod droidgrpc {
1111
async fn main() -> Result<(), Box<dyn std::error::Error>> {
1212
// 1. Get IP from CLI argument or default to localhost
1313
let args: Vec<String> = env::args().collect();
14-
let ip_str = args.get(1).map(|s| s.as_str()).unwrap_or("[::1]");
14+
let ip_str = args.get(1).map(|s| s.as_str()).unwrap_or("127.0.0.1");
1515

1616
// Parse the string into an IpAddr to ensure it's a valid IPv4/IPv6
1717
let leg_uri = format!("http://{}:50051", ip_str);

0 commit comments

Comments
 (0)