
Commit b28640f

Authored by Ben Gardiner and Copilot
Fix slcan w IsoTPSoftSocket (#4938)
* isotp: fix soft socket .select() drops ObjectPipe, causing sr1() to hang
  in threaded mode

  The select() method was filtering out ObjectPipe instances (like the
  sniffer's close_pipe) from its return value. This prevented the sniffer's
  stop mechanism from working correctly in threaded mode: when
  sniffer.stop() sent to close_pipe, the select() method would unblock but
  not return the close_pipe, so the sniffer loop could not detect the stop
  signal and had to rely on continue_sniff timing, causing hangs under
  load.

  The fix includes close_pipe (ObjectPipe) instances in the select return
  value, so the sniffer loop properly detects the stop signal via the
  'if s is close_pipe: break' check.

  Added two new tests:
  - sr1 timeout with threaded=True (no response scenario)
  - sr1 timeout with threaded=True and background CAN traffic

  The new "ISOTPSoftSocket select returns control ObjectPipe" test directly
  verifies that ISOTPSoftSocket.select() passes through ready ObjectPipe
  instances (e.g. the sniffer's close_pipe). This test deterministically
  FAILS without the fix and PASSES with it. The integration tests (sr1
  timeout with threaded=True) are kept for end-to-end coverage, but the
  race window is too narrow on Linux with TestSocket to reliably trigger
  the bug.

  Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
  Co-authored-by: Ben Gardiner <ben.l.gardiner@gmail.com>

* isotp: fix potential cause of intermittent test failures where the soft
  socket is garbage collected

* isotpsoft, test: try hard to clean up background threads in tests

* isotpsoft, test: sr1() soft socket tests incl MF resp, SF req on slow
  (slcan) interface

  Introduce multiple tests to confirm that all the combinations of
  filters, threading, and slow/fast interfaces work with the isotpsoft
  socket in the particularly problematic case of a SF request yielding an
  MF response. The new tests currently fail for slow (slcan) interfaces.

* isotpsoft: make TimeoutScheduler._task a daemon thread

  Making the timeout scheduler a daemon thread should fix the Python 3.13
  tox failures on Windows.

* python-can, mux: special case for slcan: drop bus filters

* isotpsoft: schedule timeouts to work with slow (slcan) interfaces; make
  close and timeouts more robust

* python-can, mux: limit time under locks, optimize data receive latency

* isotpsoft: optimize for slow (slcan) interfaces: don't call select()
  when the internal state will do

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
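The select()/close_pipe interaction described above can be illustrated with a minimal, hypothetical sketch: plain os pipes and select.select stand in for Scapy's ObjectPipe and select_objects. The reader thread must receive the control pipe back from select() to detect the stop signal directly, rather than relying on timing.

```python
import os
import select
import threading

def sniff_loop(data_fd, close_fd, results):
    """Block on both fds; a write to close_fd unblocks AND is returned
    by select(), so the loop can detect the stop signal explicitly."""
    while True:
        ready, _, _ = select.select([data_fd, close_fd], [], [])
        if close_fd in ready:   # the 'if s is close_pipe: break' equivalent
            break
        for fd in ready:
            results.append(os.read(fd, 4096))

data_r, data_w = os.pipe()
close_r, close_w = os.pipe()
results = []
t = threading.Thread(target=sniff_loop, args=(data_r, close_r, results))
t.start()
os.write(data_w, b"frame")
os.write(close_w, b"x")     # stand-in for sniffer.stop()
t.join(timeout=5)
```

If select() filtered close_fd out of its return value (the bug), the loop would wake but never hit the break, exactly as described for ISOTPSoftSocket.select().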
Parent: bba4ec0

File tree: 4 files changed, +720 −31 lines

scapy/contrib/cansocket_python_can.py

Lines changed: 75 additions & 15 deletions
@@ -16,6 +16,7 @@
 
 from functools import reduce
 from operator import add
+
 from collections import deque
 
 from scapy.config import conf
@@ -55,24 +56,51 @@ def __init__(self, bus, sockets):
         """
         self.bus = bus
         self.sockets = sockets
-
-    def mux(self):
-        # type: () -> None
-        """Multiplexer function. Tries to receive from its python-can bus
-        object. If a message is received, this message gets forwarded to
-        all receive queues of the SocketWrapper objects.
+        self.closing = False
+
+    # Maximum time (seconds) to spend reading frames in one read_bus()
+    # call. On serial interfaces (slcan) the final bus.recv(timeout=0)
+    # when the buffer is empty blocks for the serial port's read timeout
+    # (typically 100ms in python-can's slcan driver). During that block
+    # the TimeoutScheduler thread cannot run any other callbacks. By
+    # capping total read time, we ensure the scheduler stays responsive
+    # even on slow serial interfaces with heavy background traffic.
+    READ_BUS_TIME_LIMIT = 0.020  # 20 ms
+
+    def read_bus(self):
+        # type: () -> List[can_Message]
+        """Read available frames from the bus, up to READ_BUS_TIME_LIMIT.
+
+        On slow serial interfaces (slcan), bus.recv(timeout=0) can
+        block for ~100ms when the serial buffer is empty (python-can's
+        slcan serial timeout). This method limits total time spent
+        reading so the TimeoutScheduler thread stays responsive.
+
+        This method intentionally does NOT hold pool_mutex so that
+        concurrent send() calls are not blocked during the serial I/O.
         """
+        if self.closing:
+            return []
         msgs = []
+        deadline = time.monotonic() + self.READ_BUS_TIME_LIMIT
         while True:
             try:
                 msg = self.bus.recv(timeout=0)
                 if msg is None:
                     break
                 else:
                     msgs.append(msg)
+                if time.monotonic() >= deadline:
+                    break
             except Exception as e:
-                warning("[MUX] python-can exception caught: %s" % e)
-
+                if not self.closing:
+                    warning("[MUX] python-can exception caught: %s" % e)
+                break
+        return msgs
+
+    def distribute(self, msgs):
+        # type: (List[can_Message]) -> None
+        """Distribute received messages to all subscribed sockets."""
         for sock in self.sockets:
             with sock.lock:
                 for msg in msgs:
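The read_bus() deadline pattern above (drain a non-blocking source, but cap total time so one busy bus cannot starve the scheduler thread) can be sketched generically; drain() and the deque source are illustrative names, not part of Scapy or python-can:

```python
import time
from collections import deque

def drain(source, time_limit=0.020):
    """Pop items until the source is empty or time_limit seconds
    elapse, so a slow or busy source cannot monopolize the caller."""
    out = []
    deadline = time.monotonic() + time_limit
    while True:
        # stand-in for bus.recv(timeout=0): None means "nothing pending"
        item = source.popleft() if source else None
        if item is None:
            break
        out.append(item)
        if time.monotonic() >= deadline:
            break   # leave the rest for the next call
    return out

q = deque(range(5))
print(drain(q))   # a fast source is drained completely
```

The key design choice mirrors the diff: the deadline check happens after each item, so at least one pending frame is always consumed per call, and leftover frames are simply picked up on the next scheduler tick.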
@@ -132,9 +160,17 @@ def multiplex_rx_packets(self):
             # this object is singleton and all python-CAN sockets are using
             # the same instance and locking the same locks.
             return
+        # Snapshot pool entries under the lock, then read from each bus
+        # WITHOUT holding pool_mutex. On slow serial interfaces (slcan)
+        # bus.recv(timeout=0) can take ~2-3ms per frame; holding the
+        # mutex during those reads would block send() for the entire
+        # duration.
         with self.pool_mutex:
-            for t in self.pool.values():
-                t.mux()
+            mappers = list(self.pool.values())
+        for mapper in mappers:
+            msgs = mapper.read_bus()
+            if msgs:
+                mapper.distribute(msgs)
         self.last_call = time.monotonic()
 
     def register(self, socket, *args, **kwargs):
@@ -161,13 +197,36 @@ def register(self, socket, *args, **kwargs):
             if k in self.pool:
                 t = self.pool[k]
                 t.sockets.append(socket)
-                filters = [s.filters for s in t.sockets
-                           if s.filters is not None]
-                if filters:
-                    t.bus.set_filters(reduce(add, filters))
+                # Update bus-level filters to the union of all sockets'
+                # filters. For non-slcan interfaces (socketcan, kvaser,
+                # vector), this enables efficient hardware/kernel
+                # filtering. For slcan, the bus filters were already
+                # cleared on creation, so this is a no-op (all sockets
+                # on slcan share the unfiltered bus).
+                if not k.lower().startswith('slcan'):
+                    filters = [s.filters for s in t.sockets
+                               if s.filters is not None]
+                    if filters:
+                        t.bus.set_filters(reduce(add, filters))
                 socket.name = k
             else:
                 bus = can_Bus(*args, **kwargs)
+                # Serial interfaces like slcan only do software
+                # filtering inside BusABC.recv(): the recv loop reads
+                # one frame, finds it doesn't match, and returns
+                # None -- silently consuming serial bandwidth without
+                # returning the frame to the mux. This starves the
+                # mux on busy buses.
+                #
+                # For slcan, clear the filters from the bus so that
+                # bus.recv() returns ALL frames. Per-socket filtering
+                # in distribute() via _matches_filters() handles
+                # delivery. Other interfaces (socketcan, kvaser,
+                # vector, candle) perform efficient hardware/kernel
+                # filtering and should keep their bus-level filters.
+                if kwargs.get('can_filters') and \
+                        k.lower().startswith('slcan'):
+                    bus.set_filters(None)
                 socket.name = k
                 self.pool[k] = SocketMapper(bus, [socket])
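The per-socket software filtering that delivery relies on follows python-can's mask semantics: a frame matches a filter when frame_id & can_mask == can_id & can_mask. A minimal sketch of that dispatch, where matches() and deliver() are hypothetical helpers rather than Scapy's actual _matches_filters():

```python
def matches(frame_id, flt):
    """python-can style filter match: compare id bits under the mask."""
    return (frame_id & flt["can_mask"]) == (flt["can_id"] & flt["can_mask"])

def deliver(frame_id, socket_filters):
    """Return indices of sockets that should receive this frame.
    A socket with filters=None receives everything (unfiltered)."""
    return [i for i, flts in enumerate(socket_filters)
            if flts is None or any(matches(frame_id, f) for f in flts)]

# socket 0 only wants UDS responses on 0x7E8; socket 1 is unfiltered
filters = [[{"can_id": 0x7E8, "can_mask": 0x7FF}], None]
print(deliver(0x7E8, filters))   # both sockets
print(deliver(0x123, filters))   # only the unfiltered socket
```

This is why clearing the slcan bus-level filters is safe: every frame still reaches only the sockets whose filters match, just in software instead of in the driver's recv loop.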

@@ -188,6 +247,7 @@ def unregister(self, socket):
             t = self.pool[socket.name]
             t.sockets.remove(socket)
             if not t.sockets:
+                t.closing = True
                 t.bus.shutdown()
                 del self.pool[socket.name]
         except KeyError:
@@ -322,6 +382,7 @@ def select(sockets, remain=conf.recv_poll_rate):
        :returns: an array of sockets that were selected and
            the function to be called next to get the packets (i.g. recv)
        """
+        SocketsPool.multiplex_rx_packets()
        ready_sockets = \
            [s for s in sockets if isinstance(s, PythonCANSocket) and
             len(s.can_iface.rx_queue)]
@@ -333,7 +394,6 @@ def select(sockets, remain=conf.recv_poll_rate):
                # yield this thread to avoid starvation
                time.sleep(0)
 
-        SocketsPool.multiplex_rx_packets()
        return cast(List[SuperSocket], ready_sockets)
 
    def close(self):

scapy/contrib/isotp/isotp_soft_socket.py

Lines changed: 78 additions & 15 deletions
@@ -166,7 +166,8 @@ def __init__(self,
     def close(self):
         # type: () -> None
         if not self.closed:
-            self.impl.close()
+            if hasattr(self, "impl"):
+                self.impl.close()
             self.closed = True
 
     def failure_analysis(self):
@@ -202,8 +203,8 @@ def recv(self, x=0xffff, **kwargs):
             return msg
 
     @staticmethod
-    def select(sockets, remain=None):
-        # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket]
+    def select(sockets, remain=None):  # type: ignore[override]
+        # type: (List[Union[SuperSocket, ObjectPipe[Any]]], Optional[float]) -> List[Union[SuperSocket, ObjectPipe[Any]]]  # noqa: E501
        """This function is called during sendrecv() routine to wait for
        sockets to be ready to receive
        """
@@ -214,8 +215,12 @@ def select(sockets, remain=None):
 
         ready_pipes = select_objects(obj_pipes, remain)
 
-        return [x for x in sockets if isinstance(x, ISOTPSoftSocket) and
-                not x.closed and x.impl.rx_queue in ready_pipes]
+        result: List[Union[SuperSocket, ObjectPipe[Any]]] = [
+            x for x in sockets if isinstance(x, ISOTPSoftSocket) and
+            not x.closed and x.impl.rx_queue in ready_pipes]
+        result += [x for x in sockets if isinstance(x, ObjectPipe) and
+                   x in ready_pipes]
+        return result
 
 
 class TimeoutScheduler:
@@ -251,6 +256,7 @@ def schedule(cls, timeout, callback):
         # Start the scheduling thread if it is not started already
         if cls._thread is None:
             t = Thread(target=cls._task, name="TimeoutScheduler._task")
+            t.daemon = True
             must_interrupt = False
             cls._thread = t
             cls._event.clear()
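Marking the scheduler thread as a daemon means interpreter shutdown no longer waits to join it, which is what the commit message credits for fixing the Python 3.13 tox hangs on Windows. A minimal standalone sketch of the pattern (scheduler_task is a stand-in for TimeoutScheduler._task):

```python
import threading
import time

def scheduler_task():
    """Stand-in for a poll loop that never exits on its own."""
    while True:
        time.sleep(0.01)   # wait for due callbacks (illustrative)

t = threading.Thread(target=scheduler_task, name="TimeoutScheduler._task")
t.daemon = True   # must be set before start(); interpreter exit won't block on it
t.start()
```

The trade-off is that a daemon thread is killed abruptly at interpreter exit, so any cleanup it would do must either be unnecessary or handled elsewhere (here, close() cancels the handles explicitly).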
@@ -550,6 +556,7 @@ def __init__(self,
         self.tx_handle = TimeoutScheduler.schedule(
             self.rx_tx_poll_rate, self._send)
         self.last_rx_call = 0.0
+        self.rx_start_time = 0.0
 
     def failure_analysis(self):
         # type: () -> None
@@ -592,12 +599,26 @@
     def can_recv(self):
         # type: () -> None
         self.last_rx_call = TimeoutScheduler._time()
-        if self.can_socket.select([self.can_socket], 0):
-            pkt = self.can_socket.recv()
-            if pkt:
-                self.on_can_recv(pkt)
+        try:
+            while self.can_socket.select([self.can_socket], 0):
+                pkt = self.can_socket.recv()
+                if pkt:
+                    self.on_can_recv(pkt)
+                else:
+                    break
+        except Exception:
+            if not self.closed:
+                log_isotp.warning("Error in can_recv: %s",
+                                  traceback.format_exc())
         if not self.closed and not self.can_socket.closed:
-            if self.can_socket.select([self.can_socket], 0):
+            # Determine poll_time from ISOTP state only.
+            # Avoid calling select() here: on slow serial interfaces
+            # (slcan), each select() triggers a mux() call that reads
+            # N frames at ~2.5ms each, wasting time that could be spent
+            # processing frames already in the rx_queue.
+            if self.rx_state == ISOTP_WAIT_DATA or \
+                    self.tx_state == ISOTP_WAIT_FC or \
+                    self.tx_state == ISOTP_WAIT_FIRST_FC:
                 poll_time = 0.0
             else:
                 poll_time = self.rx_tx_poll_rate
@@ -643,13 +664,46 @@ def close(self):
             self.tx_handle.cancel()
         except Scapy_Exception:
             pass
+        if self.rx_timeout_handle is not None:
+            try:
+                self.rx_timeout_handle.cancel()
+            except Scapy_Exception:
+                pass
+        if self.tx_timeout_handle is not None:
+            try:
+                self.tx_timeout_handle.cancel()
+            except Scapy_Exception:
+                pass
+        try:
+            self.rx_queue.close()
+        except (OSError, EOFError):
+            pass
+        try:
+            self.tx_queue.close()
+        except (OSError, EOFError):
+            pass
 
     def _rx_timer_handler(self):
         # type: () -> None
         """Method called every time the rx_timer times out, due to the peer not
        sending a consecutive frame within the expected time window"""
 
+        if self.closed:
+            return
+
         if self.rx_state == ISOTP_WAIT_DATA:
+            # On slow serial interfaces (slcan), the mux reads frames
+            # from an OS serial buffer that may contain hundreds of
+            # background CAN frames. Consecutive Frames from the ECU
+            # are queued behind this backlog and can take several
+            # seconds to reach the ISOTP state machine. Extend the
+            # timeout up to 10 × cf_timeout to give the mux enough
+            # time to drain the backlog.
+            total_wait = TimeoutScheduler._time() - self.rx_start_time
+            if total_wait < self.cf_timeout * 10:
+                self.rx_timeout_handle = TimeoutScheduler.schedule(
+                    self.cf_timeout, self._rx_timer_handler)
+                return
             # we did not get new data frames in time.
             # reset rx state
             self.rx_state = ISOTP_IDLE
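The rx-timeout extension above amounts to re-arming the per-CF timer until a total cap of 10 × cf_timeout has elapsed since the First Frame arrived. A simplified, hypothetical model of that decision (RxTimer is an illustrative class, not part of Scapy):

```python
import time

class RxTimer:
    """Model of the extended CF timeout: re-arm the per-CF timer
    until a total cap of 10 x cf_timeout has elapsed."""

    def __init__(self, cf_timeout=0.1):
        self.cf_timeout = cf_timeout
        self.rx_start_time = time.monotonic()   # set on First Frame

    def should_extend(self, now=None):
        """True -> reschedule and keep waiting; False -> give up and
        reset the rx state to idle."""
        now = time.monotonic() if now is None else now
        total_wait = now - self.rx_start_time
        return total_wait < self.cf_timeout * 10

timer = RxTimer(cf_timeout=0.1)
print(timer.should_extend(now=timer.rx_start_time + 0.5))   # under the 1.0 s cap
print(timer.should_extend(now=timer.rx_start_time + 1.5))   # cap exceeded
```

Anchoring the cap to rx_start_time (not to the last timer expiry) is what bounds the total wait: no matter how often the timer is re-armed, the reassembly attempt is abandoned after 10 × cf_timeout.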
@@ -662,6 +716,9 @@ def _tx_timer_handler(self):
        two situations: either a Flow Control frame was not received in time,
        or the Separation Time Min is expired and a new frame must be sent."""
 
+        if self.closed:
+            return
+
         if (self.tx_state == ISOTP_WAIT_FC or
                 self.tx_state == ISOTP_WAIT_FIRST_FC):
             # we did not get any flow control frame in time
@@ -866,6 +923,7 @@ def _recv_ff(self, data, ts):
         # initial setup for this pdu reception
         self.rx_sn = 1
         self.rx_state = ISOTP_WAIT_DATA
+        self.rx_start_time = TimeoutScheduler._time()
 
         # no creation of flow control frames
         if not self.listen_only:
@@ -994,11 +1052,16 @@ def begin_send(self, x):
 
     def _send(self):
         # type: () -> None
-        if self.tx_state == ISOTP_IDLE:
-            if select_objects([self.tx_queue], 0):
-                pkt = self.tx_queue.recv()
-                if pkt:
-                    self.begin_send(pkt)
+        try:
+            if self.tx_state == ISOTP_IDLE:
+                if select_objects([self.tx_queue], 0):
+                    pkt = self.tx_queue.recv()
+                    if pkt:
+                        self.begin_send(pkt)
+        except Exception:
+            if not self.closed:
+                log_isotp.warning("Error in _send: %s",
+                                  traceback.format_exc())
 
         if not self.closed:
             self.tx_handle = TimeoutScheduler.schedule(
