|
| 1 | +import socket |
| 2 | + |
| 3 | +import pytest |
| 4 | + |
| 5 | +from labgrid.driver import CanInterfaceDriver |
| 6 | +from labgrid.driver.caninterfacedriver import CanFdFrame, CanFilter, CanFrame |
| 7 | +from labgrid.resource import NetworkInterface |
| 8 | + |
| 9 | + |
| 10 | +FRAMES = ( |
| 11 | + CanFrame(0x123, b'\x01'), |
| 12 | + CanFrame(0x456, b'\x11\x22\x33\x44\x55\x66\x77\x88'), |
| 13 | + CanFrame(0x789, b'\x44\x33\x22\x11'), |
| 14 | +) |
| 15 | + |
| 16 | + |
| 17 | +FRAMES_FD = ( |
| 18 | + CanFdFrame(0x123, b'\x01'), |
| 19 | + CanFdFrame(0x456, b'\x11\x22\x33\x44\x55\x66\x77\x88'), |
| 20 | + CanFdFrame(0x789, b'\x44\x33\x22\x11', flags=CanFdFrame.BRS), |
| 21 | + CanFdFrame(0x765, b'\xff' * 64), |
| 22 | +) |
| 23 | + |
| 24 | + |
| 25 | +def vcan0_exists(): |
| 26 | + try: |
| 27 | + socket.if_nametoindex("vcan0") |
| 28 | + return True |
| 29 | + except OSError: |
| 30 | + return False |
| 31 | + |
| 32 | + |
| 33 | +pytestmark = pytest.mark.skipif(not vcan0_exists(), reason="Virtual CAN interface not found") |
| 34 | + |
| 35 | + |
| 36 | +@pytest.fixture |
| 37 | +def vcan(): |
| 38 | + sock = socket.socket(socket.PF_CAN, socket.SOCK_RAW | socket.SOCK_NONBLOCK, socket.CAN_RAW) |
| 39 | + sock.bind(("vcan0",)) |
| 40 | + |
| 41 | + return sock |
| 42 | + |
| 43 | + |
| 44 | +@pytest.mark.parametrize("frame", FRAMES) |
| 45 | +def test_can_frames(target, vcan, frame): |
| 46 | + iface = NetworkInterface(target, "vcan", "vcan0") |
| 47 | + can = CanInterfaceDriver(target, "vcan", "500000") |
| 48 | + target.activate(can) |
| 49 | + |
| 50 | + vcan.send(frame.to_bytes()) |
| 51 | + assert can.recv() == frame |
| 52 | + |
| 53 | + |
| 54 | +@pytest.mark.parametrize("frame", FRAMES_FD) |
| 55 | +def test_can_frames_fd(target, vcan, frame): |
| 56 | + iface = NetworkInterface(target, "vcan", "vcan0") |
| 57 | + can = CanInterfaceDriver(target, "vcan", "500000", "2000000") |
| 58 | + target.activate(can) |
| 59 | + |
| 60 | + vcan.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FD_FRAMES, 1) |
| 61 | + |
| 62 | + vcan.send(frame.to_bytes()) |
| 63 | + assert can.recv() == frame |
| 64 | + |
| 65 | + |
| 66 | +def test_can_filer(target, vcan): |
| 67 | + iface = NetworkInterface(target, "vcan", "vcan0") |
| 68 | + can = CanInterfaceDriver(target, "vcan", "500000") |
| 69 | + target.activate(can) |
| 70 | + |
| 71 | + can.filter([CanFilter(0x123), CanFilter(0x002, 0x003)]) |
| 72 | + |
| 73 | + frame = CanFrame(0x123, b'\x01') |
| 74 | + vcan.send(frame.to_bytes()) |
| 75 | + assert can.recv() == frame |
| 76 | + |
| 77 | + vcan.send(CanFrame(0x321, b'\x02').to_bytes()) |
| 78 | + vcan.send(frame.to_bytes()) |
| 79 | + assert can.recv() == frame |
| 80 | + |
| 81 | + vcan.send(CanFrame(0x012, b'\x03').to_bytes()) |
| 82 | + assert can.recv() |
0 commit comments