Skip to content

Commit 2f70af1

Browse files
Michael GravesMichael Graves
authored andcommitted
Added notifier restart method and tests
1 parent 5d5cc16 commit 2f70af1

2 files changed

Lines changed: 115 additions & 12 deletions

File tree

can/notifier.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -158,28 +158,21 @@ def bus(self) -> BusABC | tuple["BusABC", ...]:
158158
return tuple(self._bus_list)
159159

160160
def add_bus(self, bus: BusABC) -> None:
161-
"""Add a bus for notification.
161+
"""Add a bus for notification."""
162+
self._bus_list.append(bus)
163+
self._start_reader(bus)
162164

163-
:param bus:
164-
CAN bus instance.
165-
:raises ValueError:
166-
If the *bus* is already assigned to an active :class:`~can.Notifier`.
167-
"""
168-
# add bus to notifier registry
165+
def _start_reader(self, bus: BusABC) -> None:
166+
"""Internal helper to spin up the actual background worker for a bus."""
169167
Notifier._registry.register(bus, self)
170168

171-
# add bus to internal bus list
172-
self._bus_list.append(bus)
173-
174169
file_descriptor: int = -1
175170
try:
176171
file_descriptor = bus.fileno()
177172
except NotImplementedError:
178-
# Bus doesn't support fileno, we fall back to thread based reader
179173
pass
180174

181175
if self._loop is not None and file_descriptor >= 0:
182-
# Use bus file descriptor to watch for messages
183176
self._loop.add_reader(file_descriptor, self._on_message_available, bus)
184177
self._readers.append(file_descriptor)
185178
else:
@@ -218,6 +211,10 @@ def stop(self, timeout: float = 5.0) -> None:
218211
for bus in self._bus_list:
219212
Notifier._registry.unregister(bus, self)
220213

214+
self._readers = []
215+
# Clear any pending asyncio tasks to prevent stale references
216+
self._tasks.clear()
217+
221218
def _rx_thread(self, bus: BusABC) -> None:
222219
# determine message handling callable early, not inside while loop
223220
if self._loop:
@@ -317,6 +314,30 @@ def find_instances(bus: BusABC) -> tuple["Notifier", ...]:
317314
"""
318315
return Notifier._registry.find_instances(bus)
319316

317+
def restart(self) -> None:
318+
"""Restarts the Notifier if it has been stopped.
319+
320+
:raises RuntimeWarning: If the notifier is already running.
321+
"""
322+
with self._lock:
323+
if not self._stopped:
324+
raise RuntimeWarning("Notifier is already running.")
325+
326+
self._stopped = False
327+
self.exception = None
328+
# Note: _bus_list is preserved from previous run
329+
330+
for bus in self._bus_list:
331+
self._start_reader(bus)
332+
333+
# Re-trigger listeners if they have a start method
334+
for listener in self.listeners:
335+
if hasattr(listener, "start"):
336+
try:
337+
listener.start()
338+
except Exception as e:
339+
logger.error("Failed to restart listener: %s", e)
340+
320341
def __exit__(
321342
self,
322343
exc_type: type[BaseException] | None,

test/notifier_test.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,58 @@ def test_registry(self):
7272
# find_instance must return the existing instance
7373
self.assertEqual(can.Notifier.find_instances(bus), (notifier,))
7474

75+
def test_restart(self):
76+
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
77+
reader = can.BufferedReader()
78+
notifier = can.Notifier(bus, [reader], 0.1)
79+
80+
bus.send(can.Message(arbitration_id=0x123))
81+
self.assertIsNotNone(reader.get_message(1))
82+
83+
notifier.stop()
84+
self.assertTrue(notifier.stopped)
85+
86+
new_reader = can.BufferedReader()
87+
notifier.listeners = [new_reader]
88+
89+
notifier.restart()
90+
self.assertFalse(notifier.stopped)
91+
92+
# Small settling time for the thread to actually start
93+
time.sleep(0.2)
94+
95+
# Send and Verify
96+
msg = can.Message(arbitration_id=0xABC)
97+
bus.send(msg)
98+
99+
recv = new_reader.get_message(1)
100+
self.assertIsNotNone(recv, "Failed to receive message after restart")
101+
self.assertEqual(recv.arbitration_id, 0xABC)
102+
103+
notifier.stop()
104+
105+
def test_restart_registry_lifecycle(self):
106+
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
107+
notifier = can.Notifier(bus, [], 0.1)
108+
notifier.stop()
109+
110+
# Verify it is removed from registry
111+
self.assertEqual(can.Notifier.find_instances(bus), ())
112+
113+
notifier.restart()
114+
115+
# Verify it is back in the registry and blocking others
116+
self.assertEqual(can.Notifier.find_instances(bus), (notifier,))
117+
self.assertRaises(ValueError, can.Notifier, bus, [], 0.1)
118+
119+
notifier.stop()
120+
121+
def test_double_restart_warning(self):
122+
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
123+
with can.Notifier(bus, [], 0.1) as notifier:
124+
# Attempting to restart an already running notifier should warn
125+
with self.assertRaises(RuntimeWarning):
126+
notifier.restart()
75127

76128
class AsyncNotifierTest(unittest.TestCase):
77129
def test_asyncio_notifier(self):
@@ -88,6 +140,36 @@ async def run_it():
88140

89141
asyncio.run(run_it())
90142

143+
def test_asyncio_notifier_restart(self):
144+
async def run_it():
145+
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
146+
reader = can.AsyncBufferedReader()
147+
notifier = can.Notifier(
148+
bus, [reader], 0.1, loop=asyncio.get_running_loop()
149+
)
150+
151+
notifier.stop()
152+
153+
# In some cases, creating a new listener is the safest path for a restart
154+
new_reader = can.AsyncBufferedReader()
155+
notifier.listeners = [new_reader]
156+
157+
notifier.restart()
158+
159+
# Small yield to let the selector register the FD
160+
await asyncio.sleep(0.1)
161+
162+
bus.send(can.Message(arbitration_id=0x123))
163+
164+
recv_msg = await asyncio.wait_for(new_reader.get_message(), 1.5)
165+
166+
self.assertIsNotNone(recv_msg)
167+
self.assertEqual(recv_msg.arbitration_id, 0x123)
168+
169+
notifier.stop()
170+
171+
asyncio.run(run_it())
172+
91173

92174
if __name__ == "__main__":
93175
unittest.main()

0 commit comments

Comments
 (0)