Skip to content

Commit 2e8842f

Browse files
committed
added back the named piping as a fallback in case redis fails to start
1 parent 4820c8d commit 2e8842f

2 files changed

Lines changed: 179 additions & 102 deletions

File tree

base-station/base.py

Lines changed: 103 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616

1717
REDIS_URL = ""
1818
REDIS_CHANNEL_NAME = "can_messages"
19+
IS_REDIS_ACTIVE:bool
1920

2021

2122
# Setting up Redis client
2223
try:
2324
redis_client = redis.Redis()
25+
redis_client.setex("test", 10, "True") # To test redis connection
2426
print("redis client initialized")
27+
IS_REDIS_ACTIVE = True
2528
except Exception as e:
26-
print("redis database couldn't be reached")
2729
print(e)
30+
IS_REDIS_ACTIVE = False
31+
print("redis database couldn't be reached. Switching to Named Pipes")
2832

2933
# Optional cantools import
3034
try:
@@ -49,7 +53,7 @@
4953
# Configuration
5054
UDP_PORT = 12345
5155
TIME_SYNC_PORT = 12346
52-
# NAMED_PIPE_PATH = "/tmp/can_data_pipe"
56+
NAMED_PIPE_PATH = "/tmp/can_data_pipe"
5357
HTTP_FORWARD_URL = "http://127.0.0.1:8085/can"
5458

5559
# Memory safeguards
@@ -66,28 +70,31 @@
6670
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
6771
udp_sock.bind(('', UDP_PORT))
6872

69-
# def setup_named_pipe():
70-
# """Create a named pipe for local communication."""
71-
# try:
72-
# if os.path.exists(NAMED_PIPE_PATH):
73-
# os.unlink(NAMED_PIPE_PATH)
74-
# print(f"Removed existing named pipe: {NAMED_PIPE_PATH}")
75-
# os.mkfifo(NAMED_PIPE_PATH)
76-
# print(f"Created named pipe: {NAMED_PIPE_PATH}")
77-
# except FileExistsError:
78-
# print(f"Named pipe already exists: {NAMED_PIPE_PATH}")
79-
# except Exception as e:
80-
# print(f"Error creating named pipe: {e}")
73+
def setup_named_pipe():
74+
"""Create a named pipe for local communication."""
75+
try:
76+
if os.path.exists(NAMED_PIPE_PATH):
77+
os.unlink(NAMED_PIPE_PATH)
78+
print(f"Removed existing named pipe: {NAMED_PIPE_PATH}")
79+
os.mkfifo(NAMED_PIPE_PATH)
80+
print(f"Created named pipe: {NAMED_PIPE_PATH}")
81+
except FileExistsError:
82+
print(f"Named pipe already exists: {NAMED_PIPE_PATH}")
83+
except Exception as e:
84+
print(f"Error creating named pipe: {e}")
85+
8186

82-
# setup_named_pipe()
87+
if not IS_REDIS_ACTIVE:
88+
setup_named_pipe()
8389
print(f"Base station listening for ESP32 CAN JSON on UDP {UDP_PORT}")
8490
print(f"CAN data available via Redis pub/sub")
8591

8692
# Use deque with maxlen for automatic memory management
8793
batched_frames = deque(maxlen=MAX_BATCH_SIZE)
8894
batch_lock = threading.Lock()
89-
# pipe_fd = None
90-
# pipe_file = None
95+
if not IS_REDIS_ACTIVE:
96+
pipe_fd = None
97+
pipe_file = None
9198
last_batch_time = time.time()
9299

93100
# Statistics
@@ -123,60 +130,85 @@ def print_stats():
123130
print(f"Time since last message: {time_since_last:.1f}s")
124131
print(f"==================")
125132

126-
# def open_pipe():
127-
# """Open the named pipe for writing."""
128-
# global pipe_fd, pipe_file
129-
# try:
130-
# if pipe_fd is not None:
131-
# return True
132-
# pipe_fd = os.open(NAMED_PIPE_PATH, os.O_WRONLY | os.O_NONBLOCK)
133-
# pipe_file = os.fdopen(pipe_fd, 'w')
134-
# print("Opened named pipe for writing")
135-
# return True
136-
# except Exception as e:
137-
# print(f"Error opening pipe: {e}")
138-
# pipe_fd = None
139-
# pipe_file = None
140-
# return False
133+
# When testing piping on macOS, uncomment the pipe_fd with os.0RDWR as one of the args
141134

