Skip to content

Commit 4820c8d

Browse files
committed
Add Redis integration for CAN message handling and update Docker setup to include redis container
1 parent 202e720 commit 4820c8d

5 files changed

Lines changed: 128 additions & 91 deletions

File tree

base-station/Dockerfile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ FROM python:3.9-slim
22

33
WORKDIR /app
44

5-
RUN pip install cantools
5+
COPY requirements.txt .
6+
7+
RUN pip install --upgrade pip && \
8+
pip install --no-cache-dir -r requirements.txt
69

710
COPY base.py /app/base.py
811

base-station/base.py

Lines changed: 76 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,19 @@
1212
import psutil
1313
import os
1414
from collections import deque
15+
import redis
16+
17+
REDIS_URL = ""
18+
REDIS_CHANNEL_NAME = "can_messages"
19+
20+
21+
# Setting up Redis client
22+
try:
23+
redis_client = redis.Redis()
24+
print("redis client initialized")
25+
except Exception as e:
26+
print("redis database couldn't be reached")
27+
print(e)
1528

1629
# Optional cantools import
1730
try:
@@ -36,7 +49,7 @@
3649
# Configuration
3750
UDP_PORT = 12345
3851
TIME_SYNC_PORT = 12346
39-
NAMED_PIPE_PATH = "/tmp/can_data_pipe"
52+
# NAMED_PIPE_PATH = "/tmp/can_data_pipe"
4053
HTTP_FORWARD_URL = "http://127.0.0.1:8085/can"
4154

4255
# Memory safeguards
@@ -53,39 +66,39 @@
5366
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
5467
udp_sock.bind(('', UDP_PORT))
5568

56-
def setup_named_pipe():
57-
"""Create a named pipe for local communication."""
58-
try:
59-
if os.path.exists(NAMED_PIPE_PATH):
60-
os.unlink(NAMED_PIPE_PATH)
61-
print(f"Removed existing named pipe: {NAMED_PIPE_PATH}")
62-
os.mkfifo(NAMED_PIPE_PATH)
63-
print(f"Created named pipe: {NAMED_PIPE_PATH}")
64-
except FileExistsError:
65-
print(f"Named pipe already exists: {NAMED_PIPE_PATH}")
66-
except Exception as e:
67-
print(f"Error creating named pipe: {e}")
69+
# def setup_named_pipe():
70+
# """Create a named pipe for local communication."""
71+
# try:
72+
# if os.path.exists(NAMED_PIPE_PATH):
73+
# os.unlink(NAMED_PIPE_PATH)
74+
# print(f"Removed existing named pipe: {NAMED_PIPE_PATH}")
75+
# os.mkfifo(NAMED_PIPE_PATH)
76+
# print(f"Created named pipe: {NAMED_PIPE_PATH}")
77+
# except FileExistsError:
78+
# print(f"Named pipe already exists: {NAMED_PIPE_PATH}")
79+
# except Exception as e:
80+
# print(f"Error creating named pipe: {e}")
6881

69-
setup_named_pipe()
82+
# setup_named_pipe()
7083
print(f"Base station listening for ESP32 CAN JSON on UDP {UDP_PORT}")
71-
print(f"CAN data available via named pipe: {NAMED_PIPE_PATH}")
84+
print(f"CAN data available via Redis pub/sub")
7285

7386
# Use deque with maxlen for automatic memory management
7487
batched_frames = deque(maxlen=MAX_BATCH_SIZE)
7588
batch_lock = threading.Lock()
76-
pipe_fd = None
77-
pipe_file = None
89+
# pipe_fd = None
90+
# pipe_file = None
7891
last_batch_time = time.time()
7992

8093
# Statistics
8194
stats = {
8295
'udp_messages_received': 0,
8396
'can_frames_processed': 0,
84-
'pipe_writes_success': 0,
85-
'pipe_writes_failed': 0,
97+
'messages_published_success': 0,
98+
'messages_published_failed': 0,
8699
'http_forwards_success': 0,
87100
'http_forwards_failed': 0,
88-
'last_message_time': 0
101+
'last_message_time': 0.0
89102
}
90103

91104
def print_stats():
@@ -105,65 +118,65 @@ def print_stats():
105118
print(f"Batched frames: {batch_size}/{MAX_BATCH_SIZE}")
106119
print(f"UDP messages received: {stats['udp_messages_received']}")
107120
print(f"CAN frames processed: {stats['can_frames_processed']}")
108-
print(f"Pipe writes: {stats['pipe_writes_success']} success, {stats['pipe_writes_failed']} failed")
121+
print(f"Pipe writes: {stats['messages_published_success']} success, {stats['messages_published_failed']} failed")
109122
print(f"HTTP forwards: {stats['http_forwards_success']} success, {stats['http_forwards_failed']} failed")
110123
print(f"Time since last message: {time_since_last:.1f}s")
111124
print(f"==================")
112125

