-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathbus_manager.py
More file actions
77 lines (67 loc) · 2.52 KB
/
bus_manager.py
File metadata and controls
77 lines (67 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Tinymovr BusManager Module
Copyright 2020-2026 MotionLayer P.C.
Implements a BusManager class that owns a CAN bus connection and reconnects it on failure
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
from can.exceptions import CanOperationError, CanInitializationError
class BusManager:
    """
    Own a bus connection and re-establish it when the bus reports an error.

    The bus is created as ``bus_class(**bus_params)``. Whenever an operation
    raises one of ``bus_exceptions`` the error is logged, the current bus
    handle is released and a single reconnection attempt is made. While no
    connection is available ``self.bus`` is ``None``.
    """

    def __init__(
        self,
        bus_class,
        bus_params,
        logger,
        bus_exceptions=(CanOperationError, CanInitializationError),
    ):
        """
        Store the connection settings and try to connect once.

        bus_class: callable returning a bus object when called with
            ``**bus_params``.
        bus_params: dict of keyword arguments for ``bus_class``.
        logger: object used for ``info``/``warning``/``debug`` messages.
        bus_exceptions: exception class (or tuple of classes) treated as
            "bus is gone, try to reconnect".
        """
        self.bus = None
        self.bus_class = bus_class
        self.bus_params = bus_params
        self.bus_exceptions = bus_exceptions
        self.logger = logger
        self.attempt_reconnect()

    def attempt_reconnect(self):
        """
        Try once to open a fresh bus; leave ``self.bus`` as None on failure.

        Any handle currently held is shut down first, so a device that only
        allows a single open handle can be reopened.
        """
        previous, self.bus = self.bus, None
        self._close_quietly(previous)

        try:
            bus = self.bus_class(**self.bus_params)
        except self.bus_exceptions as e:
            # Debug level only: callers may retry in a loop while the
            # device is unplugged.
            self.logger.debug(f"Bus connection attempt failed: {e}")
            return

        try:
            # Flush bus to discard buffered data
            # Also acts as check for bus exception
            self.flush_rx_buffer(bus)
        except self.bus_exceptions as e:
            self.logger.debug(f"Bus connection attempt failed: {e}")
            # The bus was opened but is unusable: release it explicitly
            # instead of leaving that to garbage collection.
            self._close_quietly(bus)
            return

        self.bus = bus
        self.logger.info("Bus connected")

    def is_connected(self):
        """Return True when a bus handle is currently held."""
        return self.bus is not None

    def recv(self, timeout):
        """
        Receive one message from the bus.

        Returns whatever ``bus.recv(timeout=timeout)`` returns. Returns None
        when there is no connection or the bus raised one of
        ``bus_exceptions``; in both cases one reconnection attempt is made
        before returning.
        """
        bus = self.bus
        if bus is None:
            self.attempt_reconnect()
            return None
        try:
            return bus.recv(timeout=timeout)
        except self.bus_exceptions as e:
            self.logger.warning(e)
            self.attempt_reconnect()
            return None

    def shutdown(self):
        """
        Shut down the current bus, if any, and forget it.

        ``self.bus`` is cleared even if the bus' own ``shutdown()`` raises;
        that exception still propagates to the caller.
        """
        bus, self.bus = self.bus, None
        if bus is not None:
            bus.shutdown()

    def send(self, frame):
        """
        Send ``frame`` on the bus.

        If there is no connection, or the bus raises one of
        ``bus_exceptions``, one reconnection attempt is made. The frame is
        not re-sent after a failure.
        """
        bus = self.bus
        if bus is None:
            self.attempt_reconnect()
            return
        try:
            bus.send(frame)
        except self.bus_exceptions as e:
            self.logger.warning(e)
            self.attempt_reconnect()

    def flush_rx_buffer(self, bus=None, trials=100):
        """
        Flush the RX buffer of a bus

        Reads with a 1 ms timeout until a read returns nothing, giving up
        after ``trials`` reads. Uses ``self.bus`` when ``bus`` is not given
        and does nothing when there is no bus at all.
        """
        if bus is None:
            bus = self.bus
        if bus is None:
            return
        for _ in range(trials):
            if not bus.recv(timeout=0.001):
                return

    def _close_quietly(self, bus):
        """Shut down ``bus`` (if not None) without ever raising."""
        if bus is None:
            return
        try:
            bus.shutdown()
        except Exception as e:
            # Best-effort cleanup of a handle that is being thrown away;
            # a failure here must not abort the reconnection that follows.
            self.logger.debug(f"Ignoring error while closing bus: {e}")