11#!/usr/bin/env python3
2- import rclpy
3- from rclpy .executors import MultiThreadedExecutor
4- from rclpy .node import Node
5- import threading
6- import pdb
2+ from collections import deque
73import os
8- import time
4+ import queue
95import signal
10- import yaml
116import subprocess
127import sys
13- import queue
8+ import threading
9+ import time
10+
11+ import rclpy
12+ from rclpy .executors import MultiThreadedExecutor
13+ from rclpy .node import Node
1414import std_msgs .msg
15- from collections import deque
15+ import yaml
1616
1717ON_POSIX = 'posix' in sys .builtin_module_names
1818
19+
1920def ping (host ):
20- command = [" ping" , "-c" , "1" , host ]
21+ command = [' ping' , '-c' , '1' , host ]
2122 try :
2223 result = subprocess .run (command , stdout = subprocess .DEVNULL ,
2324 stderr = subprocess .DEVNULL )
2425 return result .returncode == 0
2526 except Exception as e :
26- print (f" Error pinging { host } : { e } " )
27+ print (f' Error pinging { host } : { e } ' )
2728 return False
2829
30+
2931class PeerPublisher ():
3032 # QUEUE_LENGTH is used to filter repeated RSSI
3133 QUEUE_LENGTH = 3
@@ -44,8 +46,9 @@ def __init__(self, target_name, ros_node):
4446 self .repeated_counter = 0
4547
4648 # Create a publisher for the node
47- topic_name = f"mocha/rajant/rssi/{ self .target_name } "
48- self .ros_node .get_logger ().info (f"{ ros_node .this_robot } - Rajant Peer RSSI - Topic /{ topic_name } created" )
49+ topic_name = f'mocha/rajant/rssi/{ self .target_name } '
50+ self .ros_node .get_logger ().info (
51+ f'{ ros_node .this_robot } - Rajant Peer RSSI - Topic /{ topic_name } created' )
4952 self .pub = self .ros_node .create_publisher (std_msgs .msg .Int32 ,
5053 topic_name , 10 )
5154
@@ -57,18 +60,26 @@ def filter_rssi(self, ts, rssi):
5760 def publish_all (self , ts ):
5861 # Skip if we did not collect info for this node in this session
5962 if self .last_registered_timestamp is None :
60- return
63+ return
6164 # Check that current end timestamp and msg timestamps agree
6265 if ts != self .last_registered_timestamp :
63- self .ros_node .get_logger ().error (f"{ self .ros_node .this_robot } - Rajant Peer RSSI - Timestamp of end message different than last registered timestamp in publish_all" )
66+ self .ros_node .get_logger ().error (
67+ f'{ self .ros_node .this_robot } - Rajant Peer RSSI'
68+ ' - Timestamp of end message different than'
69+ ' last registered timestamp in publish_all' )
6470 return
6571 # Verify that all the timestamps are the same for all the radios
6672 all_ts = [i [0 ] for i in self .rssi_ts ]
6773 if not len (all_ts ):
68- self .ros_node .get_logger ().error (f"{ self .ros_node .this_robot } - Rajant Peer RSSI - Empty list of timestamps in publish_all." )
74+ self .ros_node .get_logger ().error (
75+ f'{ self .ros_node .this_robot } - Rajant Peer RSSI'
76+ ' - Empty list of timestamps in publish_all.' )
6977 return
7078 if len (set (all_ts )) != 1 :
71- self .ros_node .get_logger ().error (f"{ self .ros_node .this_robot } - Rajant Peer RSSI - Multiple different timestamps for the same group in publish_all." )
79+ self .ros_node .get_logger ().error (
80+ f'{ self .ros_node .this_robot } - Rajant Peer RSSI'
81+ ' - Multiple different timestamps for the'
82+ ' same group in publish_all.' )
7283 return
7384
7485 # Find out the largest RSSI and sum of RSSI
@@ -96,13 +107,18 @@ def publish_all(self, ts):
96107 #
97108 # print(self.published_rssi_queue, set(self.published_rssi_queue))
98109 if len (set (self .published_rssi_queue )) == 1 and \
99- len (self .published_rssi_queue ) == self .QUEUE_LENGTH :
110+ len (self .published_rssi_queue ) == self .QUEUE_LENGTH :
100111 if self .repeated_counter < 4 :
101112 self .repeated_counter += 1
102- self .ros_node .get_logger ().debug (f"{ self .ros_node .this_robot } - Rajant Peer RSSI - Repeated RSSI for { self .target_name } for the last { self .QUEUE_LENGTH * 3 } seconds. Throttling counter { self .repeated_counter } " )
113+ self .ros_node .get_logger ().debug (
114+ f'{ self .ros_node .this_robot } - Rajant Peer'
115+ f' RSSI - Repeated RSSI for { self .target_name } '
116+ f' for the last { self .QUEUE_LENGTH * 3 } seconds.'
117+ f' Throttling counter { self .repeated_counter } ' )
103118 return
104119
105- self .ros_node .get_logger ().debug (f"{ self .ros_node .this_robot } - Rajant Peer RSSI - Publishing { self .target_name } " )
120+ self .ros_node .get_logger ().debug (
121+ f'{ self .ros_node .this_robot } - Rajant Peer RSSI - Publishing { self .target_name } ' )
106122 self .repeated_counter = 0
107123 msg = std_msgs .msg .Int32 ()
108124 msg .data = max_rssi
@@ -111,7 +127,7 @@ def publish_all(self, ts):
111127
112128class RajantPeerRSSI (Node ):
113129 def __init__ (self ):
114- super ().__init__ (" rajant_peer_rssi" )
130+ super ().__init__ (' rajant_peer_rssi' )
115131 self .logger = self .get_logger ()
116132
117133 # Handle shutdown signal
@@ -120,78 +136,89 @@ def __init__(self):
120136
121137 def signal_handler (sig , frame ):
122138 if not self .shutdownTriggered .is_set ():
123- self .logger .warning (f"{ self .this_robot } - Rajant Peer RSSI - Got SIGINT. Triggering shutdown." )
139+ self .logger .warning (
140+ f'{ self .this_robot } - Rajant Peer RSSI - Got SIGINT. Triggering shutdown.' )
124141 self .shutdown ()
125142 signal .signal (signal .SIGINT , signal_handler )
126143
127144 # Declare parameters
128- self .declare_parameter (" robot_name" , "" )
129- self .declare_parameter (" robot_configs" , "" )
130- self .declare_parameter (" radio_configs" , "" )
131- self .declare_parameter (" bcapi_jar_file" , "" )
145+ self .declare_parameter (' robot_name' , '' )
146+ self .declare_parameter (' robot_configs' , '' )
147+ self .declare_parameter (' radio_configs' , '' )
148+ self .declare_parameter (' bcapi_jar_file' , '' )
132149
133- self .this_robot = self .get_parameter (" robot_name" ).get_parameter_value ().string_value
150+ self .this_robot = self .get_parameter (' robot_name' ).get_parameter_value ().string_value
134151
135152 if len (self .this_robot ) == 0 :
136- self .logger .error (f" { self .this_robot } - Rajant Peer RSSI - Empty robot name" )
137- raise ValueError (" Empty robot name" )
153+ self .logger .error (f' { self .this_robot } - Rajant Peer RSSI - Empty robot name' )
154+ raise ValueError (' Empty robot name' )
138155
139156 # Load and check robot configs
140- self .robot_configs_file = self .get_parameter ("robot_configs" ).get_parameter_value ().string_value
157+ self .robot_configs_file = self .get_parameter (
158+ 'robot_configs' ).get_parameter_value ().string_value
141159 try :
142- with open (self .robot_configs_file , "r" ) as f :
160+ with open (self .robot_configs_file , 'r' ) as f :
143161 self .robot_configs = yaml .load (f , Loader = yaml .FullLoader )
144162 except Exception as e :
145- self .logger .error (f" { self .this_robot } - Rajant Peer RSSI - robot_configs file" )
163+ self .logger .error (f' { self .this_robot } - Rajant Peer RSSI - robot_configs file' )
146164 raise e
147165 if self .this_robot not in self .robot_configs .keys ():
148- self .logger .error (f" { self .this_robot } - Rajant Peer RSSI - robot_configs file" )
149- raise ValueError (" Robot not in config file" )
166+ self .logger .error (f' { self .this_robot } - Rajant Peer RSSI - robot_configs file' )
167+ raise ValueError (' Robot not in config file' )
150168
151169 # Load and check radio configs
152- self .radio_configs_file = self .get_parameter ("radio_configs" ).get_parameter_value ().string_value
170+ self .radio_configs_file = self .get_parameter (
171+ 'radio_configs' ).get_parameter_value ().string_value
153172 try :
154- with open (self .radio_configs_file , "r" ) as f :
173+ with open (self .radio_configs_file , 'r' ) as f :
155174 self .radio_configs = yaml .load (f , Loader = yaml .FullLoader )
156175 except Exception as e :
157- self .logger .error (f" { self .this_robot } - Rajant Peer RSSI - radio_configs file" )
176+ self .logger .error (f' { self .this_robot } - Rajant Peer RSSI - radio_configs file' )
158177 raise e
159- self .radio = self .robot_configs [self .this_robot ][" using-radio" ]
178+ self .radio = self .robot_configs [self .this_robot ][' using-radio' ]
160179 if self .radio not in self .radio_configs .keys ():
161- self .logger .error (f" { self .this_robot } - Rajant Peer RSSI - radio_configs file" )
162- raise ValueError (" Radio {self.radio} not in config file" )
180+ self .logger .error (f' { self .this_robot } - Rajant Peer RSSI - radio_configs file' )
181+ raise ValueError (' Radio {self.radio} not in config file' )
163182
164183 # Get the location of the jar file for bcapi
165- self .bcapi_jar_file = self .get_parameter ("bcapi_jar_file" ).get_parameter_value ().string_value
184+ self .bcapi_jar_file = self .get_parameter (
185+ 'bcapi_jar_file' ).get_parameter_value ().string_value
166186 if not (os .path .isfile (self .bcapi_jar_file ) and
167187 self .bcapi_jar_file .endswith ('.jar' )):
168- self .get_logger ().error (f"{ self .this_robot } - Rajant Peer RSSI - Erroneous BCAPI jar file" )
169- raise ValueError (f"Erroneous BCAPI file" )
188+ self .get_logger ().error (
189+ f'{ self .this_robot } - Rajant Peer RSSI - Erroneous BCAPI jar file' )
190+ raise ValueError ('Erroneous BCAPI file' )
170191
171192 # Get the target ip for the local rajant
172193 rajant_name = self .robot_configs [self .this_robot ]['using-radio' ]
173194 if rajant_name in self .radio_configs .keys ():
174195 self .target_ip = self .radio_configs [rajant_name ]['computed-IP-address' ]
175196 else :
176- self .get_logger ().error (f"{ self .this_robot } - Rajant Peer RSSI - Radio { rajant_name } for robot { self .this_robot } not found in configs" )
177- raise ValueError (f"Radio { rajant_name } for robot { self .this_robot } not found in configs" )
197+ self .get_logger ().error (
198+ f'{ self .this_robot } - Rajant Peer RSSI'
199+ f' - Radio { rajant_name } for robot'
200+ f' { self .this_robot } not found in configs' )
201+ raise ValueError (
202+ f'Radio { rajant_name } for robot'
203+ f' { self .this_robot } not found in configs' )
178204
179205 # Ping the local rajant
180206 if not ping (self .target_ip ):
181- self .get_logger ().error (f"{ self .this_robot } - Rajant Peer RSSI - Failed to ping { self .target_ip } " )
182- raise ValueError (f"Failed to ping { self .target_ip } " )
207+ self .get_logger ().error (
208+ f'{ self .this_robot } - Rajant Peer RSSI - Failed to ping { self .target_ip } ' )
209+ raise ValueError (f'Failed to ping { self .target_ip } ' )
183210
184211 # Create the publishers for the peers
185212 self .peers = {}
186- for peer in self .robot_configs [self .this_robot ][" clients" ]:
213+ for peer in self .robot_configs [self .this_robot ][' clients' ]:
187214 # Index peer by computed IP
188- peer_radio = self .robot_configs [peer ][" using-radio" ]
189- computed_ip = self .radio_configs [peer_radio ][" computed-IP-address" ]
215+ peer_radio = self .robot_configs [peer ][' using-radio' ]
216+ computed_ip = self .radio_configs [peer_radio ][' computed-IP-address' ]
190217 self .peers [computed_ip ] = PeerPublisher (peer , self )
191218
192219 # Invert radio lookup
193- self .ip_to_radio = {self .radio_configs [radio ][" computed-IP-address" ]: radio
194- for radio in self .radio_configs }
220+ self .ip_to_radio = {self .radio_configs [radio ][' computed-IP-address' ]: radio
221+ for radio in self .radio_configs }
195222
196223 # Start the java program and thread to read output
197224 self .start_java_process_and_queue ()
@@ -215,8 +242,7 @@ def shutdown(self):
215242 self .enqueue_thread .join ()
216243
217244 def enqueue_output (self ):
218- """ Saves the output of the process in a queue to be parsed
219- afterwards """
245+ """Save the output of the process in a queue to be parsed afterwards."""
220246 for line in self .java_process .stdout :
221247 self .process_queue .put (line )
222248 # If the java process dies, we will reach the end of the thread
@@ -227,12 +253,16 @@ def process_output(self):
227253 while not self .process_thread_shutdown .is_set ():
228254 if self .enqueue_thread is not None and not self .enqueue_thread .is_alive ():
229255 time .sleep (1 )
230- self .get_logger ().error (f"{ self .this_robot } - Rajant Peer RSSI - Java process died, restarting" )
256+ self .get_logger ().error (
257+ f'{ self .this_robot } - Rajant Peer RSSI - Java process died, restarting' )
231258 self .start_java_process_and_queue ()
232259 restart_count += 1
233260 if restart_count == 5 :
234261 # shutdown
235- self .get_logger ().error (f"{ self .this_robot } - Rajant Peer RSSI - Java process died too many times. Killing node." )
262+ self .get_logger ().error (
263+ f'{ self .this_robot } - Rajant Peer RSSI'
264+ ' - Java process died too many times.'
265+ ' Killing node.' )
236266 sys .exit (1 )
237267 continue
238268 try :
@@ -241,50 +271,57 @@ def process_output(self):
241271 # No data, just continue the loop
242272 continue
243273 # Data comes in lines. Decide what to do based on the output
244- if data == " \n " :
274+ if data == ' \n ' :
245275 continue
246- elif data == " ERR\n " :
276+ elif data == ' ERR\n ' :
247277 self .java_process .terminate ()
248278 continue
249- elif " END," in data :
279+ elif ' END,' in data :
250280 # End of transmission, send messages
251- data = data .replace (" END," , "" )
252- data = data .replace (" ;\n " , "" )
281+ data = data .replace (' END,' , '' )
282+ data = data .replace (' ;\n ' , '' )
253283 end_ts = int (data )
254284 for peer in self .peers :
255285 self .peers [peer ].publish_all (end_ts )
256286 continue
257287 # Process regular messages
258- data = data .replace (" ;\n " , "" ) # Remove end line
259- ts , iface , peer , rssi = data .split ("," )
288+ data = data .replace (' ;\n ' , '' ) # Remove end line
289+ ts , iface , peer , rssi = data .split (',' )
260290 # Cleanup things
261- ts = int (ts .replace (" Ts:" , "" ))
262- iface = iface .replace (" Iface:" , "" )
263- peer = peer .replace (" Peer:" , "" )
264- rssi = int (rssi .replace (" RSSI:" , "" ))
291+ ts = int (ts .replace (' Ts:' , '' ))
292+ iface = iface .replace (' Iface:' , '' )
293+ peer = peer .replace (' Peer:' , '' )
294+ rssi = int (rssi .replace (' RSSI:' , '' ))
265295 # Let the right peer handle it
266296 if peer in self .peers :
267297 self .peers [peer ].filter_rssi (ts , rssi )
268298 else :
269299 # Discover the rogue peer with the radio
270300 if peer in self .ip_to_radio :
271301 rogue_radio = self .ip_to_radio [peer ]
272- self .get_logger ().debug (f"{ self .this_robot } - Rajant Peer RSSI - Peer with radio { rogue_radio } not assigned to any robot" )
302+ self .get_logger ().debug (
303+ f'{ self .this_robot } - Rajant Peer RSSI'
304+ f' - Peer with radio { rogue_radio } '
305+ ' not assigned to any robot' )
273306 else :
274- self .get_logger ().debug (f"{ self .this_robot } - Rajant Peer RSSI - Peer with IP { peer } not assigned to any robot" )
275-
276-
307+ self .get_logger ().debug (
308+ f'{ self .this_robot } - Rajant Peer RSSI'
309+ f' - Peer with IP { peer } '
310+ ' not assigned to any robot' )
277311
278312 def start_java_process_and_queue (self ):
279313 # Subprocess to run the java BCAPI interface
280- self .get_logger ().info (f"{ self .this_robot } - Rajant Peer RSSI - Starting Java Process for IP { self .target_ip } " )
314+ self .get_logger ().info (
315+ f'{ self .this_robot } - Rajant Peer RSSI'
316+ f' - Starting Java Process for IP { self .target_ip } ' )
281317 # Run process in its own separate process group so it does not get
282318 # SIGINT. We will handle that ourselves
283319 popen_kwargs = {}
284320 popen_kwargs ['preexec_fn' ] = os .setpgrp
285- self .java_process = subprocess .Popen (['java' , '-jar' , self .bcapi_jar_file ,
286- self .target_ip ], stdout = subprocess .PIPE , close_fds = ON_POSIX ,
287- text = True , ** popen_kwargs )
321+ self .java_process = subprocess .Popen (
322+ ['java' , '-jar' , self .bcapi_jar_file , self .target_ip ],
323+ stdout = subprocess .PIPE , close_fds = ON_POSIX ,
324+ text = True , ** popen_kwargs )
288325 self .process_queue = queue .Queue ()
289326 self .enqueue_thread = threading .Thread (target = self .enqueue_output , args = ())
290327 self .enqueue_thread .start ()
@@ -296,7 +333,7 @@ def main(args=None):
296333 try :
297334 rajant_peer_rssi_node = RajantPeerRSSI ()
298335 except Exception as e :
299- print (f" Node initialization failed: { e } " )
336+ print (f' Node initialization failed: { e } ' )
300337 rclpy .shutdown ()
301338 return
302339
@@ -310,11 +347,11 @@ def main(args=None):
310347 mtexecutor .spin_once (timeout_sec = 0.1 )
311348 except KeyboardInterrupt :
312349 rajant_peer_rssi_node .shutdown ()
313- print (" Keyboard interrupt" )
350+ print (' Keyboard interrupt' )
314351 except Exception as e :
315- print (f" Exception: { e } " )
352+ print (f' Exception: { e } ' )
316353 rajant_peer_rssi_node .shutdown ()
317354
318355
319- if __name__ == " __main__" :
356+ if __name__ == ' __main__' :
320357 main ()
0 commit comments