-
Notifications
You must be signed in to change notification settings - Fork 572
Expand file tree
/
Copy pathmongo_hooks.py
More file actions
227 lines (196 loc) · 8.16 KB
/
mongo_hooks.py
File metadata and controls
227 lines (196 loc) · 8.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import itertools
import logging
from contextlib import suppress
from pymongo import UpdateOne, errors
from pymongo.errors import InvalidDocument, BulkWriteError
import bson
from dev_utils.mongodb import (
mongo_bulk_write,
mongo_delete_data,
mongo_delete_data_range,
mongo_delete_many,
mongo_find,
mongo_find_one,
mongo_hook,
mongo_insert_one,
mongo_update_many,
mongo_update_one,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Name of the collection holding the per-file documents split out of reports.
FILES_COLL = "files"
# Key of a file dict whose value becomes the "_id" of its FILES_COLL document.
FILE_KEY = "sha256"
# Field on FILES_COLL documents listing the ids of the tasks referencing the file
# (filled with $addToSet on normalization, trimmed with $pull/$pullAll on deletion).
TASK_IDS_KEY = "_task_ids"
# Key added to a normalized file dict; its value is the file's FILE_KEY value,
# i.e. the "_id" to look up in FILES_COLL.
FILE_REF_KEY = "file_ref"
def normalize_file(file_dict, task_id):
    """Split the detonation-independent attributes out of a file dict.

    The static attributes (hashes, size, PE data, strings, ...) are removed
    from ``file_dict`` in place and packaged into an ``UpdateOne`` upsert
    aimed at the FILES_COLL document whose ``_id`` is the file's FILE_KEY
    value. ``task_id`` is added to that document's TASK_IDS_KEY set, and
    ``file_dict`` gains a FILE_REF_KEY entry holding the same FILE_KEY value
    so the document can be looked up again later.

    Returns ``None`` and leaves ``file_dict`` untouched when it already
    carries a FILE_REF_KEY (it was normalized before) or when it has no
    usable FILE_KEY value.
    """
    if FILE_REF_KEY in file_dict:
        # Already points at a FILES_COLL document; nothing to extract.
        return None

    file_id = file_dict.get(FILE_KEY)
    if not file_id:
        # Without the key there is no _id to store the static data under.
        return None

    hash_fields = (
        "crc32",
        "md5",
        "sha1",
        "sha256",
        "sha512",
        "sha3_384",
        "ssdeep",
        "tlsh",
        "rh_hash",
    )
    other_static_fields = (
        "size",
        "pe",
        "ep_bytes",
        "entrypoint",
        "data",
        "strings",
    )

    # Move every static field that is present out of the report's file dict.
    static_doc = {}
    for name in hash_fields + other_static_fields:
        if name in file_dict:
            static_doc[name] = file_dict.pop(name)
    static_doc["_id"] = file_id

    file_dict[FILE_REF_KEY] = file_id

    update = {"$set": static_doc, "$addToSet": {TASK_IDS_KEY: task_id}}
    return UpdateOne({"_id": file_id}, update, upsert=True, hint=[("_id", 1)])
@mongo_hook((mongo_insert_one, mongo_update_one), "analysis")
def normalize_files(report):
    """Move static file data out of the report into the FILES_COLL collection.

    Every file dict found by collect_file_dicts() is passed through
    normalize_file(), which strips its detonation-independent fields and
    leaves a reference behind in the report. The resulting upserts are sent
    in one unordered bulk write.

    The write is best effort: if it fails with one of the handled pymongo
    errors, oversized "strings" fields are truncated (or cleared) and the
    bulk write is attempted once more. A failure of the retry is only
    logged. The (modified in place) report is always returned.
    """
    requests = [
        request
        for file_dict in collect_file_dicts(report)
        if (request := normalize_file(file_dict, report["info"]["id"]))
    ]
    if not requests:
        return report

    try:
        mongo_bulk_write(FILES_COLL, requests, ordered=False)
    except (errors.OperationFailure, InvalidDocument, BulkWriteError) as exc:
        log.warning("Mongo hook 'normalize_files' failed: %s. Attempting to sanitize strings and retry.", exc)

        # Size budget for the strings field on its own (buffer safe 15MB).
        strings_size_limit = 15 * 1024 * 1024

        for req in requests:
            # The private attribute _doc of the request is its update
            # document ({"$set": new_dict, ...}); it is edited in place so
            # that the same request objects can be re-sent.
            try:
                if not hasattr(req, "_doc"):
                    continue
                update_doc = req._doc
                if "$set" not in update_doc:
                    continue
                set_fields = update_doc["$set"]
                if "strings" not in set_fields:
                    continue

                strings_val = set_fields["strings"]
                if not strings_val or len(bson.encode({"strings": strings_val})) <= strings_size_limit:
                    continue

                log.warning("Truncating oversized strings field for retry.")
                set_fields["strings"] = strings_val[:1000] if isinstance(strings_val, list) else []

                # The first 1000 entries may still be too big; give up on them then.
                if len(bson.encode({"strings": set_fields["strings"]})) > strings_size_limit:
                    set_fields["strings"] = []
            except Exception as e:
                log.error("Failed to sanitize request during retry: %s", e)

        # Second and last attempt, with whatever could be sanitized.
        try:
            mongo_bulk_write(FILES_COLL, requests, ordered=False)
        except Exception as retry_exc:
            log.error("Retry of 'normalize_files' failed: %s", retry_exc)

    return report
@mongo_hook(mongo_find, "analysis")
def denormalize_files_from_reports(reports):
    """Merge the file info stored in FILES_COLL back into the given reports.

    Every file dict in the reports that carries a FILE_REF_KEY has that
    reference replaced by the fields of the referenced FILES_COLL document
    (without its ``_id`` and TASK_IDS_KEY). File dicts whose referenced
    document cannot be found keep their FILE_REF_KEY and a warning is logged.

    ``reports`` may be any iterable (e.g. a cursor); it is materialized into
    a list, the report objects are updated in place, and that list is
    returned.
    """
    # Make sure we have a list whose objects we can modify in place instead of a mongo
    # cursor as returned from mongo_find.
    reports = list(reports)

    file_dicts = [
        file_dict
        for file_dict in itertools.chain.from_iterable(collect_file_dicts(report) for report in reports)
        if FILE_REF_KEY in file_dict
    ]
    if not file_dicts:
        # These are likely partial reports (like for an ajax request of a specific
        # part of the report), had a projection applied that does not include any file
        # information, or only the old-style of storing file information is present in
        # these documents.
        return reports

    # De-duplicate the refs so each file document is requested only once.
    file_refs = {file_dict[FILE_REF_KEY] for file_dict in file_dicts}
    file_docs = {}
    batch_size = 50
    file_ref_iter = iter(file_refs)
    while batch := tuple(itertools.islice(file_ref_iter, batch_size)):
        # Reduce the size of the $in clause when there are large numbers of file refs by
        # making multiple requests, passing batches of refs in.
        for file_doc in mongo_find(FILES_COLL, {"_id": {"$in": batch}}, {TASK_IDS_KEY: 0}):
            file_docs[file_doc.pop("_id")] = file_doc

    for file_dict in file_dicts:
        file_ref = file_dict[FILE_REF_KEY]
        if file_ref not in file_docs:
            # The missing ref goes first and the collection name second, so the
            # message reads "Failed to find <ref> in files collection."
            log.warning("Failed to find %s in %s collection.", file_ref, FILES_COLL)
            continue
        # Swap the reference for the actual file fields.
        del file_dict[FILE_REF_KEY]
        file_dict.update(file_docs[file_ref])

    return reports
@mongo_hook(mongo_find_one, "analysis")
def denormalize_files(report):
    """Single-report variant of denormalize_files_from_reports().

    Hands the report over wrapped in a one-element list, discards the
    result of that call, and returns the very same report object.
    """
    single_report_batch = [report]
    denormalize_files_from_reports(single_report_batch)
    return report
@mongo_hook(mongo_delete_data, "analysis")
def remove_task_references_from_files(task_ids):
    """Drop the given task ids from the TASK_IDS_KEY field of "files" documents.

    Only documents whose TASK_IDS_KEY array contains at least one of
    ``task_ids`` are selected; all of the given ids are then pulled from
    that array. Nothing is returned.
    """
    referencing_docs = {TASK_IDS_KEY: {"$elemMatch": {"$in": task_ids}}}
    drop_task_ids = {"$pullAll": {TASK_IDS_KEY: task_ids}}
    mongo_update_many(FILES_COLL, referencing_docs, drop_task_ids)
@mongo_hook(mongo_delete_data_range, "analysis")
def remove_task_references_from_files_range(*, range_start: int = 0, range_end: int = 0):
    """Drop a range of task ids from the TASK_IDS_KEY field of "files" documents.

    A positive ``range_start`` is an inclusive lower bound ($gte) and a
    positive ``range_end`` an exclusive upper bound ($lt); a bound that is
    zero or negative is not applied. When neither bound is applied no
    update is issued at all. Nothing is returned.
    """
    bounds = (("$gte", range_start), ("$lt", range_end))
    range_query = {operator: limit for operator, limit in bounds if limit > 0}
    if not range_query:
        # No usable bound: do not touch any document.
        return

    referencing_docs = {TASK_IDS_KEY: {"$elemMatch": range_query}}
    drop_task_ids = {"$pull": {TASK_IDS_KEY: range_query}}
    mongo_update_many(FILES_COLL, referencing_docs, drop_task_ids)
def delete_unused_file_docs():
    """Remove FILES_COLL documents that no analysis task references anymore.

    A document counts as unused when its TASK_IDS_KEY array is empty. The
    result of the underlying mongo_delete_many() call is returned as is.
    This should typically be invoked via utils/cleaners.py in a cron job.
    """
    unreferenced = {TASK_IDS_KEY: {"$size": 0}}
    return mongo_delete_many(FILES_COLL, unreferenced)
# Presumably the dotted report paths under which normalized file dicts live;
# not referenced anywhere else in the visible part of this file.
# NOTE(review): this tuple names "procmemory" but not the suricata files, while
# collect_file_dicts() gathers suricata file_info entries and not "procmemory" -
# confirm which of the two is intended.
NORMALIZED_FILE_FIELDS = ("target.file", "dropped", "CAPE.payloads", "procdump", "procmemory")
def collect_file_dicts(report) -> itertools.chain:
    """Return an iterable over every file dict in the report that is a
    candidate for normalization.

    The candidates are yielded in this order: the target file, the dropped
    files, the CAPE payloads, the procdump entries and the non-empty
    "file_info" dicts of the suricata files. The dicts themselves (not
    copies) are yielded, so callers can modify them in place.

    Sections that are missing from the report, or present with a None/empty
    value, simply contribute nothing.
    """
    # ToDo extend to self extract

    # ``or {}`` rather than a .get() default: the default is only used when the
    # key is absent, so a key stored with a None value would otherwise make
    # the nested .get() below fail.
    target = report.get("target") or {}
    cape = report.get("CAPE") or {}
    suricata = report.get("suricata") or {}

    file_dicts = []
    target_file = target.get("file")
    if target_file:
        file_dicts.append([target_file])
    file_dicts.append(report.get("dropped") or [])
    file_dicts.append(cape.get("payloads") or [])
    file_dicts.append(report.get("procdump") or [])
    # Suricata wraps the file dict in a "file_info" entry; skip entries without one.
    file_dicts.append(
        [file_info for entry in suricata.get("files") or [] if (file_info := entry.get("file_info"))]
    )
    return itertools.chain.from_iterable(file_dicts)