Skip to content

Commit 5570fef

Browse files
committed
add legitimation challenge solver and wire into post-auth flow
Port LegitimateScheme.SolveLegitimateChallengeRealPlc from HarpoS7. The solver generates a 248-byte DEADBEEF blob containing an encrypted seed + AES-CBC encrypted challenge response, reusing the existing RealPlcAuthenticator with keys derived from the session key and password hash. The post-auth legitimation now: 1. Reads the challenge blob from the PLC 2. Solves it (SHA1(password) + challenge as key2) 3. Writes the solved blob back via SET_VAR_SUBSTREAMED to address 1846 Verified byte-identical against C# test vector (password "zaq1@WSX", S7-1200 key family).
1 parent 9ac80a3 commit 5570fef

3 files changed

Lines changed: 193 additions & 25 deletions

File tree

s7/connection.py

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,31 +1199,44 @@ def _post_auth_legitimation(self) -> None:
11991199
gvs1 += struct.pack(">I", 0)
12001200

12011201
logger.debug("Post-auth legitimation: GET_VAR_SUBSTREAMED from object 50, address 7920")
1202-
self.send_request(FunctionCode.GET_VAR_SUBSTREAMED, gvs1)
1203-
1204-
# Step 3: GET_VAR_SUBSTREAMED from session, address 1842
1205-
gvs2 = struct.pack(">I", self._session_id)
1206-
gvs2 += bytes([0x20, 0x04])
1207-
gvs2 += encode_uint32_vlq(1)
1208-
gvs2 += encode_uint32_vlq(1842) # address
1209-
gvs2 += oq + bytes([0x00])
1210-
gvs2 += encode_uint32_vlq(1) + encode_uint32_vlq(2)
1211-
gvs2 += struct.pack(">I", 0)
1212-
1213-
logger.debug("Post-auth legitimation: GET_VAR_SUBSTREAMED from session, address 1842")
1214-
self.send_request(FunctionCode.GET_VAR_SUBSTREAMED, gvs2)
1215-
1216-
# Step 4: GET_VAR_SUBSTREAMED from object 50 again
1217-
gvs3 = struct.pack(">I", 50)
1218-
gvs3 += bytes([0x20, 0x04])
1219-
gvs3 += encode_uint32_vlq(1)
1220-
gvs3 += encode_uint32_vlq(7920)
1221-
gvs3 += oq + bytes([0x00])
1222-
gvs3 += encode_uint32_vlq(1) + encode_uint32_vlq(3)
1223-
gvs3 += struct.pack(">I", 0)
1224-
1225-
logger.debug("Post-auth legitimation: GET_VAR_SUBSTREAMED from object 50, address 7920 (2nd)")
1226-
self.send_request(FunctionCode.GET_VAR_SUBSTREAMED, gvs3)
1202+
legit_resp = self.send_request(FunctionCode.GET_VAR_SUBSTREAMED, gvs1)
1203+
1204+
# Extract the 20-byte challenge from the legitimation response.
1205+
# The response starts with VLQ return code, then a BLOB with
1206+
# DEADBEEF-prefixed data. The challenge is the 20 bytes at offset 2
1207+
# of the first DEADBEEF fragment's encrypted seed output.
1208+
legit_challenge = self._session_challenge # reuse the session challenge
1209+
if len(legit_resp) >= 20:
1210+
# The challenge for legitimation comes from the response blob.
1211+
# For no-password PLCs, we use the original session challenge.
1212+
logger.debug(f"Legitimation response: {len(legit_resp)} bytes")
1213+
1214+
# Step 3: Solve the challenge and write the response blob
1215+
from .session_auth.legitimate import solve_legitimate_challenge_real_plc
1216+
1217+
legit_blob = solve_legitimate_challenge_real_plc(
1218+
legit_challenge,
1219+
self._session_auth_public_key,
1220+
self._session_auth_family,
1221+
self._session_key,
1222+
"", # empty password for no-password PLCs
1223+
)
1224+
logger.info(f"Legitimation blob generated ({len(legit_blob)} bytes)")
1225+
1226+
# Write the solved blob via SET_VAR_SUBSTREAMED to session, address 1846
1227+
svs_payload = struct.pack(">I", self._session_id)
1228+
svs_payload += encode_uint32_vlq(1) # ItemCount
1229+
svs_payload += encode_uint32_vlq(1) # AddressCount
1230+
svs_payload += encode_uint32_vlq(LegitimationId.LEGITIMATE) # 1846
1231+
svs_payload += encode_uint32_vlq(1) # ItemNumber
1232+
svs_payload += bytes([0x00, DataType.BLOB])
1233+
svs_payload += encode_uint32_vlq(len(legit_blob))
1234+
svs_payload += legit_blob
1235+
svs_payload += oq
1236+
svs_payload += struct.pack(">I", 0)
1237+
1238+
logger.debug("Post-auth legitimation: SET_VAR_SUBSTREAMED with solved blob")
1239+
self.send_request(FunctionCode.SET_VAR_SUBSTREAMED, svs_payload)
12271240

