-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathchannel.py
More file actions
145 lines (125 loc) · 4.85 KB
/
channel.py
File metadata and controls
145 lines (125 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
Tinymovr CAN Channel Module
Copyright 2020-2026 MotionLayer P.C.
Implements a CAN bus communications channel
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>.
"""
import time
from contextlib import contextmanager
from threading import Lock, Event
import can
from functools import cached_property
from avlos.channel import BaseChannel
from tinymovr.bus_router import get_router
from tinymovr.constants import (
CAN_DEV_MASK,
CAN_EP_SIZE,
CAN_EP_MASK,
CAN_HASH_SIZE,
CAN_HASH_MASK,
)
from tinymovr.codec import MultibyteCodec
class ResponseError(Exception):
    """
    Error signalling that a node did not respond.

    The id of the node concerned is stored in the `node_id` attribute.
    """

    def __init__(self, node_id):
        self.node_id = node_id
        message = f"Node {node_id} did not respond"
        super().__init__(message)
class CANChannel(BaseChannel):
    """
    Communications channel for a single node on the CAN bus.

    On construction the channel registers `_filter_frame` and `_recv_cb`
    with the global bus router. Frames accepted by the filter are queued
    by `_recv_cb` and later consumed by `recv`.
    """

    # Inter-frame delay applied after CAN write frames (seconds).
    # Zero during normal operation for full-speed control.
    # Temporarily raised via throttled() context manager during bulk
    # operations (e.g. import_values) to prevent overwhelming the
    # device receive buffer.
    BULK_SEND_DELAY = 0.003  # 3 ms

    def __init__(self, node_id, compare_hash=0):
        """
        Create a channel for `node_id` and register it with the router.

        `compare_hash` is embedded in outgoing arbitration ids and used
        to validate the hash field of incoming frames.
        """
        self.node_id = node_id
        self.compare_hash = compare_hash
        self.queue = []
        self.lock = Lock()
        self.evt = Event()
        self._send_delay = 0
        get_router().add_client(self._filter_frame, self._recv_cb)

    @contextmanager
    def throttled(self):
        """Temporarily enable inter-frame write delay for bulk operations."""
        prev = self._send_delay
        self._send_delay = self.BULK_SEND_DELAY
        try:
            yield
        finally:
            # Restore the previous delay even if the bulk operation raised.
            self._send_delay = prev

    def _filter_frame(self, frame):
        """
        Return True for non-remote frames whose arbitration id carries
        this channel's node id.
        """
        if frame.is_remote_frame:
            return False
        return ids_from_arbitration(frame.arbitration_id)[2] == self.node_id

    def _recv_cb(self, frame):
        """
        Callback method for receiving frames. Append incoming frame
        to the queue and set the event flag.
        """
        # Append before setting the event so that a waiter woken by the
        # event always finds the frame in the queue.
        # NOTE(review): presumably invoked from the bus router's own
        # thread -- confirm against bus_router.
        self.queue.append(frame)
        self.evt.set()

    def send(self, data, ep_id):
        """
        Send a CAN frame to a specific endpoint. The `rtr` flag is set
        when no data is provided (None or empty), and the frame is sent
        via the global bus router instance. After a data (non-remote)
        frame the current inter-frame delay, if any, is applied.
        """
        rtr = not (data and len(data))
        get_router().send(self.create_frame(ep_id, rtr, data))
        if not rtr and self._send_delay > 0:
            time.sleep(self._send_delay)

    def _take_matching_frame(self, ep_id):
        """
        Remove and return the first queued frame addressed to `ep_id`
        whose hash equals `compare_hash` or is zero; None if there is
        no such frame.
        """
        for frame in self.queue:
            inc_ep_id, inc_hash, _ = ids_from_arbitration(frame.arbitration_id)
            if inc_ep_id == ep_id and inc_hash in (self.compare_hash, 0):
                # Safe to mutate the list here: we stop iterating at once.
                self.queue.remove(frame)
                return frame
        return None

    def recv(self, ep_id, timeout=1.0):
        """
        Receive a CAN frame from a specific endpoint. This method looks
        for a queued frame with a matching arbitration id, removes it
        from the queue, and returns the frame data. Frames that do not
        match are left in the queue and do not end the wait early; the
        method keeps waiting until a matching frame arrives or `timeout`
        seconds have elapsed, in which case a ResponseError is raised.
        A `timeout` of None waits indefinitely.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        with self.lock:
            while True:
                # Clear *before* scanning: a frame queued during or after
                # the scan sets the event again, so the wait below returns
                # immediately and no wake-up is lost.
                self.evt.clear()
                frame = self._take_matching_frame(ep_id)
                if frame is not None:
                    return frame.data
                if deadline is None:
                    self.evt.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise ResponseError(self.node_id)
                # Whether the wait is signalled or times out, loop back
                # for one more scan before giving up.
                self.evt.wait(timeout=remaining)

    def create_frame(self, endpoint_id, rtr=False, payload=None):
        """
        Generate a CAN frame using python-can Message class. The
        arbitration id is built from `endpoint_id`, this channel's
        `compare_hash` and its `node_id`; the frame uses an extended id.
        """
        return can.Message(
            arbitration_id=arbitration_from_ids(endpoint_id, self.compare_hash, self.node_id),
            is_extended_id=True,
            is_remote_frame=rtr,
            data=payload,
        )

    @cached_property
    def serializer(self):
        """Codec used for this channel, created once on first access."""
        return MultibyteCodec()
# TODO: Implement unit tests for these functions
def ids_from_arbitration(arbitration_id):
    """
    Split a CAN arbitration id into its fields and return them
    as the tuple (endpoint id, hash value, node id).
    """
    # Lowest field: endpoint id, no shift required.
    ep_id = arbitration_id & CAN_EP_MASK
    # Hash field sits directly above the endpoint bits.
    hash_value = (arbitration_id & CAN_HASH_MASK) >> CAN_EP_SIZE
    # Node id sits above both the endpoint and hash fields.
    node_shift = CAN_EP_SIZE + CAN_HASH_SIZE
    node_id = (arbitration_id & CAN_DEV_MASK) >> node_shift
    return ep_id, hash_value, node_id
def arbitration_from_ids(ep_id, hash, node_id):
    """
    Pack an endpoint id, hash value and node id into a single
    CAN arbitration id. Each field is shifted into position and
    masked, so bits that do not fit in a field are discarded.
    """
    ep_bits = ep_id & CAN_EP_MASK
    hash_bits = (hash << CAN_EP_SIZE) & CAN_HASH_MASK
    node_bits = (node_id << (CAN_EP_SIZE + CAN_HASH_SIZE)) & CAN_DEV_MASK
    return ep_bits | hash_bits | node_bits