Skip to content

Commit 1a75732

Browse files
committed
Merge branch 'dev' of https://github.com/ParadoxAlarmInterface/pai into dev
2 parents 2b3eaf1 + a7c3e10 commit 1a75732

5 files changed

Lines changed: 363 additions & 122 deletions

File tree

paradox/connections/serial_encryption.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ def make_serial_key(password) -> bytes:
2727
"""Derive the 32-byte serial encryption key from the panel PC password."""
2828
if isinstance(password, bytes):
2929
raw = password
30+
elif isinstance(password, int):
31+
raw = str(password).zfill(4).encode("utf-8")
3032
else:
3133
raw = str(password).encode("utf-8")
3234
if len(raw) < 32:

paradox/console_scripts/ip150_connection_decrypt.py

Lines changed: 158 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,63 @@ def old_yaml_format_traverser(data):
165165
}
166166

167167

168-
def decrypt_file(file, password, max_packets: int = None):
169-
try:
170-
data = ordered_load(file, yaml.loader.SafeLoader)
171-
parser = PayloadParser()
168+
class IPFileDecryptor:
169+
def __init__(self, password):
170+
self.password = password
171+
self.parser = PayloadParser()
172+
173+
def _parse_packet(self, value):
174+
"""Parse raw packet bytes into an IP message. Returns (parsed, is_request) or None."""
175+
if value[0] != 0xAA:
176+
print(f"{Colors.RED}Not an IP packet: {value}{Colors.ENDC}")
177+
return None
178+
header = value[0:16]
179+
payload = value[16:]
180+
181+
is_request = header[3] in [3, 4]
182+
print(
183+
f"{Colors.BLUE}PC->IP: " if is_request else f"{Colors.GREEN}IP->PC:\n",
184+
f"\theader: {binascii.hexlify(header)}\n",
185+
f"\tencrypted_payload: {binascii.hexlify(payload)}",
186+
)
187+
188+
if is_request:
189+
parsed = IPMessageRequest.parse(value, password=self.password)
190+
else:
191+
parsed = IPMessageResponse.parse(value, password=self.password)
192+
return parsed, is_request
193+
194+
def _handle_connect_request(self, parsed):
195+
if parsed.header.sub_command == 0:
196+
assert self.password == parsed.payload, "Wrong decryption password"
197+
198+
def _handle_connect_response(self, parsed):
199+
if parsed.header.sub_command == 0:
200+
self.password = parsed.payload[1:17]
201+
assert len(self.password) == 16, "Wrong password length"
202+
print(f"{Colors.RED}Session password: {self.password}{Colors.ENDC}")
203+
elif parsed.header.sub_command == 3:
204+
self._print_connection_result(parsed.payload[0] & 240)
205+
206+
def _print_connection_result(self, result_byte):
207+
messages = {16: "Successfully connected", 112: "Connection failed"}
208+
msg = messages.get(result_byte, "Connected to unknown")
209+
print(f"{Colors.RED}{msg}{Colors.ENDC}")
210+
211+
def _handle_connect(self, parsed):
212+
if parsed.header.command != IPMessageCommand.connect:
213+
return
214+
if parsed.header.message_type == IPMessageType.ip_request:
215+
self._handle_connect_request(parsed)
216+
elif parsed.header.message_type == IPMessageType.ip_response:
217+
self._handle_connect_response(parsed)
218+
219+
def decrypt(self, file, max_packets: int = None):
220+
try:
221+
data = ordered_load(file, yaml.loader.SafeLoader)
222+
except yaml.YAMLError as exc:
223+
print(f"{Colors.RED}{exc}{Colors.ENDC}")
224+
return
172225

173226
if "peers" in data and "packets" in data:
174227
iterator = data["packets"]
@@ -177,49 +230,12 @@ def decrypt_file(file, password, max_packets: int = None):
177230

178231
n = 0
179232
for packet in iterator:
180-
value = packet["data"]
181-
key = packet["index"]
182-
if not value[0] == 0xAA:
183-
print(f"{Colors.RED}Not an IP packet: {value}{Colors.ENDC}")
233+
result = self._parse_packet(packet["data"])
234+
if result is None:
184235
continue
185-
header = value[0:16]
186-
payload = value[16:]
187-
188-
is_request = header[3] in [3, 4]
189-
print(
190-
f"{Colors.BLUE}PC->IP: " if is_request else f"{Colors.GREEN}IP->PC:\n",
191-
f"\theader: {binascii.hexlify(header)}\n",
192-
f"\tencrypted_payload: {binascii.hexlify(payload)}",
193-
)
236+
parsed, is_request = result
194237

