-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathbus_router.py
More file actions
153 lines (119 loc) · 3.96 KB
/
bus_router.py
File metadata and controls
153 lines (119 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
Tinymovr BusRouter Module
Copyright 2020-2026 MotionLayer P.C.
Implements a BusRouter class to distribute incoming traffic according to rules
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
import warnings
from enum import Enum
from logging import Logger, getLogger
from threading import Lock, Thread, current_thread

from tinymovr.bus_manager import BusManager
# Module-level router singleton: assigned by init_router(), reset to None by
# destroy_router(), and returned as-is by get_router().
bus_router = None
class Client:
    """
    A pair of callbacks registered with the bus router.

    The router calls filter_cb with each received frame and, when the
    result is truthy, passes the same frame on to recv_cb.
    """

    def __init__(self, filter_cb, recv_cb):
        # Both callables are stored untouched; nothing is invoked here.
        self.filter_cb, self.recv_cb = filter_cb, recv_cb
class BusRouterState(Enum):
    """
    Lifecycle states of a bus router, numbered 0 through 3.
    """

    # Members are numbered in declaration order starting from zero.
    INIT, RUNNING, STOPPING, STOPPED = range(4)
class BusRouter:
    """
    Distribute incoming messages based on the boolean result
    of a filter callback.

    python-can does not allow filtering messages per recipient, hence
    this class.

    Also implements a simple forwarding mechanism for sending messages, to
    simplify interfacing with CAN bus objects.
    """

    def __init__(self, bus_manager, timeout, logger):
        """
        Store the collaborators and start the background receive thread.

        bus_manager: object providing recv(timeout), send(frame) and
            shutdown(), as called by this class
        timeout: value passed to bus_manager.recv() on every poll
        logger: used to report exceptions raised by client callbacks
        """
        self.bus_manager = bus_manager
        self.timeout = timeout
        self.logger = logger
        self.clients = []
        self.running = True
        # Daemon thread, so a forgotten router does not keep the
        # interpreter alive at exit.
        self.update_thread = Thread(target=self.run, daemon=True)
        self.update_thread.start()

    def run(self):
        """
        Receive loop executed on the update thread until stop() clears
        the running flag.
        """
        while self.running:
            frame = self.bus_manager.recv(self.timeout)
            if not frame:
                continue
            for client in self.clients:
                self._dispatch(client, frame)

    def _dispatch(self, client, frame):
        """
        Offer one frame to one client, containing any exception it raises.

        Without this guard a single faulty callback would terminate the
        update thread and every client would silently stop receiving.
        """
        try:
            if client.filter_cb(frame):
                client.recv_cb(frame)
        except Exception as exc:
            if self.logger is not None:
                self.logger.exception(
                    "Bus Router client callback raised an exception"
                )
            else:
                warnings.warn(
                    f"Bus Router client callback raised {exc!r}",
                    RuntimeWarning,
                )

    def add_client(self, filter_cb, recv_cb):
        """
        Register a filter/receive callback pair for incoming frames.
        """
        self.clients.append(Client(filter_cb, recv_cb))

    def stop(self):
        """
        Ask the receive loop to exit and wait for the update thread.
        """
        self.running = False
        worker = self.update_thread
        # A thread cannot join itself (join() raises RuntimeError). When
        # stop() is called from a client callback the loop exits on its own
        # once the current frame has been dispatched.
        if worker is not current_thread() and worker.is_alive():
            worker.join()

    def shutdown(self):
        """
        Stop the receive loop, then shut down the bus manager.
        """
        self.stop()
        self.bus_manager.shutdown()

    def send(self, frame):
        """
        Send a frame by forwarding to the bus manager object
        """
        self.bus_manager.send(frame)
def init_router(bus_class, bus_params, logger=None, timeout=0.1):
    """
    Initializes a bus router using a python-can bus instance

    If no router exists yet, a BusManager is built from bus_class and
    bus_params, wrapped in a BusRouter and stored in the module-level
    bus_router. Otherwise a warning is logged and the existing router is
    left untouched.

    bus_class, bus_params: forwarded unchanged to BusManager
    logger: forwarded to BusManager and BusRouter; when None, the
        "tinymovr" logger from the logging hierarchy is used
    timeout: forwarded to BusRouter

    Returns the module-level router (new or pre-existing).
    """
    global bus_router
    if logger is None:
        # Obtain the logger through getLogger() rather than instantiating
        # Logger directly, so it takes part in the application's logging
        # configuration.
        logger = getLogger("tinymovr")
    if bus_router is None:
        bus_manager = BusManager(bus_class, bus_params, logger=logger)
        bus_router = BusRouter(bus_manager, timeout, logger=logger)
    else:
        logger.warning("Bus Router already exists")
    return bus_router
def destroy_router():
    """
    Destroys the existing bus router, stopping its thread.

    Does nothing when no router exists. The module-level reference is
    cleared even if the router's shutdown() raises, so a later
    init_router() builds a fresh router instead of returning a
    half-shut one; the exception itself still propagates to the caller.
    """
    global bus_router
    if bus_router is None:
        return
    try:
        bus_router.shutdown()
    finally:
        bus_router = None
def get_router():
    """
    Return the module-level bus router object (None while no router is set)
    """
    current = bus_router
    return current
# Deprecated functions
def init_tee(bus, timeout=0.1):
    """
    Removed entry point kept only to point callers at init_router().

    Always raises RuntimeError; both arguments are ignored.
    """
    message = "init_tee() is no longer supported. Please use init_router() instead."
    raise RuntimeError(message)
def destroy_tee():
    """
    Deprecated alias of destroy_router().

    Emits a DeprecationWarning attributed to the caller, then delegates.
    """
    notice = (
        "destroy_tee() is deprecated and will be removed in future versions. "
        "Please use destroy_router() instead."
    )
    # stacklevel=2 makes the warning point at the caller's line.
    warnings.warn(notice, category=DeprecationWarning, stacklevel=2)
    destroy_router()
def get_tee():
    """
    Get the current tee object

    Deprecated alias of get_router(): emits a DeprecationWarning
    attributed to the caller and returns whatever get_router() returns.
    """
    warnings.warn(
        "get_tee() is deprecated and will be removed in future versions. Please use get_router() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    # The result must be handed back to the caller; dropping it made this
    # getter always return None.
    return get_router()