142-
# def close_pipe():
143-
# """Close the named pipe."""
144-
# global pipe_fd, pipe_file
145-
# try:
146-
# if pipe_file:
147-
# pipe_file.close()
148-
# pipe_fd = None
149-
# pipe_file = None
150-
# except Exception as e:
151-
# print(f"Error closing pipe: {e}")
135+
def open_pipe():
136+
"""Open the named pipe for writing."""
137+
global pipe_fd, pipe_file
138+
try:
139+
if pipe_fd is not None:
140+
return True
141+
pipe_fd = os.open(NAMED_PIPE_PATH, os.O_WRONLY | os.O_NONBLOCK) # Uncomment on Base Station / comment out when testing on macOS
142+
# pipe_fd = os.open(NAMED_PIPE_PATH, os.O_RDWR | os.O_NONBLOCK) # Added for macOS testing / comment out when on base station
143+
pipe_file = os.fdopen(pipe_fd, 'w')
144+
print("Opened named pipe for writing")
145+
return True
146+
except Exception as e:
147+
print(f"Error opening pipe: {e}")
148+
pipe_fd = None
149+
pipe_file = None
150+
return False
151+
152+
def close_pipe():
153+
"""Close the named pipe."""
154+
global pipe_fd, pipe_file
155+
try:
156+
if pipe_file:
157+
pipe_file.close()
158+
pipe_fd = None
159+
pipe_file = None
160+
except Exception as e:
161+
print(f"Error closing pipe: {e}")
152162

153163
def canserver_broadcast(frames):
154164
"""Write CAN frames to named pipe with error handling."""
155165
if not frames:
156166
return
157-
167+
158168
try:
159-
if not redis_client:
160-
print("Failed to open pipe for writing")
161-
stats['messages_published_failed'] += 1
162-
return
169+
if IS_REDIS_ACTIVE:
170+
for frame in frames:
171+
line = json.dumps(frame) + "\n"
172+
redis_client.publish(REDIS_CHANNEL_NAME, line)
173+
stats['messages_published_success'] += 1
174+
print(f"Successfully published {len(frames)} frames to Redis pub/sub")
175+
elif not IS_REDIS_ACTIVE:
176+
if not open_pipe():
177+
print("Failed to open pipe for writing")
178+
stats['messages_published_failed'] += 1
179+
return
163180

164-
for frame in frames:
165-
line = json.dumps(frame) + "\n"
166-
redis_client.publish(REDIS_CHANNEL_NAME, line)
167-
# pipe_file.write(line)
168-
# pipe_file.flush()
169-
stats['messages_published_success'] += 1
170-
print(f"Successfully wrote {len(frames)} frames to pipe")
181+
for frame in frames:
182+
line = json.dumps(frame) + "\n"
183+
pipe_file.write(line) # type:ignore
184+
pipe_file.flush() # type:ignore
185+
stats['messages_published_success'] += 1
186+
print(f"Successfully wrote {len(frames)} frames to pipe")
187+
188+
# else:
189+
# print("Failed to open pipe for writing")
190+
# stats['messages_published_failed'] += 1
191+
# return
192+
else:
193+
print("Both Methods (Named Pipe and Redis Pub/Sub) have failed")
194+
return
171195
except (OSError, IOError) as e:
172-
stats['messages_published_failed'] += 1
173-
if e.errno != 32: # Ignore "Broken pipe" when no reader
174-
print(f"Pipe write error: {e}")
175-
# close_pipe()
196+
if not IS_REDIS_ACTIVE:
197+
stats['messages_published_failed'] += 1
198+
if e.errno != 32: # Ignore "Broken pipe" when no reader
199+
print(f"Pipe write error: {e}")
200+
close_pipe()
201+
else:
202+
stats['messages_published_failed'] += 1
203+
print(f"Failed uploading to Redis: {e}")
176204
except Exception as e:
177-
stats['messages_published_failed'] += 1
178-
print(f"Unexpected pipe error: {e}")
179-
# close_pipe()
205+
if not IS_REDIS_ACTIVE:
206+
stats['messages_published_failed'] += 1
207+
print(f"Unexpected pipe error: {e}")
208+
close_pipe()
209+
else:
210+
stats['messages_published_failed'] += 1
211+
print(f"Failed uploading to Redis: {e}")
180212

