|
| 1 | +"""Tests for S7 routing support (multi-subnet PLC access).""" |
| 2 | + |
| 3 | +import struct |
| 4 | + |
| 5 | +import pytest |
| 6 | + |
| 7 | +from snap7.connection import ISOTCPConnection |
| 8 | +from snap7.client import Client |
| 9 | + |
| 10 | +# Use a unique port to avoid conflicts with other test suites |
| 11 | +ROUTING_TEST_PORT = 11102 |
| 12 | + |
| 13 | + |
| 14 | +@pytest.mark.routing |
| 15 | +class TestRoutingTSAP: |
| 16 | + """Test TSAP construction for routed connections.""" |
| 17 | + |
| 18 | + def test_remote_tsap_encodes_rack_slot(self) -> None: |
| 19 | + """Remote TSAP should encode rack and slot per S7 spec.""" |
| 20 | + rack, slot = 0, 2 |
| 21 | + expected = 0x0100 | (rack << 5) | slot # 0x0102 |
| 22 | + conn = ISOTCPConnection("127.0.0.1", remote_tsap=expected) |
| 23 | + assert conn.remote_tsap == 0x0102 |
| 24 | + |
| 25 | + def test_routing_tsap_encodes_dest_rack_slot(self) -> None: |
| 26 | + """Routing TSAP should encode destination rack/slot.""" |
| 27 | + conn = ISOTCPConnection("127.0.0.1") |
| 28 | + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=3) |
| 29 | + assert conn._routing_tsap == 0x0100 | (0 << 5) | 3 # 0x0103 |
| 30 | + |
| 31 | + def test_routing_tsap_higher_rack(self) -> None: |
| 32 | + """Routing TSAP with rack=2, slot=1.""" |
| 33 | + conn = ISOTCPConnection("127.0.0.1") |
| 34 | + conn.set_routing(subnet_id=0x0002, dest_rack=2, dest_slot=1) |
| 35 | + assert conn._routing_tsap == 0x0100 | (2 << 5) | 1 # 0x0141 |
| 36 | + |
| 37 | + |
| 38 | +@pytest.mark.routing |
| 39 | +class TestCOTPCRRouting: |
| 40 | + """Test COTP Connection Request PDU generation with routing.""" |
| 41 | + |
| 42 | + def _parse_cotp_cr(self, pdu: bytes) -> dict[str, object]: |
| 43 | + """Parse a COTP CR PDU into its components for inspection.""" |
| 44 | + result: dict[str, object] = {} |
| 45 | + pdu_len = pdu[0] |
| 46 | + result["pdu_len"] = pdu_len |
| 47 | + result["pdu_type"] = pdu[1] |
| 48 | + result["dst_ref"] = struct.unpack(">H", pdu[2:4])[0] |
| 49 | + result["src_ref"] = struct.unpack(">H", pdu[4:6])[0] |
| 50 | + result["class_opt"] = pdu[6] |
| 51 | + |
| 52 | + # Parse variable-part parameters |
| 53 | + params: dict[int, bytes] = {} |
| 54 | + offset = 7 |
| 55 | + while offset < len(pdu): |
| 56 | + if offset + 2 > len(pdu): |
| 57 | + break |
| 58 | + code = pdu[offset] |
| 59 | + length = pdu[offset + 1] |
| 60 | + data = pdu[offset + 2 : offset + 2 + length] |
| 61 | + params[code] = data |
| 62 | + offset += 2 + length |
| 63 | + |
| 64 | + result["params"] = params |
| 65 | + return result |
| 66 | + |
| 67 | + def test_standard_cr_has_no_routing_params(self) -> None: |
| 68 | + """A non-routed CR should not contain routing parameters.""" |
| 69 | + conn = ISOTCPConnection("127.0.0.1") |
| 70 | + pdu = conn._build_cotp_cr() |
| 71 | + parsed = self._parse_cotp_cr(pdu) |
| 72 | + params = parsed["params"] |
| 73 | + assert isinstance(params, dict) |
| 74 | + assert ISOTCPConnection.COTP_PARAM_SUBNET_ID not in params |
| 75 | + assert ISOTCPConnection.COTP_PARAM_ROUTING_TSAP not in params |
| 76 | + |
| 77 | + def test_routed_cr_contains_subnet_param(self) -> None: |
| 78 | + """A routed CR must include the subnet ID parameter (0xC6).""" |
| 79 | + conn = ISOTCPConnection("127.0.0.1") |
| 80 | + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=2) |
| 81 | + pdu = conn._build_cotp_cr() |
| 82 | + parsed = self._parse_cotp_cr(pdu) |
| 83 | + params = parsed["params"] |
| 84 | + assert isinstance(params, dict) |
| 85 | + assert ISOTCPConnection.COTP_PARAM_SUBNET_ID in params |
| 86 | + subnet_data = params[ISOTCPConnection.COTP_PARAM_SUBNET_ID] |
| 87 | + assert struct.unpack(">H", subnet_data)[0] == 0x0001 |
| 88 | + |
| 89 | + def test_routed_cr_contains_routing_tsap(self) -> None: |
| 90 | + """A routed CR must include the routing TSAP parameter (0xC7).""" |
| 91 | + conn = ISOTCPConnection("127.0.0.1") |
| 92 | + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=2) |
| 93 | + pdu = conn._build_cotp_cr() |
| 94 | + parsed = self._parse_cotp_cr(pdu) |
| 95 | + params = parsed["params"] |
| 96 | + assert isinstance(params, dict) |
| 97 | + assert ISOTCPConnection.COTP_PARAM_ROUTING_TSAP in params |
| 98 | + tsap_data = params[ISOTCPConnection.COTP_PARAM_ROUTING_TSAP] |
| 99 | + expected_tsap = 0x0100 | (0 << 5) | 2 |
| 100 | + assert struct.unpack(">H", tsap_data)[0] == expected_tsap |
| 101 | + |
| 102 | + def test_routed_cr_pdu_length_is_consistent(self) -> None: |
| 103 | + """The PDU length byte must equal len(pdu) - 1.""" |
| 104 | + conn = ISOTCPConnection("127.0.0.1") |
| 105 | + conn.set_routing(subnet_id=0x00FF, dest_rack=1, dest_slot=1) |
| 106 | + pdu = conn._build_cotp_cr() |
| 107 | + # The first byte is the length of the rest of the PDU |
| 108 | + assert pdu[0] == len(pdu) - 1 |
| 109 | + |
| 110 | + def test_standard_cr_pdu_length_is_consistent(self) -> None: |
| 111 | + """Non-routed PDU length byte must also be consistent.""" |
| 112 | + conn = ISOTCPConnection("127.0.0.1") |
| 113 | + pdu = conn._build_cotp_cr() |
| 114 | + assert pdu[0] == len(pdu) - 1 |
| 115 | + |
| 116 | + def test_routed_cr_still_has_standard_params(self) -> None: |
| 117 | + """Routing should not remove the standard TSAP / PDU size params.""" |
| 118 | + conn = ISOTCPConnection("127.0.0.1") |
| 119 | + conn.set_routing(subnet_id=0x0001, dest_rack=0, dest_slot=3) |
| 120 | + pdu = conn._build_cotp_cr() |
| 121 | + parsed = self._parse_cotp_cr(pdu) |
| 122 | + params = parsed["params"] |
| 123 | + assert isinstance(params, dict) |
| 124 | + assert ISOTCPConnection.COTP_PARAM_CALLING_TSAP in params |
| 125 | + assert ISOTCPConnection.COTP_PARAM_CALLED_TSAP in params |
| 126 | + assert ISOTCPConnection.COTP_PARAM_PDU_SIZE in params |
| 127 | + |
| 128 | + |
| 129 | +@pytest.mark.routing |
| 130 | +class TestRoutedFrameValidity: |
| 131 | + """Test that routed connections produce valid protocol frames.""" |
| 132 | + |
| 133 | + def test_routed_cr_wrapped_in_tpkt(self) -> None: |
| 134 | + """A routed CR wrapped in TPKT should have correct TPKT header.""" |
| 135 | + conn = ISOTCPConnection("127.0.0.1") |
| 136 | + conn.set_routing(subnet_id=0x0005, dest_rack=0, dest_slot=1) |
| 137 | + cr_pdu = conn._build_cotp_cr() |
| 138 | + tpkt = conn._build_tpkt(cr_pdu) |
| 139 | + |
| 140 | + # TPKT header: version=3, reserved=0, length=total |
| 141 | + assert tpkt[0] == 3 |
| 142 | + assert tpkt[1] == 0 |
| 143 | + total_len = struct.unpack(">H", tpkt[2:4])[0] |
| 144 | + assert total_len == len(tpkt) |
| 145 | + assert tpkt[4:] == cr_pdu |
| 146 | + |
| 147 | + def test_subnet_id_truncated_to_16_bits(self) -> None: |
| 148 | + """Subnet IDs larger than 16 bits should be masked.""" |
| 149 | + conn = ISOTCPConnection("127.0.0.1") |
| 150 | + conn.set_routing(subnet_id=0x1FFFF, dest_rack=0, dest_slot=1) |
| 151 | + # 0x1FFFF & 0xFFFF == 0xFFFF |
| 152 | + assert conn._subnet_id == 0xFFFF |
| 153 | + |
| 154 | + |
| 155 | +@pytest.mark.routing |
| 156 | +@pytest.mark.server |
| 157 | +class TestClientConnectRouted: |
| 158 | + """Test Client.connect_routed against the built-in server.""" |
| 159 | + |
| 160 | + def test_connect_routed_to_server(self) -> None: |
| 161 | + """Client.connect_routed should negotiate PDU with a local server. |
| 162 | +
|
| 163 | + The server does not validate routing parameters in the COTP CR, |
| 164 | + so the connection handshake should succeed. |
| 165 | + """ |
| 166 | + from snap7.server import Server |
| 167 | + from snap7.type import SrvArea |
| 168 | + from ctypes import c_char |
| 169 | + |
| 170 | + server = Server() |
| 171 | + size = 100 |
| 172 | + db_data = bytearray(size) |
| 173 | + db_array = (c_char * size).from_buffer(db_data) |
| 174 | + server.register_area(SrvArea.DB, 1, db_array) |
| 175 | + server.start(tcp_port=ROUTING_TEST_PORT) |
| 176 | + |
| 177 | + try: |
| 178 | + client = Client() |
| 179 | + client.connect_routed( |
| 180 | + host="127.0.0.1", |
| 181 | + router_rack=0, |
| 182 | + router_slot=2, |
| 183 | + subnet=0x0001, |
| 184 | + dest_rack=0, |
| 185 | + dest_slot=3, |
| 186 | + port=ROUTING_TEST_PORT, |
| 187 | + ) |
| 188 | + assert client.get_connected() |
| 189 | + |
| 190 | + # Verify we can do a basic read through the routed connection |
| 191 | + data = client.db_read(1, 0, 10) |
| 192 | + assert len(data) == 10 |
| 193 | + |
| 194 | + client.disconnect() |
| 195 | + finally: |
| 196 | + server.stop() |
| 197 | + |
| 198 | + def test_connect_routed_returns_self(self) -> None: |
| 199 | + """connect_routed should return self for method chaining.""" |
| 200 | + from snap7.server import Server |
| 201 | + from snap7.type import SrvArea |
| 202 | + from ctypes import c_char |
| 203 | + |
| 204 | + server = Server() |
| 205 | + size = 10 |
| 206 | + db_data = bytearray(size) |
| 207 | + db_array = (c_char * size).from_buffer(db_data) |
| 208 | + server.register_area(SrvArea.DB, 1, db_array) |
| 209 | + server.start(tcp_port=ROUTING_TEST_PORT + 1) |
| 210 | + |
| 211 | + try: |
| 212 | + client = Client() |
| 213 | + result = client.connect_routed( |
| 214 | + host="127.0.0.1", |
| 215 | + router_rack=0, |
| 216 | + router_slot=2, |
| 217 | + subnet=0x0002, |
| 218 | + dest_rack=0, |
| 219 | + dest_slot=1, |
| 220 | + port=ROUTING_TEST_PORT + 1, |
| 221 | + ) |
| 222 | + assert result is client |
| 223 | + client.disconnect() |
| 224 | + finally: |
| 225 | + server.stop() |
0 commit comments