Skip to content

Commit 670ecba

Browse files
authored
Add PCAP file parser for protocol analysis (#1331)
1 parent bddb3e9 commit 670ecba

2 files changed

Lines changed: 93 additions & 0 deletions

File tree

devtools/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
This directory contains tooling useful for developers
44

5+
## PCAP parser (parse_pcap.py)
6+
7+
This tool parses PCAP file and tries to decrypt the traffic using the given tokens. Requires typer, dpkt, and rich.
8+
Token option can be used multiple times. All tokens are tested for decryption until decryption succeeds or there are no tokens left to try.
9+
10+
```
11+
python pcap_parser.py <pcap file> --token <token> [--token <token>]
12+
```
13+
514
## MiOT generator
615

716
This tool generates some boilerplate code for adding support for MIoT devices

devtools/parse_pcap.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Parse PCAP files for miio traffic."""
2+
from collections import Counter, defaultdict
3+
from ipaddress import ip_address
4+
5+
import dpkt
6+
import typer
7+
from dpkt.ethernet import ETH_TYPE_IP, Ethernet
8+
from rich import print
9+
10+
from miio import Message
11+
12+
app = typer.Typer()
13+
14+
15+
def read_payloads_from_file(file, tokens: list[str]):
16+
"""Read the given pcap file and yield src, dst, and result."""
17+
pcap = dpkt.pcap.Reader(file)
18+
19+
stats: defaultdict[str, Counter] = defaultdict(Counter)
20+
for _ts, pkt in pcap:
21+
eth = Ethernet(pkt)
22+
if eth.type != ETH_TYPE_IP:
23+
continue
24+
25+
ip = eth.ip
26+
27+
if ip.p != 17:
28+
continue
29+
30+
transport = ip.udp
31+
32+
if transport.dport != 54321 and transport.sport != 54321:
33+
continue
34+
35+
data = transport.data
36+
37+
src_addr = str(ip_address(ip.src))
38+
dst_addr = str(ip_address(ip.dst))
39+
40+
decrypted = None
41+
for token in tokens:
42+
try:
43+
decrypted = Message.parse(data, token=bytes.fromhex(token))
44+
45+
break
46+
except BaseException:
47+
continue
48+
49+
if decrypted is None:
50+
continue
51+
52+
stats["stats"]["miio_packets"] += 1
53+
54+
if decrypted.data.length == 0:
55+
stats["stats"]["empty_packets"] += 1
56+
continue
57+
58+
stats["dst_addr"][dst_addr] += 1
59+
stats["src_addr"][src_addr] += 1
60+
61+
payload = decrypted.data.value
62+
63+
if "result" in payload:
64+
stats["stats"]["results"] += 1
65+
if "method" in payload:
66+
method = payload["method"]
67+
stats["commands"][method] += 1
68+
69+
yield src_addr, dst_addr, payload
70+
71+
print(stats) # noqa: T001
72+
73+
74+
@app.command()
75+
def read_file(
76+
file: typer.FileBinaryRead, token: list[str] = typer.Option(...) # noqa: B008
77+
):
78+
"""Read PCAP file and output decrypted miio communication."""
79+
for src_addr, dst_addr, payload in read_payloads_from_file(file, token):
80+
print(f"{src_addr:<15} -> {dst_addr:<15} {payload}") # noqa: T001
81+
82+
83+
if __name__ == "__main__":
84+
app()

0 commit comments

Comments
 (0)