181213
def send_can_messages_batch(messages_batch):
182214
"""Send a batch of CAN messages to HTTP endpoint."""
@@ -245,7 +277,7 @@ def send_test_messages():
245277
threading.Thread(target=send_test_messages, daemon=True).start()
246278
print("--- TEST MODE ENABLED: Sending fake CAN messages ---")
247279

248-
# Main UDP listener loop
280+
# Main UDP listener loop
249281
try:
250282
print("Starting main UDP listener loop...")
251283
while True:
@@ -305,10 +337,12 @@ def send_test_messages():
305337
print("Exiting...")
306338
finally:
307339
udp_sock.close()
308-
redis_client.close()
309-
# close_pipe()
310-
# try:
311-
# os.unlink(NAMED_PIPE_PATH)
312-
# # print(f"Cleaned up named pipe: {NAMED_PIPE_PATH}")
313-
# except FileNotFoundError:
314-
# pass
340+
if IS_REDIS_ACTIVE:
341+
redis_client.close()
342+
elif not IS_REDIS_ACTIVE:
343+
close_pipe()
344+
try:
345+
os.unlink(NAMED_PIPE_PATH)
346+
# print(f"Cleaned up named pipe: {NAMED_PIPE_PATH}")
347+
except FileNotFoundError:
348+
pass

pecan/app.py

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,18 @@
1717
CAN_MESSAGES = [] # Store decoded CAN messages
1818
MESSAGE_HISTORY_LIMIT = 1000 # Keep only the most recent 1000 messages
1919
lock = threading.Lock() # For thread-safe access to CAN_MESSAGES
20+
IS_REDIS_ACTIVE: bool
2021

21-
redis_client = redis.Redis()
22-
pubsub = redis_client.pubsub()
23-
pubsub.subscribe("can_messages")
22+
try:
23+
redis_client = redis.Redis()
24+
pubsub = redis_client.pubsub()
25+
pubsub.subscribe("can_messages")
26+
IS_REDIS_ACTIVE = True
27+
print("Redis Pub/Sub is now Active and Working")
28+
except Exception as e:
29+
print(f"There was an error with redis: {e}")
30+
print("Resorting to Named Piping")
31+
IS_REDIS_ACTIVE = False
2432

2533

2634
# ─── LOAD DBC ──────────────────────────────────────────────────────────────
@@ -85,41 +93,76 @@ def decode_can_message(can_id, data):
8593