195-
if is_request:
196-
parsed = IPMessageRequest.parse(value, password=password)
197-
else:
198-
parsed = IPMessageResponse.parse(value, password=password)
199-
200-
if (
201-
parsed.header.command == IPMessageCommand.connect
202-
and parsed.header.message_type == IPMessageType.ip_request
203-
):
204-
if parsed.header.sub_command == 0:
205-
assert password == parsed.payload, "Wrong decryption password"
206-
207-
if (
208-
parsed.header.command == IPMessageCommand.connect
209-
and parsed.header.message_type == IPMessageType.ip_response
210-
):
211-
if parsed.header.sub_command == 0:
212-
password = parsed.payload[1:17]
213-
assert len(password) == 16, "Wrong password length"
214-
print(f"{Colors.RED}Session password: {password}{Colors.ENDC}")
215-
elif parsed.header.sub_command == 3:
216-
connection_result = parsed.payload[0] & 240
217-
if connection_result == 16:
218-
print(f"{Colors.RED}Successfully connected{Colors.ENDC}")
219-
elif connection_result == 112:
220-
print(f"{Colors.RED}Connection failed{Colors.ENDC}")
221-
else:
222-
print(f"{Colors.RED}Connected to unknown{Colors.ENDC}")
238+
self._handle_connect(parsed)
223239