113-
def open_pipe():
114-
"""Open the named pipe for writing."""
115-
global pipe_fd, pipe_file
116-
try:
117-
if pipe_fd is not None:
118-
return True
119-
pipe_fd = os.open(NAMED_PIPE_PATH, os.O_WRONLY | os.O_NONBLOCK)
120-
pipe_file = os.fdopen(pipe_fd, 'w')
121-
print("Opened named pipe for writing")
122-
return True
123-
except Exception as e:
124-
print(f"Error opening pipe: {e}")
125-
pipe_fd = None
126-
pipe_file = None
127-
return False
126+
# def open_pipe():
127+
# """Open the named pipe for writing."""
128+
# global pipe_fd, pipe_file
129+
# try:
130+
# if pipe_fd is not None:
131+
# return True
132+
# pipe_fd = os.open(NAMED_PIPE_PATH, os.O_WRONLY | os.O_NONBLOCK)
133+
# pipe_file = os.fdopen(pipe_fd, 'w')
134+
# print("Opened named pipe for writing")
135+
# return True
136+
# except Exception as e:
137+
# print(f"Error opening pipe: {e}")
138+
# pipe_fd = None
139+
# pipe_file = None
140+
# return False
128141

129-
def close_pipe():
130-
"""Close the named pipe."""
131-
global pipe_fd, pipe_file
132-
try:
133-
if pipe_file:
134-
pipe_file.close()
135-
pipe_fd = None
136-
pipe_file = None
137-
except Exception as e:
138-
print(f"Error closing pipe: {e}")
142+
# def close_pipe():
143+
# """Close the named pipe."""
144+
# global pipe_fd, pipe_file
145+
# try:
146+
# if pipe_file:
147+
# pipe_file.close()
148+
# pipe_fd = None
149+
# pipe_file = None
150+
# except Exception as e:
151+
# print(f"Error closing pipe: {e}")
139152

140153
def canserver_broadcast(frames):
141154
"""Write CAN frames to named pipe with error handling."""
142-
global pipe_file
143155
if not frames:
144156
return
145157

146158
try:
147-
if not open_pipe():
159+
if not redis_client:
148160
print("Failed to open pipe for writing")
149-
stats['pipe_writes_failed'] += 1
161+
stats['messages_published_failed'] += 1
150162
return
151163

152164
for frame in frames:
153165
line = json.dumps(frame) + "\n"
154-
pipe_file.write(line)
155-
pipe_file.flush()
156-
stats['pipe_writes_success'] += 1
166+
redis_client.publish(REDIS_CHANNEL_NAME, line)
167+
# pipe_file.write(line)
168+
# pipe_file.flush()
169+
stats['messages_published_success'] += 1
157170
print(f"Successfully wrote {len(frames)} frames to pipe")
158171
except (OSError, IOError) as e:
159-
stats['pipe_writes_failed'] += 1
172+
stats['messages_published_failed'] += 1
160173
if e.errno != 32: # Ignore "Broken pipe" when no reader
161174
print(f"Pipe write error: {e}")
162-
close_pipe()
175+
# close_pipe()
163176
except Exception as e:
164-
stats['pipe_writes_failed'] += 1
177+
stats['messages_published_failed'] += 1
165178
print(f"Unexpected pipe error: {e}")
166-
close_pipe()
179+
# close_pipe()
167180

168181
def send_can_messages_batch(messages_batch):
169182
"""Send a batch of CAN messages to HTTP endpoint."""
@@ -292,9 +305,10 @@ def send_test_messages():
292305
print("Exiting...")
293306
finally:
294307
udp_sock.close()
295-
close_pipe()
296-
try:
297-
os.unlink(NAMED_PIPE_PATH)
298-
print(f"Cleaned up named pipe: {NAMED_PIPE_PATH}")
299-
except FileNotFoundError:
300-
pass
308+
redis_client.close()
309+
# close_pipe()
310+
# try:
311+
# os.unlink(NAMED_PIPE_PATH)
312+
# # print(f"Cleaned up named pipe: {NAMED_PIPE_PATH}")
313+
# except FileNotFoundError:
314+
# pass

base-station/docker-compose.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@ services:
44
network_mode: host
55
restart: unless-stopped
66

