-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhardware_interface.py
More file actions
130 lines (116 loc) · 4.18 KB
/
hardware_interface.py
File metadata and controls
130 lines (116 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""
Hardware Interface Module for Assistive Navigation.
Handles communication with Arduino Mega 2560 and external sensors (Pressure, etc.).
"""
try:
import serial
SerialException = serial.SerialException
except ImportError:
serial = None
SerialException = None
import threading
import time
import config
class HardwareInterface:
    """Serial link to the Arduino: reads sensor lines, sends haptic commands.

    Incoming traffic is newline-terminated ``KEY:value`` text. ``PRESSURE``
    carries an integer and ``BATTERY`` a float; other keys are ignored.
    Outgoing haptic commands are sent as ``HAPTIC:<intensity>\\n``.

    A daemon thread polls the port and stores the latest readings in
    ``sensor_data``; access to that dict is guarded by ``data_lock``.
    """

    def __init__(self):
        """Load port settings from ``config`` and attempt to connect at once."""
        self.serial_port = config.SERIAL_PORT
        self.baud_rate = config.SERIAL_BAUD
        self.connection = None
        self.is_connected = False
        self.running = False
        self.thread = None
        self.sensor_data = {"pressure": 0, "battery_voltage": 0.0, "last_update": 0}
        self.data_lock = threading.Lock()
        self.connect()

    def connect(self):
        """Open the serial port and start the background reader.

        Returns:
            True when the link is up (including when it was already up),
            False when hardware is disabled in config, pyserial is not
            installed, or opening the port failed.
        """
        if not config.ENABLE_HARDWARE:
            print("[HARDWARE] Hardware disabled in config.")
            return False
        if serial is None:
            print("[HARDWARE] pyserial not installed. Hardware features unavailable.")
            self._close_connection()
            return False
        if self.is_connected and self.connection is not None:
            # Opening the same port a second time would fail, and the failure
            # path would then discard the working connection. Keep the live one.
            self.start_reading()
            return True
        try:
            self.connection = serial.Serial(self.serial_port, self.baud_rate, timeout=1)
            self.is_connected = True
            print(f"[HARDWARE] Connected to Arduino at {self.serial_port}")
            self.start_reading()
            return True
        except SerialException as e:
            print(f"[HARDWARE] Connection failed: {e}")
            self._close_connection()
            return False
        except Exception as e:
            print(f"[HARDWARE] Unexpected connection error: {e}")
            # The port may have been opened before the failure; close it
            # rather than just dropping the reference.
            self._close_connection()
            return False

    def start_reading(self):
        """Start the reader thread unless one is already alive."""
        if self.thread and self.thread.is_alive():
            return
        self.running = True
        self.thread = threading.Thread(target=self._read_worker, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the reader thread and close the port.

        The thread is asked to stop and joined (for up to one second) before
        the port is closed, so the reader is not left using a closed port.
        """
        self.running = False
        self.is_connected = False
        worker = self.thread
        # Never join ourselves: stop() may be reached from the reader thread.
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join(timeout=1.0)
        self._close_connection()

    def _close_connection(self):
        """Mark the link as down and close the port, ignoring close errors."""
        self.is_connected = False
        conn, self.connection = self.connection, None
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Best effort: the port may already be closed or unusable.
                pass

    def _read_worker(self):
        """Reader-thread body: poll the port and parse each received line.

        Exits when ``running`` or ``is_connected`` is cleared, when the
        connection is removed, or after the first read error (which also
        tears the connection down).
        """
        while self.running and self.is_connected:
            # Snapshot the port so another thread setting self.connection to
            # None cannot invalidate it between the check and the read.
            conn = self.connection
            if conn is None:
                break
            try:
                if conn.in_waiting > 0:
                    line = conn.readline().decode("utf-8", errors="ignore").strip()
                    self._parse_line(line)
                else:
                    # Nothing buffered; yield briefly instead of spinning.
                    time.sleep(0.01)
            except Exception as e:
                # An error seen after stop() cleared `running` is just the
                # port being shut down underneath us, not a real failure.
                if self.running:
                    print(f"[HARDWARE] Read error: {e}")
                self.running = False
                self._close_connection()
                break

    def __del__(self):
        """Best-effort cleanup; never lets an exception escape finalization."""
        try:
            self.stop()
        except Exception:
            pass

    def _parse_line(self, line):
        """Store the reading carried by one ``KEY:value`` line.

        Lines without a colon are ignored. A value that cannot be converted
        (``int`` for PRESSURE, ``float`` for BATTERY) is dropped without
        touching the stored data. ``last_update`` is refreshed for every
        ``KEY:value`` line that did not fail conversion.
        """
        key, sep, value = line.partition(":")
        if not sep:
            return
        key = key.strip()
        try:
            with self.data_lock:
                if key == "PRESSURE":
                    self.sensor_data["pressure"] = int(value)
                elif key == "BATTERY":
                    self.sensor_data["battery_voltage"] = float(value)
                self.sensor_data["last_update"] = time.time()
        except ValueError:
            # Malformed number from the device: keep the previous reading.
            pass

    def get_pressure(self):
        """Return the most recently stored pressure reading (0 until one arrives)."""
        with self.data_lock:
            return self.sensor_data["pressure"]

    def send_feedback(self, intensity=0):
        """Send a ``HAPTIC:<intensity>`` command; a no-op when disconnected.

        Write failures are reported on stdout and not raised.
        """
        # Snapshot the port: the reader thread may clear self.connection.
        conn = self.connection
        if not self.is_connected or conn is None:
            return
        try:
            msg = f"HAPTIC:{intensity}\n"
            conn.write(msg.encode("utf-8"))
        except Exception as e:
            print(f"[HARDWARE] Write error: {e}")
class DummyHardwareInterface:
    """No-op stand-in offering the same public methods as HardwareInterface."""

    def get_pressure(self):
        """Return 0; there is no sensor behind this object."""
        return 0

    def send_feedback(self, intensity=0, **_legacy_kwargs):
        """Accept and discard a haptic command.

        The signature mirrors ``HardwareInterface.send_feedback(intensity=0)``
        so the two classes can be called the same way. Extra keywords (such as
        the former parameter name ``i``) are tolerated and ignored.
        """
        return None

    def stop(self):
        """Do nothing; there is no thread or port to release."""
        return None