8694
def named_pipe_listener():
8795
"""Read CAN messages from named pipe."""
88-
# pipe_path = "/tmp/can_data_pipe"
96+
pipe_path = "/tmp/can_data_pipe"
8997
message_count = 0
90-
while True:
98+
if IS_REDIS_ACTIVE:
99+
while True:
100+
try:
101+
for sub_msg in pubsub.listen():
102+
sub_msg = sub_msg["data"].decode("utf-8")
103+
if sub_msg:
104+
message_count += 1
105+
if message_count % 100 == 0:
106+
print(f"Processed {message_count} messages from Redis Pub/Sub")
107+
try:
108+
msg = json.loads(sub_msg)
109+
can_id = msg['id']
110+
raw_data = bytes(msg['data'])
111+
# Use the timestamp from the message, convert UTC to local time
112+
original_timestamp = datetime.fromtimestamp(msg['time'] / 1000, tz=timezone.utc).astimezone()
113+
received_timestamp = datetime.now() # When we received it locally
114+
115+
decoded = decode_can_message(can_id, raw_data)
116+
decoded['timestamp'] = original_timestamp.isoformat()
117+
decoded['received_timestamp'] = received_timestamp.isoformat() # Track when we received it
118+
119+
with lock:
120+
CAN_MESSAGES.append(decoded)
121+
if len(CAN_MESSAGES) > MESSAGE_HISTORY_LIMIT:
122+
CAN_MESSAGES.pop(0)
123+
print(f"Message limit reached ({MESSAGE_HISTORY_LIMIT}), removed oldest message. Total: {len(CAN_MESSAGES)}")
124+
except Exception as e:
125+
print(f"Error parsing CAN message: {e}")
126+
except Exception as e:
127+
print(f"Redis Pub/Sub listener error: {e}")
128+
time.sleep(5) # Retry after 5 seconds
129+
elif not IS_REDIS_ACTIVE:
91130
try:
92-
# Open pipe for reading
93-
# with open(pipe_path, 'r') as pipe:
94-
# print(f"Connected to named pipe: {pipe_path}")
95-
for sub_msg in pubsub.listen():
96-
sub_msg = sub_msg["data"].decode("utf-8")
97-
if sub_msg:
98-
message_count += 1
99-
if message_count % 100 == 0:
100-
print(f"Processed {message_count} messages from pipe")
101-
try:
102-
msg = json.loads(sub_msg)
103-
can_id = msg['id']
104-
raw_data = bytes(msg['data'])
105-
# Use the timestamp from the message, convert UTC to local time
106-
original_timestamp = datetime.fromtimestamp(msg['time'] / 1000, tz=timezone.utc).astimezone()
107-
received_timestamp = datetime.now() # When we received it locally
108-
109-
decoded = decode_can_message(can_id, raw_data)
110-
decoded['timestamp'] = original_timestamp.isoformat()
111-
decoded['received_timestamp'] = received_timestamp.isoformat() # Track when we received it
112-
113-
with lock:
114-
CAN_MESSAGES.append(decoded)
115-
if len(CAN_MESSAGES) > MESSAGE_HISTORY_LIMIT:
116-
CAN_MESSAGES.pop(0)
117-
print(f"Message limit reached ({MESSAGE_HISTORY_LIMIT}), removed oldest message. Total: {len(CAN_MESSAGES)}")
118-
except Exception as e:
119-
print(f"Error parsing CAN message: {e}")
131+
while True:
132+
# Open pipe for reading
133+
with open(pipe_path, 'r') as pipe:
134+
print(f"Connected to named pipe: {pipe_path}")
135+
for line in pipe:
136+
line = line.strip()
137+
if line:
138+
message_count += 1
139+
if message_count % 100 == 0:
140+
print(f"Processed {message_count} messages from pipe")
141+
try:
142+
msg = json.loads(line)
143+
can_id = msg['id']
144+
raw_data = bytes(msg['data'])
145+
# Use the timestamp from the message, convert UTC to local time
146+
original_timestamp = datetime.fromtimestamp(msg['time'] / 1000, tz=timezone.utc).astimezone()
147+
received_timestamp = datetime.now() # When we received it locally
148+
149+
decoded = decode_can_message(can_id, raw_data)
150+
decoded['timestamp'] = original_timestamp.isoformat()
151+
decoded['received_timestamp'] = received_timestamp.isoformat() # Track when we received it
152+
153+
with lock:
154+
CAN_MESSAGES.append(decoded)
155+
if len(CAN_MESSAGES) > MESSAGE_HISTORY_LIMIT:
156+
CAN_MESSAGES.pop(0)
157+
print(f"Message limit reached ({MESSAGE_HISTORY_LIMIT}), removed oldest message. Total: {len(CAN_MESSAGES)}")
158+
except Exception as e:
159+
print(f"Error parsing CAN message: {e}")
120160
except Exception as e:
121161
print(f"Named pipe listener error: {e}")
122162
time.sleep(5) # Retry after 5 seconds
163+
164+
else:
165+
print("Nothing is being picked up")
123166

124167
@dash_app.callback(
125168
Output('messages-table', 'data'),

0 commit comments

Comments
 (0)