Skip to content

Commit 202e720

Browse files
committed
optimized pipe
1 parent 4c5f871 commit 202e720

3 files changed

Lines changed: 214 additions & 95 deletions

File tree

base-station/base.py

Lines changed: 111 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
#!/usr/bin/env python3
22
"""
3-
Base station for receiving CAN-over-UDP JSON from ESP32, forwarding batches
4-
to an HTTP endpoint, and exposing a CANserver-compatible TCP service for SavvyCAN.
5-
6-
Usage:
7-
python3 base.py [--test]
3+
Base station with memory diagnostics and safeguards
84
"""
95

106
import socket
@@ -13,17 +9,18 @@
139
import threading
1410
import requests
1511
import argparse
12+
import psutil
13+
import os
14+
from collections import deque
1615

17-
# Optional cantools import (for DBC decoding)+
16+
# Optional cantools import
1817
try:
1918
import cantools
2019
try:
21-
# Allow fallback paths for DBC file on /WFR25-6389976.dbc
2220
try:
2321
db = cantools.database.load_file('WFR25-6389976.dbc')
2422
except FileNotFoundError:
2523
db = cantools.database.load_file('base-station/WFR25-6389976.dbc')
26-
2724
print("DBC file loaded successfully - ready to decode CAN messages.")
2825
except FileNotFoundError:
2926
db = None
@@ -36,31 +33,31 @@
3633
db = None
3734
print("cantools not installed. Install with: pip install cantools")
3835

39-
import os
40-
4136
# Configuration
42-
UDP_PORT = 12345 # incoming from ESP32
43-
TIME_SYNC_PORT = 12346 # for time sync broadcast
44-
NAMED_PIPE_PATH = "/tmp/can_data_pipe" # Named pipe for local communication
37+
UDP_PORT = 12345
38+
TIME_SYNC_PORT = 12346
39+
NAMED_PIPE_PATH = "/tmp/can_data_pipe"
4540
HTTP_FORWARD_URL = "http://127.0.0.1:8085/can"
4641

47-
# Command-line arguments
48-
parser = argparse.ArgumentParser(description='Base station with CANserver interface')
49-
parser.add_argument('--test', action='store_true', help='Enable testing mode with fake CAN messages')
42+
# Memory safeguards
43+
MAX_BATCH_SIZE = 1000 # Maximum frames to batch before forcing flush
44+
MAX_BATCH_AGE = 5 # Maximum seconds to hold frames before forcing flush
45+
46+
parser = argparse.ArgumentParser(description='Base station with memory diagnostics')
47+
parser.add_argument('--test', action='store_true', help='Enable testing mode')
5048
args = parser.parse_args()
5149

52-
# UDP listener socket for incoming CAN-over-UDP JSON
50+
# UDP listener socket
5351
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
5452
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
5553
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
5654
udp_sock.bind(('', UDP_PORT))
5755

58-
# Setup named pipe for local communication
5956
def setup_named_pipe():
6057
"""Create a named pipe for local communication."""
6158
try:
6259
if os.path.exists(NAMED_PIPE_PATH):
63-
os.unlink(NAMED_PIPE_PATH) # Remove existing pipe
60+
os.unlink(NAMED_PIPE_PATH)
6461
print(f"Removed existing named pipe: {NAMED_PIPE_PATH}")
6562
os.mkfifo(NAMED_PIPE_PATH)
6663
print(f"Created named pipe: {NAMED_PIPE_PATH}")
@@ -70,15 +67,48 @@ def setup_named_pipe():
7067
print(f"Error creating named pipe: {e}")
7168

7269
setup_named_pipe()
73-
7470
print(f"Base station listening for ESP32 CAN JSON on UDP {UDP_PORT}")
7571
print(f"CAN data available via named pipe: {NAMED_PIPE_PATH}")
7672

