-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparse-routersploit-logs.py
More file actions
executable file
·136 lines (111 loc) · 5.42 KB
/
parse-routersploit-logs.py
File metadata and controls
executable file
·136 lines (111 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
import argparse
import csv
import json
import pathlib
from typing import Dict, Set, Tuple
def create_argument_parser() -> argparse.ArgumentParser:
"""
Command line arguments parser
"""
parser = argparse.ArgumentParser()
parser.add_argument("-ld", "--logs-dir", type=str, required=True, help="Directory with routersploit logs")
parser.add_argument("-hdf", "--hash-data-file", type=str, required=False, default="exp-hashes-data.json", help="RCE hash codes map file")
return parser
def extract_exploit_status(stdout_file: pathlib.Path, exp_hashes: Dict[str, str]) -> Tuple[str, Dict[str, Set[str]]]:
"""
Process an stdout file and extract status of exploits that were run
"""
logs_of_interest_marker = b"AAAAAAAAAA"
log_separator = b" -> "
log_status_not_vulnerable = b"is not vulnerable"
log_status_unverified = b"Could not be verified"
log_status_vulnerable = b"is vulnerable"
rce_prefix = b"GHRCE_"
name_prefix = b"Target: "
result = {"vulnerable": set(), "not-vulnerable": set(), "needs-verification": set(), "error": set()}
target_ip = None
name = ""
with stdout_file.open('rb') as fh:
for line_num, aline in enumerate(fh, start=1):
if logs_of_interest_marker in aline:
aline = aline.rstrip(b'\n')
log_data = aline.split(log_separator)[1].split(b' ')
target_ip = str(log_data[0].split(b':')[0], "utf-8")
exploit_name = str(log_data[2], "utf-8")
if aline.endswith(log_status_vulnerable):
result["vulnerable"].add(exploit_name)
elif aline.endswith(log_status_not_vulnerable):
result["not-vulnerable"].add(exploit_name)
elif aline.endswith(log_status_unverified):
result["needs-verification"].add(exploit_name)
else:
result["error"].add(exploit_name)
elif aline.startswith(rce_prefix):
# Convert hash to str since hashes dictionary has strings as keys
exp_hash = str(aline.strip(), 'utf-8')
exploit_name = exp_hashes.get(exp_hash, None)
if exploit_name is None:
print(f"Found GHRCE hash at line {line_num} of {stdout_file} but couldn't find exploit name. This is a bug!")
else:
# Mark exploit as vulnerable and remove it from other status' sets if already present (silently
# ignore if not)
result["vulnerable"].add(exploit_name)
result["not-vulnerable"].discard(exploit_name)
result["needs-verification"].discard(exploit_name)
result["error"].discard(exploit_name)
elif aline.startswith(name_prefix):
name = str(aline).strip().split(":")[-1].split("/")[-1][:-3]
return (name, target_ip, result)
def main():
args = create_argument_parser().parse_args()
logs_dir = pathlib.Path(args.logs_dir).resolve()
if not logs_dir.is_dir():
print(f"{logs_dir} is not a valid directory")
return
exp_hash_map_file = pathlib.Path(args.hash_data_file).resolve()
if not exp_hash_map_file.is_file():
print(f"{exp_hash_map_file} is not a valid file")
return
exp_hash_map = {}
with exp_hash_map_file.open('rb') as fh:
exp_hash_map = json.load(fh)
out_dir = logs_dir / "processed_data"
out_dir.mkdir(exist_ok=True)
print("Processing routersploit logs...", end="", flush=True)
processed_data = {"vulnerable": [], "not-vulnerable": [], "needs-verification": [], "error": []}
for afile in logs_dir.iterdir():
if afile.is_file() and afile.name.endswith(".stdout"):
# TODO: Fix firmware ID extract if name of log file changes
firmware_id = afile.name.rsplit('.', 1)[0]
name, target_ip, firmware_data = extract_exploit_status(afile, exp_hash_map)
for status in processed_data:
for exploit_name in firmware_data[status]:
processed_data[status].append((firmware_id, name, target_ip, exploit_name))
print("done.")
print("Dumping processed data to CSV...", end="", flush=True)
csv_header = ["Firmware ID", "Name", "Target IP", "Exploit name"]
error_out_file = out_dir / "error.csv"
not_vulnerable_out_file = out_dir / "not-vulnerable.csv"
needs_verification_out_file = out_dir / "needs-verification.csv"
vulnerable_out_file = out_dir / "vulnerable.csv"
if len(processed_data["error"]) > 0:
with error_out_file.open('w') as fh:
csvwriter = csv.writer(fh)
csvwriter.writerow(csv_header)
csvwriter.writerows(sorted(processed_data["error"]))
with needs_verification_out_file.open('w') as fh:
csvwriter = csv.writer(fh)
csvwriter.writerow(csv_header)
csvwriter.writerows(sorted(processed_data["needs-verification"]))
with not_vulnerable_out_file.open('w') as fh:
csvwriter = csv.writer(fh)
csvwriter.writerow(csv_header)
csvwriter.writerows(sorted(processed_data["not-vulnerable"]))
with vulnerable_out_file.open('w') as fh:
csvwriter = csv.writer(fh)
csvwriter.writerow(csv_header)
csvwriter.writerows(sorted(processed_data["vulnerable"]))
print(f"done. See {out_dir}.")
if __name__ == "__main__":
main()