Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion canopen/emcy.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def on_emcy(self, can_id, data, timestamp):
self.emcy_received.notify_all()

for callback in self.callbacks:
callback(entry)
try:
callback(entry)
except Exception:
logger.exception("Exception in EMCY callback")

def add_callback(self, callback: Callable[[EmcyError], None]):
"""Get notified on EMCY messages from this node.
Expand Down
169 changes: 165 additions & 4 deletions test/test_emcy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ def check_error(self, err, code, reg, data, ts):
self.assertAlmostEqual(err.timestamp, ts)

def test_emcy_consumer_on_emcy(self):
# Make sure multiple callbacks receive the same information.
"""Make sure multiple callbacks receive the same information."""
acc1 = []
acc2 = []
self.emcy.add_callback(lambda err: acc1.append(err))
self.emcy.add_callback(lambda err: acc2.append(err))

# Dispatch an EMCY datagram.
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)

self.assertEqual(len(self.emcy.log), 1)
Expand All @@ -45,7 +44,6 @@ def test_emcy_consumer_on_emcy(self):
data=bytes([0, 1, 2, 3, 4]), ts=1000,
)

# Dispatch a new EMCY datagram.
self.emcy.on_emcy(0x81, b'\x10\x90\x01\x04\x03\x02\x01\x00', 2000)
self.assertEqual(len(self.emcy.log), 2)
self.assertEqual(len(self.emcy.active), 2)
Expand All @@ -58,7 +56,6 @@ def test_emcy_consumer_on_emcy(self):
data=bytes([4, 3, 2, 1, 0]), ts=2000,
)

# Dispatch an EMCY reset.
self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 2000)
self.assertEqual(len(self.emcy.log), 3)
self.assertEqual(len(self.emcy.active), 0)
Expand Down Expand Up @@ -123,6 +120,65 @@ def push_reset():
t.start()
self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT))

def test_emcy_consumer_initialization(self):
consumer = canopen.emcy.EmcyConsumer()
self.assertEqual(consumer.log, [])
self.assertEqual(consumer.active, [])
self.assertEqual(consumer.callbacks, [])

Comment thread
friederschueler marked this conversation as resolved.
Outdated
def test_emcy_consumer_multiple_callbacks(self):
"""Test adding multiple callbacks and their execution order."""
call_order = []
self.emcy.add_callback(lambda err: call_order.append('callback1'))
self.emcy.add_callback(lambda err: call_order.append('callback2'))
self.emcy.add_callback(lambda err: call_order.append('callback3'))
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
self.assertEqual(call_order, ['callback1', 'callback2', 'callback3'])

def test_emcy_consumer_callback_exception_handling(self):
"""Test that callback exceptions don't break other callbacks or the system."""
successful_callbacks = []
self.emcy.add_callback(lambda err: successful_callbacks.append('success1'))
Comment thread
friederschueler marked this conversation as resolved.
Outdated
self.emcy.add_callback(
lambda err: exec('raise ValueError("Test exception in callback")')
)
self.emcy.add_callback(lambda err: successful_callbacks.append('success2'))
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
self.assertEqual(successful_callbacks, ['success1', 'success2'])

def test_emcy_consumer_error_reset_variants(self):
"""Test different error reset code patterns."""
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
self.emcy.on_emcy(0x81, b'\x10\x90\x01\x04\x03\x02\x01\x00', 2000)
self.assertEqual(len(self.emcy.active), 2)
self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 3000)
self.assertEqual(len(self.emcy.active), 0)
self.emcy.on_emcy(0x81, b'\x01\x30\x02\x00\x01\x02\x03\x04', 4000)
self.assertEqual(len(self.emcy.active), 1)
self.emcy.on_emcy(0x81, b'\x99\x00\x01\x00\x00\x00\x00\x00', 5000)
self.assertEqual(len(self.emcy.active), 0)

def test_emcy_consumer_wait_timeout_edge_cases(self):
"""Test wait method with various timeout scenarios."""
result = self.emcy.wait(timeout=0)
self.assertIsNone(result)
result = self.emcy.wait(timeout=0.001)
self.assertIsNone(result)

def test_emcy_consumer_wait_concurrent_errors(self):
"""Test wait method when multiple errors arrive concurrently."""
def push_multiple_errors():
self.emcy.on_emcy(0x81, b'\x01\x20\x01\x01\x02\x03\x04\x05', 100)
self.emcy.on_emcy(0x81, b'\x02\x20\x01\x01\x02\x03\x04\x05', 101)
self.emcy.on_emcy(0x81, b'\x03\x20\x01\x01\x02\x03\x04\x05', 102)
t = threading.Timer(TIMEOUT / 2, push_multiple_errors)
with self.assertLogs(level=logging.INFO):
t.start()
Comment thread
friederschueler marked this conversation as resolved.
Outdated
err = self.emcy.wait(0x2003, timeout=TIMEOUT)
t.join(TIMEOUT)
self.assertIsNotNone(err)
self.assertEqual(err.code, 0x2003)


class TestEmcyError(unittest.TestCase):
def test_emcy_error(self):
Expand Down Expand Up @@ -180,6 +236,26 @@ def check(code, expected):
check(0xff00, "Device Specific")
check(0xffff, "Device Specific")