77-
# Batch for named pipe broadcasts
78-
batched_frames = []
73+
# Use deque with maxlen for automatic memory management
74+
batched_frames = deque(maxlen=MAX_BATCH_SIZE)
7975
batch_lock = threading.Lock()
8076
pipe_fd = None
8177
pipe_file = None
78+
last_batch_time = time.time()
79+
80+
# Statistics
81+
stats = {
82+
'udp_messages_received': 0,
83+
'can_frames_processed': 0,
84+
'pipe_writes_success': 0,
85+
'pipe_writes_failed': 0,
86+
'http_forwards_success': 0,
87+
'http_forwards_failed': 0,
88+
'last_message_time': 0
89+
}
90+
91+
def print_stats():
92+
"""Print diagnostic statistics periodically."""
93+
while True:
94+
time.sleep(10) # Print stats every 10 seconds
95+
process = psutil.Process()
96+
memory_mb = process.memory_info().rss / 1024 / 1024
97+
98+
with batch_lock:
99+
batch_size = len(batched_frames)
100+
101+
time_since_last = time.time() - stats['last_message_time']
102+
103+
print(f"\n=== DIAGNOSTICS ===")
104+
print(f"Memory usage: {memory_mb:.1f} MB")
105+
print(f"Batched frames: {batch_size}/{MAX_BATCH_SIZE}")
106+
print(f"UDP messages received: {stats['udp_messages_received']}")
107+
print(f"CAN frames processed: {stats['can_frames_processed']}")
108+
print(f"Pipe writes: {stats['pipe_writes_success']} success, {stats['pipe_writes_failed']} failed")
109+
print(f"HTTP forwards: {stats['http_forwards_success']} success, {stats['http_forwards_failed']} failed")
110+
print(f"Time since last message: {time_since_last:.1f}s")
111+
print(f"==================")
82112

83113
def open_pipe():
84114
"""Open the named pipe for writing."""
@@ -108,45 +138,47 @@ def close_pipe():
108138
print(f"Error closing pipe: {e}")
109139

110140
def canserver_broadcast(frames):
111-
"""
112-
Write CAN frames to named pipe for local communication.
113-
Each frame: {"time":123456.789,"bus":0,"id":123,"data":[1,2,3]}
114-
"""
141+
"""Write CAN frames to named pipe with error handling."""
115142
global pipe_file
116143
if not frames:
117144
return
118-
print(f"Writing {len(frames)} frame(s) to named pipe...")
119145

120146
try:
121147
if not open_pipe():
122148
print("Failed to open pipe for writing")
149+
stats['pipe_writes_failed'] += 1
123150
return
124151

125152
for frame in frames:
126153
line = json.dumps(frame) + "\n"
127154
pipe_file.write(line)
128155
pipe_file.flush()
156+
stats['pipe_writes_success'] += 1
129157
print(f"Successfully wrote {len(frames)} frames to pipe")
130158
except (OSError, IOError) as e:
159+
stats['pipe_writes_failed'] += 1
131160
if e.errno != 32: # Ignore "Broken pipe" when no reader
132161
print(f"Pipe write error: {e}")
133-
close_pipe() # Close and will reopen on next write
162+
close_pipe()
134163
except Exception as e:
164+
stats['pipe_writes_failed'] += 1
135165
print(f"Unexpected pipe error: {e}")
136166
close_pipe()
137167

138168
def send_can_messages_batch(messages_batch):
139-
"""Send a batch of CAN messages (JSON) to HTTP endpoint."""
169+
"""Send a batch of CAN messages to HTTP endpoint."""
140170
try:
141171
r = requests.post(HTTP_FORWARD_URL, json=messages_batch, timeout=5)
142-
if r.status_code != 200:
143-
# print(f"HTTP forward error {r.status_code}")
144-
pass
172+
if r.status_code == 200:
173+
stats['http_forwards_success'] += 1
174+
else:
175+
stats['http_forwards_failed'] += 1
145176
except Exception as e:
177+
stats['http_forwards_failed'] += 1
146178
print(f"Error forwarding batch: {e}")
147179

148180
def broadcast_time():
149-
"""Broadcast 8-byte big-endian timestamp for ESP32 sync."""
181+
"""Broadcast timestamp for ESP32 sync."""
150182
b_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
151183
b_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
152184
while True:
@@ -159,83 +191,108 @@ def broadcast_time():
159191
time.sleep(1)
160192

161193
def broadcast_batch_timer():
162-
"""Broadcast accumulated CAN frames every second."""
194+
"""Broadcast accumulated CAN frames with memory safeguards."""
195+
global last_batch_time
163196
while True:
164197
time.sleep(1)
198+
current_time = time.time()
199+
165200
with batch_lock:
166-
if batched_frames:
167-
frames_to_send = batched_frames[:]
201+
# Force flush if batch is full, old, or has any frames
202+
should_flush = (len(batched_frames) >= MAX_BATCH_SIZE or
203+
(batched_frames and current_time - last_batch_time >= MAX_BATCH_AGE) or
204+
len(batched_frames) > 0)
205+
206+
if should_flush and batched_frames:
207+
frames_to_send = list(batched_frames) # Convert deque to list
168208
batched_frames.clear()
209+
last_batch_time = current_time
169210
canserver_broadcast(frames_to_send)
170211