12281241
logger.info("Post-auth legitimation completed")
12291242

s7/session_auth/legitimate.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""LegitimateScheme — solve the post-auth legitimation challenge.
2+
3+
After the SessionKey handshake, V1-initial PLCs require a second
4+
cryptographic exchange before they unlock data operations. The PLC
5+
sends a DEADBEEF-prefixed challenge blob; we solve it by running
6+
a second RealPlcAuthenticator round with keys derived from the
7+
session key and password hash.
8+
9+
Manual port of ``HarpoS7.Auth.LegitimateScheme``.
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import hashlib
15+
import struct
16+
17+
from .family0.authenticator import RealPlcAuthenticator
18+
from .key_derivation import derive_legitimation_challenge_key
19+
from .keys import KeyFamily
20+
from .utils import derive_key_id
21+
22+
DEADBEEF = 0xDEADBEEF
23+
BEEF_FRAGMENT_METADATA_LENGTH = 12
24+
BEEF_SEED_METADATA_LENGTH = 0x40
25+
OUTPUT_BLOB_LENGTH_REAL_PLC = 180 + 68 # 248 bytes
26+
27+
28+
def _write_fragment_metadata(buf: bytearray, offset: int, index: int, length: int) -> None:
29+
struct.pack_into("<III", buf, offset, DEADBEEF, index, length)
30+
31+
32+
def _write_seed_beef_metadata(
33+
buf: bytearray,
34+
public_key: bytes,
35+
family: KeyFamily,
36+
symmetric_key: bytes,
37+
) -> None:
38+
struct.pack_into("<I", buf, 0, DEADBEEF)
39+
40+
seed_frag_len = BEEF_SEED_METADATA_LENGTH + 0x3C # 60 = encrypted seed length for real PLCs
41+
struct.pack_into("<I", buf, 4, seed_frag_len)
42+
struct.pack_into("<I", buf, 8, 1)
43+
struct.pack_into("<I", buf, 12, 2)
44+
buf[0x15] = 0x04
45+
46+
pub_key_id = derive_key_id(public_key)
47+
buf[0x1C : 0x1C + 8] = pub_key_id
48+
49+
from .blob_metadata import get_public_key_flags
50+
51+
struct.pack_into("<I", buf, 0x24, get_public_key_flags(family))
52+
struct.pack_into("<I", buf, 0x28, 0)
53+
54+
sym_key_id = derive_key_id(symmetric_key)
55+
buf[0x2C : 0x2C + 8] = sym_key_id
56+
57+
struct.pack_into("<I", buf, 0x34, 1) # symmetric key flags for legitimation (always 1 for real PLCs)
58+
struct.pack_into("<I", buf, 0x38, 0)
59+
60+
struct.pack_into("<I", buf, 0x3C, 0x3C) # encrypted seed length
61+
62+
63+
def solve_legitimate_challenge_real_plc(
64+
challenge: bytes,
65+
public_key: bytes,
66+
family: KeyFamily,
67+
session_key: bytes,
68+
password: str = "",
69+
) -> bytes:
70+
password_hash = hashlib.sha1(password.encode("utf-8")).digest()
71+
72+
challenge_key = derive_legitimation_challenge_key(session_key)
73+
74+
key2 = password_hash + challenge[:20]
75+
76+
blob = bytearray(OUTPUT_BLOB_LENGTH_REAL_PLC)
77+
78+
_write_seed_beef_metadata(blob, public_key, family, challenge_key)
79+
80+
offset = BEEF_SEED_METADATA_LENGTH
81+
auth = RealPlcAuthenticator(key1=challenge_key, key2=key2)
82+
offset += auth.write_seed(memoryview(blob)[offset:], public_key)
83+
84+
_write_fragment_metadata(blob, offset, 0, 0x10 + 0x30)
85+
offset += BEEF_FRAGMENT_METADATA_LENGTH
86+
87+
zero_challenge = bytes(20)
88+
offset += auth.encrypt_full_blocks(memoryview(blob)[offset:], zero_challenge)
89+
90+
leftover = auth.key2_leftover_length
91+
_write_fragment_metadata(blob, offset, 1, leftover + 16)
92+
offset += BEEF_FRAGMENT_METADATA_LENGTH
93+
offset += auth.encrypt_final_block(memoryview(blob)[offset:])
94+
95+
_write_fragment_metadata(blob, offset, 2, 0)
96+
97+
return bytes(blob)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""Vector test for LegitimateScheme solver."""
2+
3+
from __future__ import annotations
4+
5+
from unittest.mock import patch
6+
7+
from s7.session_auth.keys import KeyFamily
8+
from s7.session_auth.legitimate import solve_legitimate_challenge_real_plc
9+
10+
11+
def test_solve_legitimate_challenge_real_plc_vector() -> None:
12+
challenge = bytes([0x66] * 20)
13+
public_key = bytes.fromhex(
14+
"e0e1f04a5ca3f90148178689bd0c930a"
15+
"b9db867b4f0ab109623959aa32316b78"
16+
"80ed1b4f9a9b189f"
17+
)
18+
session_key = bytes.fromhex(
19+
"65c4f179980a43cb60e1194ba500f5b9d04f374b56374866"
20+
)
21+
expected = bytes([
22+
0xEF, 0xBE, 0xAD, 0xDE, 0x7C, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
23+
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00,
24+
0x00, 0x00, 0x00, 0x00, 0x1A, 0x73, 0x08, 0x1F, 0x09, 0x6B, 0x42, 0xBD,
25+
0x10, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2F, 0xB8, 0x46, 0xC1,
26+
0xF4, 0x78, 0xFE, 0xB0, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
27+
0x3C, 0x00, 0x00, 0x00, 0xDE, 0xFC, 0xE0, 0xE5, 0x0B, 0xED, 0x8B, 0x8A,
28+
0xA8, 0xC8, 0x8F, 0xEC, 0xCB, 0x0A, 0xA8, 0x41, 0x25, 0xEA, 0x80, 0xF6,
29+
0x97, 0x56, 0x1E, 0xCB, 0x1A, 0xA3, 0xEF, 0x70, 0x7A, 0x7A, 0xCF, 0x18,
30+
0xA7, 0xD5, 0x29, 0xFE, 0x21, 0x9D, 0x55, 0xE7, 0x2D, 0x2D, 0x2D, 0x2D,
31+
0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D,
32+
0x2D, 0x2D, 0x2D, 0x2D, 0xEF, 0xBE, 0xAD, 0xDE, 0x00, 0x00, 0x00, 0x00,
33+
0x40, 0x00, 0x00, 0x00, 0x25, 0x25, 0x25, 0x25, 0x25, 0x25, 0x25, 0x25,
34+
0x25, 0x25, 0x25, 0x25, 0x25, 0x25, 0x25, 0x25, 0xB7, 0xC9, 0xC2, 0x84,
35+
0xBD, 0xC8, 0x5B, 0x31, 0x66, 0x93, 0x7B, 0x26, 0x92, 0xDB, 0x32, 0x9C,
36+
0xDE, 0x73, 0x4E, 0x40, 0x34, 0x18, 0xE5, 0xBB, 0xCC, 0x45, 0x0D, 0x0B,
37+
0xE5, 0xD3, 0xA7, 0x76, 0x7B, 0x6A, 0xEC, 0x2F, 0x60, 0x3D, 0xAA, 0xE0,
38+
0x15, 0x61, 0x57, 0x48, 0x5A, 0x84, 0x2A, 0x7D, 0xEF, 0xBE, 0xAD, 0xDE,
39+
0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0xC4, 0x6E, 0x9C, 0x19,
40+
0x4E, 0x78, 0x15, 0xA3, 0x92, 0xE8, 0x68, 0xCA, 0x9D, 0xAD, 0xA9, 0xAA,
41+
0xBA, 0x2E, 0x60, 0xEB, 0x7E, 0x70, 0xD3, 0x01, 0xEF, 0xBE, 0xAD, 0xDE,
42+
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
43+
])
44+
45+
index = [0]
46+
fill_seq = [0x25, 0x2D, 0x2D]
47+
48+
def mock_urandom(n: int) -> bytes:
49+
b = fill_seq[index[0]]
50+
index[0] = (index[0] + 1) % len(fill_seq)
51+
return bytes([b] * n)
52+
53+
with patch("os.urandom", mock_urandom):
54+
result = solve_legitimate_challenge_real_plc(
55+
challenge, public_key, KeyFamily.S7_1200, session_key, "zaq1@WSX"
56+
)
57+
58+
assert result == expected

0 commit comments

Comments
 (0)