def test_emcy_error_initialization_types(self):
"""Test EmcyError initialization with various data types."""
error = EmcyError(0x1000, 0, b'', 123.456)
self.assertEqual(error.code, 0x1000)
self.assertEqual(error.register, 0)
self.assertEqual(error.data, b'')
self.assertEqual(error.timestamp, 123.456)
error = EmcyError(0xFFFF, 0xFF, b'\xFF' * 5, float('inf'))
self.assertEqual(error.code, 0xFFFF)
self.assertEqual(error.register, 0xFF)
self.assertEqual(error.data, b'\xFF' * 5)
self.assertEqual(error.timestamp, float('inf'))

Comment thread
friederschueler marked this conversation as resolved.
Outdated
def test_emcy_error_str_edge_cases(self):
for code in (0x0000, 0x0001, 0x0100, 0xFFFF):
error = EmcyError(code, 0, b'', 1000)
s = str(error)
self.assertIsInstance(s, str)
Comment thread
friederschueler marked this conversation as resolved.
Outdated
self.assertIn(f"0x{code:04X}", s)


class TestEmcyProducer(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -220,6 +296,91 @@ def check(*args, res):
check(3, res=b'\x00\x00\x03\x00\x00\x00\x00\x00')
check(3, b"\xaa\xbb", res=b'\x00\x00\x03\xaa\xbb\x00\x00\x00')

def test_emcy_producer_initialization(self):
producer = canopen.emcy.EmcyProducer(0x123)
self.assertEqual(producer.cob_id, 0x123)
self.assertIsNotNone(producer.network)

Comment thread
friederschueler marked this conversation as resolved.
Outdated
def test_emcy_producer_send_edge_cases(self):
self.emcy.send(0xFFFF, 0xFF, b'\xFF\xFF\xFF\xFF\xFF')
self.check_response(b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
self.emcy.send(0x0000, 0x00)
self.check_response(b'\x00\x00\x00\x00\x00\x00\x00\x00')
self.emcy.send(0x1234, 0x56, b'\xAB\xCD')
self.check_response(b'\x34\x12\x56\xAB\xCD\x00\x00\x00')
self.emcy.send(0x1234, 0x56, b'\xAB\xCD\xEF\x12\x34')
self.check_response(b'\x34\x12\x56\xAB\xCD\xEF\x12\x34')

def test_emcy_producer_reset_edge_cases(self):
self.emcy.reset(0xFF)
self.check_response(b'\x00\x00\xFF\x00\x00\x00\x00\x00')
self.emcy.reset(0xFF, b'\xFF\xFF\xFF\xFF\xFF')
self.check_response(b'\x00\x00\xFF\xFF\xFF\xFF\xFF\xFF')
self.emcy.reset(0x12, b'\xAB\xCD')
self.check_response(b'\x00\x00\x12\xAB\xCD\x00\x00\x00')


class TestEmcyIntegration(unittest.TestCase):
"""Integration tests for EMCY producer and consumer."""

def setUp(self):
self.txbus = can.Bus(interface="virtual")
self.rxbus = can.Bus(interface="virtual")
self.net = canopen.Network(self.txbus)
self.net.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
self.net.connect()
self.rx_net = canopen.Network(self.rxbus)
self.rx_net.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
self.rx_net.connect()
self.producer = canopen.emcy.EmcyProducer(0x081)
self.producer.network = self.net
self.consumer = canopen.emcy.EmcyConsumer()
self.rx_net.subscribe(0x081, self.consumer.on_emcy)

def tearDown(self):
self.net.disconnect()
self.rx_net.disconnect()
self.txbus.shutdown()
self.rxbus.shutdown()

def test_producer_consumer_integration(self):
"""Test that producer and consumer work together."""
received_errors = []
self.consumer.add_callback(lambda err: received_errors.append(err))
t = threading.Timer(
TIMEOUT / 2,
lambda: self.producer.send(0x2001, 0x02, b'\x01\x02\x03\x04\x05'),
)
with self.assertLogs(level=logging.INFO):
t.start()
err = self.consumer.wait(0x2001, timeout=TIMEOUT)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The .emcy_received trick is missing here, so this is flaky.

t.join(TIMEOUT)
self.assertIsNotNone(err)
self.assertEqual(err.code, 0x2001)
self.assertEqual(err.register, 0x02)
self.assertEqual(err.data, b'\x01\x02\x03\x04\x05')
self.assertEqual(received_errors, [err])

def test_producer_reset_consumer_integration(self):
"""Test producer reset clears consumer active errors."""
t = threading.Timer(
TIMEOUT / 2,
lambda: self.producer.send(0x2001, 0x02, b'\x01\x02\x03\x04\x05'),
)
with self.assertLogs(level=logging.INFO):
t.start()
self.consumer.wait(0x2001, timeout=TIMEOUT)
t.join(TIMEOUT)
self.assertEqual(len(self.consumer.active), 1)
t = threading.Timer(TIMEOUT / 2, self.producer.reset)
with self.assertLogs(level=logging.INFO):
t.start()
err = self.consumer.wait(timeout=TIMEOUT)
t.join(TIMEOUT)
self.assertIsNotNone(err)
self.assertEqual(len(self.consumer.active), 0)
self.assertEqual(len(self.consumer.log), 2)


if __name__ == "__main__":
unittest.main()
Loading