171212
def send_test_messages():
172-
"""Send fake messages into UDP listener for testing."""
213+
"""Send fake messages for testing."""
173214
test_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
215+
test_id = 1200
174216
while True:
175217
msg = {
176218
"messages": [
177-
# time in ms
178-
{"id": "1200", "data": [1,2,3,4,5,6,7,8], "timestamp": int(time.time() * 1000)}
219+
{"id": str(test_id), "data": [1,2,3,4,5,6,7,8], "timestamp": int(time.time() * 1000)}
179220
]
180221
}
181222
test_sock.sendto(json.dumps(msg).encode(), ('127.0.0.1', UDP_PORT))
223+
# test_id += 1 # Increment ID to see different messages
182224
time.sleep(1)
183225

184226
# Start background threads
185227
threading.Thread(target=broadcast_time, daemon=True).start()
186228
threading.Thread(target=broadcast_batch_timer, daemon=True).start()
229+
threading.Thread(target=print_stats, daemon=True).start()
230+
187231
if args.test:
188232
threading.Thread(target=send_test_messages, daemon=True).start()
189-
print("--- TEST MODE ENABLED: Sending fake CAN messages every second. ---")
233+
print("--- TEST MODE ENABLED: Sending fake CAN messages ---")
190234

191-
# Main loop
235+
# Main UDP listener loop
192236
try:
237+
print("Starting main UDP listener loop...")
193238
while True:
194239
data, addr = udp_sock.recvfrom(4096)
195-
# {len(data)} bytes from {addr}")
240+
stats['udp_messages_received'] += 1
241+
stats['last_message_time'] = time.time()
242+
243+
if stats['udp_messages_received'] % 50 == 0:
244+
print(f"Received {stats['udp_messages_received']} UDP messages so far...")
245+
196246
try:
197247
decoded = data.decode('utf-8')
198248
msg = json.loads(decoded)
199249
except Exception as e:
200-
# --- DIAGNOSTIC PRINT ---
201-
print(f"!!! ERROR: Could not decode or parse JSON from {addr}. Error: {e}")
250+
print(f"!!! ERROR: Could not decode JSON from {addr}. Error: {e}")
202251
print(f" Raw data was: {data}")
203-
# ------------------------
204252
continue
205253

206254
if isinstance(msg, dict) and "messages" in msg:
207255
send_can_messages_batch(msg)
256+
processed_count = 0
257+
208258
for m in msg["messages"]:
209259
try:
210260
mid = int(m["id"], 0) if isinstance(m["id"], str) else int(m["id"])
211261
mdata = m["data"]
212262
if not isinstance(mdata, list):
213263
continue
214-
# Accumulate frame for batch broadcast
264+
215265
frame = {
216266
"time": m.get("timestamp"),
217267
"bus": 0,
218268
"id": mid,
219269
"data": list(mdata)
220270
}
271+
221272
with batch_lock:
222273
batched_frames.append(frame)
274+
275+
processed_count += 1
276+
stats['can_frames_processed'] += 1
277+
223278
except Exception as e:
224-
# --- DIAGNOSTIC PRINT ---
225-
print(f"!!! ERROR processing individual message: {e}")
226-
print(f" Problematic message data was: {m}")
227-
# ------------------------
279+
print(f"!!! ERROR processing message: {e}")
280+
print(f" Problematic message: {m}")
228281
continue
229-
# print(f"Successfully processed batch with {len(msg['messages'])} messages")
282+
283+
if args.test:
284+
if processed_count > 0:
285+
print(f"Processed {processed_count} CAN frames from batch")
286+
else:
287+
print(f"!!! WARNING: Invalid message format from {addr}: {msg}")
230288
else:
231-
print(f"!!! WARNING: Received valid JSON but it was missing the 'messages' key. Data: {msg}")
289+
print(f"!!! WARNING: Invalid message format from {addr}: {msg}")
232290

233291
except KeyboardInterrupt:
234292
print("Exiting...")
235293
finally:
236294
udp_sock.close()
237-
close_pipe() # Close pipe file descriptor
238-
# Clean up named pipe
295+
close_pipe()
239296
try:
240297
os.unlink(NAMED_PIPE_PATH)
241298
print(f"Cleaned up named pipe: {NAMED_PIPE_PATH}")

0 commit comments

Comments
 (0)