224240
print(
225241
f"\tpayload: {binascii.hexlify(parsed.payload)}\n",
@@ -228,105 +244,129 @@ def decrypt_file(file, password, max_packets: int = None):
228244

229245
print(parsed)
230246
print(Colors.ENDC)
231-
parser.parse(parsed)
247+
self.parser.parse(parsed)
232248

233249
if not is_request:
234250
print(
235251
"----end %s-------------------------------------------------------------"
236-
% key
252+
% packet["index"]
237253
)
238254
n += 1
239255

240-
if max_packets is not None and n > max_packets:
256+
if max_packets is not None and n >= max_packets:
241257
print(f"Force stopped on {max_packets} packets")
242258
return
243259

244-
except yaml.YAMLError as exc:
245-
print(f"{Colors.RED}{exc}{Colors.ENDC}")
246260

261+
class SerialFileDecryptor:
262+
_LINE_RE = re.compile(r"^(TX|RX) \[(\d+)\]: ([0-9A-Fa-f ]+)$")
263+
264+
def __init__(self, pc_password=None):
265+
self.panel = create_panel(None)
266+
self.key_bytes = make_serial_key(pc_password) if pc_password else None
267+
268+
def _direction_str(self, direction):
269+
return "topanel" if direction == "TX" else "frompanel"
247270

248-
def decrypt_serial_file(file, pc_password=None, max_packets: int = None):
249-
import re
271+
def _try_update_panel(self, parsed):
272+
if parsed and parsed.fields.value.po.command == 0:
273+
self.panel = create_panel(None, parsed)
250274

251-
line_re = re.compile(r"^(TX|RX) \[(\d+)\]: ([0-9A-Fa-f ]+)$")
252-
panel = create_panel(None)
253-
key_bytes = make_serial_key(pc_password) if pc_password else None
254-
n = 0
255-
for line in file:
256-
line = line.strip()
257-
if not line:
258-
continue
259-
m = line_re.match(line)
275+
def _parse_line(self, line):
276+
"""Parse a serial log line. Returns (direction, message) or None."""
277+
m = self._LINE_RE.match(line)
260278
if not m:
261279
print(f"{Colors.RED}Unrecognised line: {line}{Colors.ENDC}")
262-
continue
280+
return None
263281
direction, length, hex_data = m.group(1), int(m.group(2)), m.group(3)
264282
message = bytes.fromhex(hex_data.replace(" ", ""))
265283
if len(message) != length:
266284
print(
267285
f"{Colors.RED}Length mismatch: declared {length}, got {len(message)}{Colors.ENDC}"
268286
)
269-
continue
287+
return None
288+
return direction, message
289+
290+
def _handle_encrypted_frame(self, direction, message):
291+
"""Handle E0 FE encrypted frames. Returns True if AES-decrypted (skip fallback)."""
292+
if self.key_bytes:
293+
plaintext = decrypt_serial_message(message, self.key_bytes)
294+
if plaintext:
295+
self._print_aes_decrypted(direction, message, plaintext)
296+
return True
297+
298+
self._print_encrypted_fallback(message)
299+
return False
270300

271-
color = Colors.BLUE if direction == "TX" else Colors.GREEN
301+
def _print_aes_decrypted(self, direction, message, plaintext):
272302
print(
273-
f"{color}{direction} [{length}]: {binascii.hexlify(message).decode()}{Colors.ENDC}"
303+
f"{Colors.ON_WHITE} AES-256 decrypted ({len(message)}b → {len(plaintext)}b): "
304+
f"{binascii.hexlify(plaintext).decode()}{Colors.ENDC}"
274305
)
306+
try:
307+
inner = self.panel.parse_message(plaintext, self._direction_str(direction))
308+
if inner:
309+
print(f"{Colors.ON_WHITE} Parsed: {inner}{Colors.ENDC}")
310+
self._try_update_panel(inner)
311+
except Exception:
312+
pass
275313

276-
if len(message) >= 2 and message[0] >> 4 == 0xE and message[1] == 0xFE:
277-
# Try full-AES decryption first (ESP32-style frames)
278-
if key_bytes:
279-
plaintext = decrypt_serial_message(message, key_bytes)
280-
if plaintext:
281-
print(
282-
f"{Colors.ON_WHITE} AES-256 decrypted ({len(message)}b → {len(plaintext)}b): "
283-
f"{binascii.hexlify(plaintext).decode()}{Colors.ENDC}"
284-
)
285-
try:
286-
inner = panel.parse_message(
287-
plaintext,
288-
"topanel" if direction == "TX" else "frompanel",
289-
)
290-
if inner:
291-
print(f"{Colors.ON_WHITE} Parsed: {inner}{Colors.ENDC}")
292-
if inner.fields.value.po.command == 0:
293-
panel = create_panel(None, inner)
294-
except Exception:
295-
pass
296-
continue
314+
def _print_encrypted_fallback(self, message):
315+
try:
316+
parsed = Encrypted.parse(message)
317+
print(
318+
f"{Colors.ON_WHITE} Encrypted frame (compact): request_nr={parsed.fields.value.request_nr}"
319+
f" data({len(parsed.fields.value.data)}b)={binascii.hexlify(parsed.fields.value.data).decode()}{Colors.ENDC}"
320+
)
321+
except Exception:
322+
print(
323+
f"{Colors.ON_WHITE} E0 FE frame ({len(message)}b, no key provided or unknown format){Colors.ENDC}"
324+
)
297325

298-
# Fallback: BabyWare compact E0 FE (algorithm unknown, display structure only)
299-
try:
300-
parsed = Encrypted.parse(message)
301-
print(
302-
f"{Colors.ON_WHITE} Encrypted frame (compact): request_nr={parsed.fields.value.request_nr}"
303-
f" data({len(parsed.fields.value.data)}b)={binascii.hexlify(parsed.fields.value.data).decode()}{Colors.ENDC}"
304-
)
305-
except Exception:
326+
def _handle_plain_frame(self, direction, message):
327+
try:
328+
parsed = self.panel.parse_message(message, self._direction_str(direction))
329+
if parsed:
330+
print(f"{Colors.ON_WHITE} {parsed}{Colors.ENDC}")
331+
self._try_update_panel(parsed)
332+
else:
306333
print(
307-
f"{Colors.ON_WHITE} E0 FE frame ({len(message)}b, no key provided or unknown format){Colors.ENDC}"
308-
)
309-
else:
310-
try:
311-
parsed = panel.parse_message(
312-
message, "topanel" if direction == "TX" else "frompanel"
334+
f"{Colors.RED} No parser for message: {binascii.hexlify(message).decode()}{Colors.ENDC}"
313335
)
314-
if parsed:
315-
print(f"{Colors.ON_WHITE} {parsed}{Colors.ENDC}")
316-
if parsed.fields.value.po.command == 0:
317-
panel = create_panel(None, parsed)
318-
else:
319-
print(
320-
f"{Colors.RED} No parser for message: {binascii.hexlify(message).decode()}{Colors.ENDC}"
321-
)
322-
except Exception:
323-
print(f"{Colors.RED} Parse error{Colors.ENDC}")
324-
traceback.print_exc()
325-
326-
n += 1
327-
if max_packets is not None and n >= max_packets:
328-
print(f"Force stopped on {max_packets} packets")
329-
return
336+
except Exception:
337+
print(f"{Colors.RED} Parse error{Colors.ENDC}")
338+
traceback.print_exc()
339+
340+
def _is_encrypted_frame(self, message):
341+
return len(message) >= 2 and message[0] >> 4 == 0xE and message[1] == 0xFE
342+
343+
def decrypt(self, file, max_packets: int = None):
344+
n = 0
345+
for line in file:
346+
line = line.strip()
347+
if not line:
348+
continue
349+
350+
result = self._parse_line(line)
351+
if result is None:
352+
continue
353+
direction, message = result
354+
355+
color = Colors.BLUE if direction == "TX" else Colors.GREEN
356+
print(
357+
f"{color}{direction} [{len(message)}]: {binascii.hexlify(message).decode()}{Colors.ENDC}"
358+
)
359+
360+
if self._is_encrypted_frame(message):
361+
if self._handle_encrypted_frame(direction, message):
362+
continue
363+
else:
364+
self._handle_plain_frame(direction, message)
365+
366+
n += 1
367+
if max_packets is not None and n >= max_packets:
368+
print(f"Force stopped on {max_packets} packets")
369+
return
330370

331371

332372
def main():
@@ -366,9 +406,9 @@ def main():
366406
args = parser.parse_args()
367407

368408
if args.serial:
369-
decrypt_serial_file(args.file, args.pc_password, args.packets)
409+
SerialFileDecryptor(args.pc_password).decrypt(args.file, args.packets)
370410
else:
371-
decrypt_file(args.file, args.password.encode("utf8"), args.packets)
411+
IPFileDecryptor(args.password.encode("utf8")).decrypt(args.file, args.packets)
372412

373413

374414
if __name__ == "__main__":

paradox/hardware/panel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def __init__(self, core, variable_message_length=True):
3333
self.variable_message_length = variable_message_length
3434

3535
def parse_message(self, message, direction="topanel") -> typing.Optional[Container]:
36-
if message is None or len(message) == 0:
36+
if message is None or len(message) < 2:
3737
return None
3838

3939
if message[0] >> 4 == 0xE and message[1] == 0xFE:

0 commit comments

Comments
 (0)