7+
redis:
8+
image: redis:8.2
9+
# network_mode: host
10+
restart: unless-stopped
11+
ports:
12+
- "6379:6379"
13+
volumes:
14+
- redis_data:/data
15+
716
murmur:
817
image: mumblevoip/murmur:latest
918
network_mode: host
@@ -12,3 +21,6 @@ services:
1221
- ./murmur.ini:/etc/murmur/murmur.ini:ro
1322
environment:
1423
- MURMUR_SUPERUSER_PASSWORD=superuser_password
24+
25+
volumes:
26+
redis_data:

base-station/requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
redis
2+
cantools

pecan/app.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import socket
88
import threading
99
import json
10+
import redis
1011
from datetime import datetime, timedelta, timezone
1112

1213
app = Flask(__name__)
@@ -17,6 +18,11 @@
1718
MESSAGE_HISTORY_LIMIT = 1000 # Keep only the most recent 1000 messages
1819
lock = threading.Lock() # For thread-safe access to CAN_MESSAGES
1920

21+
redis_client = redis.Redis()
22+
pubsub = redis_client.pubsub()
23+
pubsub.subscribe("can_messages")
24+
25+
2026
# ─── LOAD DBC ──────────────────────────────────────────────────────────────
2127
def load_dbc_file():
2228
"""Load DBC file with multiple path fallbacks"""
@@ -79,38 +85,38 @@ def decode_can_message(can_id, data):
7985

8086
def named_pipe_listener():
8187
"""Read CAN messages from named pipe."""
82-
pipe_path = "/tmp/can_data_pipe"
88+
# pipe_path = "/tmp/can_data_pipe"
8389
message_count = 0
8490
while True:
8591
try:
8692
# Open pipe for reading
87-
with open(pipe_path, 'r') as pipe:
88-
print(f"Connected to named pipe: {pipe_path}")
89-
for line in pipe:
90-
line = line.strip()
91-
if line:
92-
message_count += 1
93-
if message_count % 100 == 0:
94-
print(f"Processed {message_count} messages from pipe")
95-
try:
96-
msg = json.loads(line)
97-
can_id = msg['id']
98-
raw_data = bytes(msg['data'])
99-
# Use the timestamp from the message, convert UTC to local time
100-
original_timestamp = datetime.fromtimestamp(msg['time'] / 1000, tz=timezone.utc).astimezone()
101-
received_timestamp = datetime.now() # When we received it locally
102-
103-
decoded = decode_can_message(can_id, raw_data)
104-
decoded['timestamp'] = original_timestamp.isoformat()
105-
decoded['received_timestamp'] = received_timestamp.isoformat() # Track when we received it
106-
107-
with lock:
108-
CAN_MESSAGES.append(decoded)
109-
if len(CAN_MESSAGES) > MESSAGE_HISTORY_LIMIT:
110-
CAN_MESSAGES.pop(0)
111-
print(f"Message limit reached ({MESSAGE_HISTORY_LIMIT}), removed oldest message. Total: {len(CAN_MESSAGES)}")
112-
except Exception as e:
113-
print(f"Error parsing CAN message: {e}")
93+
# with open(pipe_path, 'r') as pipe:
94+
# print(f"Connected to named pipe: {pipe_path}")
95+
for sub_msg in pubsub.listen():
96+
sub_msg = sub_msg["data"].decode("utf-8")
97+
if sub_msg:
98+
message_count += 1
99+
if message_count % 100 == 0:
100+
print(f"Processed {message_count} messages from pipe")
101+
try:
102+
msg = json.loads(sub_msg)
103+
can_id = msg['id']
104+
raw_data = bytes(msg['data'])
105+
# Use the timestamp from the message, convert UTC to local time
106+
original_timestamp = datetime.fromtimestamp(msg['time'] / 1000, tz=timezone.utc).astimezone()
107+
received_timestamp = datetime.now() # When we received it locally
108+
109+
decoded = decode_can_message(can_id, raw_data)
110+
decoded['timestamp'] = original_timestamp.isoformat()
111+
decoded['received_timestamp'] = received_timestamp.isoformat() # Track when we received it
112+
113+
with lock:
114+
CAN_MESSAGES.append(decoded)
115+
if len(CAN_MESSAGES) > MESSAGE_HISTORY_LIMIT:
116+
CAN_MESSAGES.pop(0)
117+
print(f"Message limit reached ({MESSAGE_HISTORY_LIMIT}), removed oldest message. Total: {len(CAN_MESSAGES)}")
118+
except Exception as e:
119+
print(f"Error parsing CAN message: {e}")
114120
except Exception as e:
115121
print(f"Named pipe listener error: {e}")
116122
time.sleep(5) # Retry after 5 seconds

0 commit comments

Comments
 (0)