1414from collections import deque
1515import redis
1616
17- REDIS_URL = ""
1817REDIS_CHANNEL_NAME = "can_messages"
19- IS_REDIS_ACTIVE :bool
18+ IS_REDIS_ACTIVE : bool
2019
21-
22- # Setting up Redis client
23- try :
20+ # Setting up Redis client for pub/sub communication
21+ try :
2422 redis_client = redis .Redis ()
25- redis_client .setex ( "test" , 10 , "True" ) # To test redis connection
23+ redis_client .ping () # To test redis connection
2624 print ("redis client initialized" )
2725 IS_REDIS_ACTIVE = True
2826except Exception as e :
2927 print (e )
3028 IS_REDIS_ACTIVE = False
3129 print ("redis database couldn't be reached. Switching to Named Pipes" )
3230
33- # Optional cantools import
31+ # Optional cantools import and DBC loading for CAN message decoding
3432try :
3533 import cantools
34+
3635 try :
37- try :
38- db = cantools .database .load_file (' WFR25-6389976.dbc' )
36+ try :
37+ db = cantools .database .load_file (" WFR25-6389976.dbc" )
3938 except FileNotFoundError :
40- db = cantools .database .load_file (' base-station/WFR25-6389976.dbc' )
39+ db = cantools .database .load_file (" base-station/WFR25-6389976.dbc" )
4140 print ("DBC file loaded successfully - ready to decode CAN messages." )
4241 except FileNotFoundError :
4342 db = None
5049 db = None
5150 print ("cantools not installed. Install with: pip install cantools" )
5251
53- # Configuration
52+ # Configuration parameters
5453UDP_PORT = 12345
5554TIME_SYNC_PORT = 12346
5655NAMED_PIPE_PATH = "/tmp/can_data_pipe"
5756HTTP_FORWARD_URL = "http://127.0.0.1:8085/can"
5857
59- # Memory safeguards
58+ # Memory safeguards for batching CAN frames
6059MAX_BATCH_SIZE = 1000 # Maximum frames to batch before forcing flush
61- MAX_BATCH_AGE = 5 # Maximum seconds to hold frames before forcing flush
60+ MAX_BATCH_AGE = 5 # Maximum seconds to hold frames before forcing flush
6261
63- parser = argparse .ArgumentParser (description = ' Base station with memory diagnostics' )
64- parser .add_argument (' --test' , action = ' store_true' , help = ' Enable testing mode' )
62+ parser = argparse .ArgumentParser (description = " Base station with memory diagnostics" )
63+ parser .add_argument (" --test" , action = " store_true" , help = " Enable testing mode" )
6564args = parser .parse_args ()
6665
67- # UDP listener socket
66+ # UDP listener socket setup for receiving CAN JSON messages
6867udp_sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
6968udp_sock .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
7069udp_sock .setsockopt (socket .SOL_SOCKET , socket .SO_BROADCAST , 1 )
71- udp_sock .bind (('' , UDP_PORT ))
70+ udp_sock .bind (("" , UDP_PORT ))
71+
7272
7373def setup_named_pipe ():
7474 """Create a named pipe for local communication."""
@@ -84,63 +84,72 @@ def setup_named_pipe():
8484 print (f"Error creating named pipe: { e } " )
8585
8686
87- if not IS_REDIS_ACTIVE :
87+ if not IS_REDIS_ACTIVE :
8888 setup_named_pipe ()
8989print (f"Base station listening for ESP32 CAN JSON on UDP { UDP_PORT } " )
9090print (f"CAN data available via Redis pub/sub" )
9191
92- # Use deque with maxlen for automatic memory management
92+ # Use deque with maxlen for automatic memory management of batches
9393batched_frames = deque (maxlen = MAX_BATCH_SIZE )
9494batch_lock = threading .Lock ()
9595if not IS_REDIS_ACTIVE :
9696 pipe_fd = None
9797 pipe_file = None
9898last_batch_time = time .time ()
9999
100- # Statistics
100+ # Statistics tracking for diagnostics
101101stats = {
102- ' udp_messages_received' : 0 ,
103- ' can_frames_processed' : 0 ,
104- ' messages_published_success' : 0 ,
105- ' messages_published_failed' : 0 ,
106- ' http_forwards_success' : 0 ,
107- ' http_forwards_failed' : 0 ,
108- ' last_message_time' : 0.0
102+ " udp_messages_received" : 0 ,
103+ " can_frames_processed" : 0 ,
104+ " messages_published_success" : 0 ,
105+ " messages_published_failed" : 0 ,
106+ " http_forwards_success" : 0 ,
107+ " http_forwards_failed" : 0 ,
108+ " last_message_time" : 0.0 ,
109109}
110110
111+
111112def print_stats ():
112113 """Print diagnostic statistics periodically."""
113114 while True :
114115 time .sleep (10 ) # Print stats every 10 seconds
115116 process = psutil .Process ()
116117 memory_mb = process .memory_info ().rss / 1024 / 1024
117-
118+
118119 with batch_lock :
119120 batch_size = len (batched_frames )
120-
121- time_since_last = time .time () - stats [' last_message_time' ]
122-
121+
122+ time_since_last = time .time () - stats [" last_message_time" ]
123+
123124 print (f"\n === DIAGNOSTICS ===" )
124125 print (f"Memory usage: { memory_mb :.1f} MB" )
125126 print (f"Batched frames: { batch_size } /{ MAX_BATCH_SIZE } " )
126127 print (f"UDP messages received: { stats ['udp_messages_received' ]} " )
127128 print (f"CAN frames processed: { stats ['can_frames_processed' ]} " )
128- print (f"Pipe writes: { stats ['messages_published_success' ]} success, { stats ['messages_published_failed' ]} failed" )
129- print (f"HTTP forwards: { stats ['http_forwards_success' ]} success, { stats ['http_forwards_failed' ]} failed" )
129+ print (
130+ f"Pipe writes: { stats ['messages_published_success' ]} success, { stats ['messages_published_failed' ]} failed"
131+ )
132+ print (
133+ f"HTTP forwards: { stats ['http_forwards_success' ]} success, { stats ['http_forwards_failed' ]} failed"
134+ )
130135 print (f"Time since last message: { time_since_last :.1f} s" )
131136 print (f"==================" )
132137
138+
133139# When testing piping on macOS, uncomment the pipe_fd with os.0RDWR as one of the args
134140
141+
135142def open_pipe ():
136143 """Open the named pipe for writing."""
137144 global pipe_fd , pipe_file
138145 try :
139146 if pipe_fd is not None :
140147 return True
141- pipe_fd = os .open (NAMED_PIPE_PATH , os .O_WRONLY | os .O_NONBLOCK ) # Uncomment on Base Station / comment out when testing on macOS
148+ pipe_fd = os .open (
149+ NAMED_PIPE_PATH , os .O_WRONLY | os .O_NONBLOCK
150+ ) # Uncomment on Base Station / comment out when testing on macOS
142151 # pipe_fd = os.open(NAMED_PIPE_PATH, os.O_RDWR | os.O_NONBLOCK) # Added for macOS testing / comment out when on base station
143- pipe_file = os .fdopen (pipe_fd , 'w' )
152+ pipe_file = os .fdopen (pipe_fd , "w" )
144153 print ("Opened named pipe for writing" )
145154 return True
146155 except Exception as e :
@@ -149,6 +158,7 @@ def open_pipe():
149158 pipe_file = None
150159 return False
151160
161+
152162def close_pipe ():
153163 """Close the named pipe."""
154164 global pipe_fd , pipe_file
@@ -160,8 +170,9 @@ def close_pipe():
160170 except Exception as e :
161171 print (f"Error closing pipe: { e } " )
162172
173+
163174def canserver_broadcast (frames ):
164- """Write CAN frames to named pipe with error handling."""
175+ """Write CAN frames to Redis Pub/Sub or Named Pipe with error handling."""
165176 if not frames :
166177 return
167178
@@ -170,105 +181,113 @@ def canserver_broadcast(frames):
170181 for frame in frames :
171182 line = json .dumps (frame ) + "\n "
172183 redis_client .publish (REDIS_CHANNEL_NAME , line )
173- stats [' messages_published_success' ] += 1
184+ stats [" messages_published_success" ] += 1
174185 print (f"Successfully published { len (frames )} frames to Redis pub/sub" )
175186 elif not IS_REDIS_ACTIVE :
176187 if not open_pipe ():
177188 print ("Failed to open pipe for writing" )
178- stats [' messages_published_failed' ] += 1
189+ stats [" messages_published_failed" ] += 1
179190 return
180-
191+
181192 for frame in frames :
182193 line = json .dumps (frame ) + "\n "
183- pipe_file .write (line ) # type:ignore
194+ pipe_file .write (line ) # type:ignore
184195 pipe_file .flush () # type:ignore
185- stats ['messages_published_success' ] += 1
186- print (f"Successfully wrote { len (frames )} frames to pipe" )
187-
188- # else:
189- # print("Failed to open pipe for writing")
190- # stats['messages_published_failed'] += 1
191- # return
196+ stats ["messages_published_success" ] += 1
197+ print (f"Successfully wrote { len (frames )} frames to named pipe" )
192198 else :
193- print ("Both Methods (Named Pipe and Redis Pub/Sub) have failed" )
199+ print (
200+ "CRITICAL ERROR\n Both Methods (Named Pipe and Redis Pub/Sub) have failed"
201+ )
194202 return
195203 except (OSError , IOError ) as e :
196204 if not IS_REDIS_ACTIVE :
197- stats [' messages_published_failed' ] += 1
205+ stats [" messages_published_failed" ] += 1
198206 if e .errno != 32 : # Ignore "Broken pipe" when no reader
199207 print (f"Pipe write error: { e } " )
200208 close_pipe ()
201209 else :
202- stats [' messages_published_failed' ] += 1
210+ stats [" messages_published_failed" ] += 1
203211 print (f"Failed uploading to Redis: { e } " )
204212 except Exception as e :
205213 if not IS_REDIS_ACTIVE :
206- stats [' messages_published_failed' ] += 1
214+ stats [" messages_published_failed" ] += 1
207215 print (f"Unexpected pipe error: { e } " )
208216 close_pipe ()
209217 else :
210- stats [' messages_published_failed' ] += 1
218+ stats [" messages_published_failed" ] += 1
211219 print (f"Failed uploading to Redis: { e } " )
212220
221+
213222def send_can_messages_batch (messages_batch ):
214223 """Send a batch of CAN messages to HTTP endpoint."""
215224 try :
216225 r = requests .post (HTTP_FORWARD_URL , json = messages_batch , timeout = 5 )
217226 if r .status_code == 200 :
218- stats [' http_forwards_success' ] += 1
227+ stats [" http_forwards_success" ] += 1
219228 else :
220- stats [' http_forwards_failed' ] += 1
229+ stats [" http_forwards_failed" ] += 1
221230 except Exception as e :
222- stats [' http_forwards_failed' ] += 1
231+ stats [" http_forwards_failed" ] += 1
223232 print (f"Error forwarding batch: { e } " )
224233
234+
225235def broadcast_time ():
226- """Broadcast timestamp for ESP32 sync."""
236+ """Broadcast timestamp for ESP32 sync via UDP broadcast ."""
227237 b_sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
228238 b_sock .setsockopt (socket .SOL_SOCKET , socket .SO_BROADCAST , 1 )
229239 while True :
230240 try :
231241 now_ms = int (time .time () * 1000 )
232- ts = now_ms .to_bytes (8 , ' big' )
233- b_sock .sendto (ts , (' 192.168.4.255' , TIME_SYNC_PORT ))
242+ ts = now_ms .to_bytes (8 , " big" )
243+ b_sock .sendto (ts , (" 192.168.4.255" , TIME_SYNC_PORT ))
234244 except Exception as e :
235245 print (f"Time broadcast error: { e } " )
236246 time .sleep (1 )
237247
248+
238249def broadcast_batch_timer ():
239- """Broadcast accumulated CAN frames with memory safeguards."""
250+ """Periodically flush accumulated CAN frames with memory safeguards."""
240251 global last_batch_time
241252 while True :
242253 time .sleep (1 )
243254 current_time = time .time ()
244-
255+
245256 with batch_lock :
246257 # Force flush if batch is full, old, or has any frames
247- should_flush = (len (batched_frames ) >= MAX_BATCH_SIZE or
248- (batched_frames and current_time - last_batch_time >= MAX_BATCH_AGE ) or
249- len (batched_frames ) > 0 )
250-
258+ should_flush = (
259+ len (batched_frames ) >= MAX_BATCH_SIZE
260+ or (batched_frames and current_time - last_batch_time >= MAX_BATCH_AGE )
261+ or len (batched_frames ) > 0
262+ )
263+
251264 if should_flush and batched_frames :
252265 frames_to_send = list (batched_frames ) # Convert deque to list
253266 batched_frames .clear ()
254267 last_batch_time = current_time
255268 canserver_broadcast (frames_to_send )
256269
270+
257271def send_test_messages ():
258272 """Send fake messages for testing."""
259273 test_sock = socket .socket (socket .AF_INET , socket .SOCK_DGRAM )
260274 test_id = 1200
261275 while True :
262276 msg = {
263277 "messages" : [
264- {"id" : str (test_id ), "data" : [1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ], "timestamp" : int (time .time () * 1000 )}
278+ {
279+ "id" : str (test_id ),
280+ "data" : [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 ],
281+ "timestamp" : int (time .time () * 1000 ),
282+ }
265283 ]
266284 }
267- test_sock .sendto (json .dumps (msg ).encode (), (' 127.0.0.1' , UDP_PORT ))
285+ test_sock .sendto (json .dumps (msg ).encode (), (" 127.0.0.1" , UDP_PORT ))
268286 # test_id += 1 # Increment ID to see different messages
269287 time .sleep (1 )
270288
271- # Start background threads
289+
290+ # Start background threads for time broadcast, batch flushing, and diagnostics printing
272291threading .Thread (target = broadcast_time , daemon = True ).start ()
273292threading .Thread (target = broadcast_batch_timer , daemon = True ).start ()
274293threading .Thread (target = print_stats , daemon = True ).start ()
@@ -277,19 +296,19 @@ def send_test_messages():
277296 threading .Thread (target = send_test_messages , daemon = True ).start ()
278297 print ("--- TEST MODE ENABLED: Sending fake CAN messages ---" )
279298
280- # Main UDP listener loop
299+ # Main UDP listener loop to receive and process CAN JSON messages
281300try :
282301 print ("Starting main UDP listener loop..." )
283302 while True :
284303 data , addr = udp_sock .recvfrom (4096 )
285- stats [' udp_messages_received' ] += 1
286- stats [' last_message_time' ] = time .time ()
287-
288- if stats [' udp_messages_received' ] % 50 == 0 :
304+ stats [" udp_messages_received" ] += 1
305+ stats [" last_message_time" ] = time .time ()
306+
307+ if stats [" udp_messages_received" ] % 50 == 0 :
289308 print (f"Received { stats ['udp_messages_received' ]} UDP messages so far..." )
290-
309+
291310 try :
292- decoded = data .decode (' utf-8' )
311+ decoded = data .decode (" utf-8" )
293312 msg = json .loads (decoded )
294313 except Exception as e :
295314 print (f"!!! ERROR: Could not decode JSON from { addr } . Error: { e } " )
@@ -299,32 +318,32 @@ def send_test_messages():
299318 if isinstance (msg , dict ) and "messages" in msg :
300319 send_can_messages_batch (msg )
301320 processed_count = 0
302-
321+
303322 for m in msg ["messages" ]:
304323 try :
305324 mid = int (m ["id" ], 0 ) if isinstance (m ["id" ], str ) else int (m ["id" ])
306325 mdata = m ["data" ]
307326 if not isinstance (mdata , list ):
308327 continue
309-
328+
310329 frame = {
311330 "time" : m .get ("timestamp" ),
312331 "bus" : 0 ,
313332 "id" : mid ,
314- "data" : list (mdata )
333+ "data" : list (mdata ),
315334 }
316-
335+
317336 with batch_lock :
318337 batched_frames .append (frame )
319-
338+
320339 processed_count += 1
321- stats [' can_frames_processed' ] += 1
322-
340+ stats [" can_frames_processed" ] += 1
341+
323342 except Exception as e :
324343 print (f"!!! ERROR processing message: { e } " )
325344 print (f" Problematic message: { m } " )
326345 continue
327-
346+
328347 if args .test :
329348 if processed_count > 0 :
330349 print (f"Processed { processed_count } CAN frames from batch" )
0 commit comments