Skip to content

Commit 49f79a2

Browse files
Improve performance and error handling in bson part 2 (kevoreilly#2814)
* Improve performance and error handling in core modules

  Refactored compressor and behavior processing to use mmap for faster file access, added standard file reading fallback, and improved error handling and logging in guest and sniffer modules. Refactored behavior summary logic to use dispatch tables for API handling, enhancing maintainability and extensibility. Enhanced remote sniffer operations with timeouts and error logging for robustness.
* sync
* Apply suggestions from code review

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update modules/auxiliary/sniffer.py

  Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* Update compressor.py
* Update compressor.py
* Update compressor.py
* Update compressor.py
* fix

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
1 parent 5eee4ec commit 49f79a2

5 files changed

Lines changed: 491 additions & 271 deletions

File tree

conf/default/processing.conf.default

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,9 @@ ram_boost = no
5454
replace_patterns = no
5555
file_activities = no
5656

57+
# process behavior files in ram to speedup processing a little bit?
58+
ram_mmap = no
59+
5760
[tracee]
5861
enabled = no
5962

lib/cuckoo/common/compressor.py

Lines changed: 128 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import binascii
22
import logging
3+
import mmap
34
import os
45
import struct
56
from pathlib import Path
@@ -76,50 +77,115 @@ def __init__(self):
7677
self.callmap = {}
7778
self.head = []
7879
self.ccounter = 0
80+
self.category = None
7981

80-
def __next_message(self):
81-
data = self.fd_in.read(4)
82-
if not data:
83-
return (False, False)
84-
_size = struct.unpack("I", data)[0]
85-
data += self.fd_in.read(_size - 4)
86-
self.raw_data = data
87-
return (data, bson.decode(data))
88-
89-
def run(self, file_path):
90-
if not os.path.isfile(file_path) and os.stat(file_path).st_size:
82+
def _process_message(self, msg, data):
83+
mtype = msg.get("type") # message type [debug, new_process, info]
84+
if mtype in {"debug", "new_process", "info"}:
85+
self.category = msg.get("category", "None")
86+
self.head.append(data.tobytes() if isinstance(data, memoryview) else data)
87+
88+
elif self.category and self.category.startswith("__"):
89+
self.head.append(data.tobytes() if isinstance(data, memoryview) else data)
90+
else:
91+
tid = msg.get("T", -1)
92+
time = msg.get("t", 0)
93+
94+
if tid not in self.threads:
95+
self.threads[tid] = Compressor(100)
96+
97+
csum = self.checksum(msg)
98+
self.ccounter += 1
99+
v = (csum, self.ccounter, time)
100+
self.threads[tid].add(v)
101+
102+
if csum not in self.callmap:
103+
self.callmap[csum] = msg
104+
105+
def _process_mmap_content(self, mm):
106+
with memoryview(mm) as mv:
107+
offset = 0
108+
size_mm = len(mm)
109+
110+
while offset < size_mm:
111+
# Read size (4 bytes)
112+
if offset + 4 > size_mm:
113+
break
114+
115+
# Slicing memoryview returns memoryview
116+
size_bytes = mv[offset : offset + 4]
117+
_size = struct.unpack("I", size_bytes)[0]
118+
119+
if offset + _size > size_mm:
120+
break
121+
122+
data = mv[offset : offset + _size]
123+
offset += _size
124+
125+
try:
126+
msg = bson.decode(data)
127+
except Exception:
128+
break
129+
130+
if msg:
131+
self._process_message(msg, data)
132+
133+
def run(self, file_path, use_mmap=False):
134+
if use_mmap:
135+
return self._run_mmap(file_path)
136+
return self._run_standard(file_path)
137+
138+
def _run_mmap(self, file_path):
139+
if not os.path.isfile(file_path) or not os.stat(file_path).st_size:
91140
log.warning("File %s does not exists or it is invalid", file_path)
92141
return False
93142

94-
self.fd_in = open(file_path, "rb")
143+
with open(file_path, "rb") as f:
144+
try:
145+
mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
146+
except ValueError:
147+
return False
95148

96-
msg = "---"
97-
while msg:
98-
data, msg = self.__next_message()
149+
try:
150+
self._process_mmap_content(mm)
151+
finally:
152+
mm.close()
99153

100-
if msg:
101-
mtype = msg.get("type") # message type [debug, new_process, info]
102-
if mtype in {"debug", "new_process", "info"}:
103-
self.category = msg.get("category", "None")
104-
self.head.append(data)
154+
return self.flush(file_path)
105155

106-
elif self.category.startswith("__"):
107-
self.head.append(data)
108-
else:
109-
tid = msg.get("T", -1)
110-
time = msg.get("t", 0)
156+
def _run_standard(self, file_path):
157+
if not os.path.isfile(file_path) or not os.stat(file_path).st_size:
158+
log.warning("File %s does not exists or it is invalid", file_path)
159+
return False
160+
161+
with open(file_path, "rb") as f:
162+
while True:
163+
size_bytes = f.read(4)
164+
if len(size_bytes) < 4:
165+
break
166+
167+
try:
168+
_size = struct.unpack("I", size_bytes)[0]
169+
except struct.error:
170+
break
111171

112-
if tid not in self.threads:
113-
self.threads[tid] = Compressor(100)
172+
remaining = _size - 4
173+
if remaining < 0:
174+
break
114175

115-
csum = self.checksum(msg)
116-
self.ccounter += 1
117-
v = (csum, self.ccounter, time)
118-
self.threads[tid].add(v)
176+
data_body = f.read(remaining)
177+
if len(data_body) < remaining:
178+
break
119179

120-
if csum not in self.callmap:
121-
self.callmap[csum] = msg
122-
self.fd_in.close()
180+
data = size_bytes + data_body
181+
182+
try:
183+
msg = bson.decode(data)
184+
except Exception:
185+
break
186+
187+
if msg:
188+
self._process_message(msg, data)
123189

124190
return self.flush(file_path)
125191

@@ -171,3 +237,31 @@ def checksum(self, msg):
171237
content = f"{index}{msg['T']}{msg['R']}{args}{self.category}{msg['P']}"
172238

173239
return binascii.crc32(content.encode("utf8"))
240+
241+
242+
if __name__ == "__main__":
243+
import argparse
244+
import time
245+
import sys
246+
247+
parser = argparse.ArgumentParser()
248+
parser.add_argument("file", help="Path to BSON file to compress")
249+
parser.add_argument("--mmap", action="store_true", help="Use mmap for compression")
250+
args = parser.parse_args()
251+
252+
if not os.path.exists(args.file):
253+
print(f"File {args.file} not found.")
254+
sys.exit(1)
255+
256+
print(f"Compressing {args.file}...")
257+
start = time.time()
258+
259+
compressor = CuckooBsonCompressor()
260+
result = compressor.run(args.file, use_mmap=args.mmap)
261+
262+
end = time.time()
263+
264+
if result:
265+
print(f"Compression successful. Took {end - start:.4f} seconds.")
266+
else:
267+
print("Compression failed.")

lib/cuckoo/core/guest.py

Lines changed: 11 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -107,11 +107,11 @@ def get(self, method, *args, **kwargs):
107107

108108
try:
109109
r = session.get(url, *args, **kwargs)
110-
except requests.ConnectionError:
110+
except requests.ConnectionError as e:
111111
raise CuckooGuestError(
112-
"CAPE Agent failed without error status, please try "
113-
"upgrading to the latest version of agent.py (>= 0.10) and "
114-
"notify us if the issue persists"
112+
f"CAPE Agent failed without error status, please try "
113+
f"upgrading to the latest version of agent.py (>= 0.10) and "
114+
f"notify us if the issue persists. Error: {e}"
115115
)
116116

117117
do_raise and r.raise_for_status()
@@ -134,11 +134,11 @@ def post(self, method, *args, **kwargs):
134134

135135
try:
136136
r = session.post(url, *args, **kwargs)
137-
except requests.ConnectionError:
137+
except requests.ConnectionError as e:
138138
raise CuckooGuestError(
139-
"CAPE Agent failed without error status, please try "
140-
"upgrading to the latest version of agent.py (>= 0.10) and "
141-
"notify us if the issue persists"
139+
f"CAPE Agent failed without error status, please try "
140+
f"upgrading to the latest version of agent.py (>= 0.10) and "
141+
f"notify us if the issue persists. Error: {e}"
142142
)
143143

144144
r.raise_for_status()
@@ -377,14 +377,15 @@ def wait_for_completion(self):
377377

378378
try:
379379
status = self.get("/status", timeout=5).json()
380-
except (CuckooGuestError, requests.exceptions.ReadTimeout):
380+
except (CuckooGuestError, requests.exceptions.ReadTimeout) as e:
381381
# this might fail due to timeouts or just temporary network
382382
# issues thus we don't want to abort the analysis just yet and
383383
# wait for things to recover
384384
log.warning(
385-
"Task #%s: Virtual Machine %s /status failed. This can indicate the guest losing network connectivity",
385+
"Task #%s: Virtual Machine %s /status failed. This can indicate the guest losing network connectivity. Error: %s",
386386
self.task_id,
387387
self.vmid,
388+
e,
388389
)
389390
continue
390391
except Exception as e:

modules/auxiliary/sniffer.py

Lines changed: 57 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -191,27 +191,43 @@ def start(self):
191191
f.write(f"echo $PID > /tmp/{self.task.id}.pid")
192192
f.write("\n")
193193

194-
subprocess.check_output(
195-
["scp", "-q", f"/tmp/{self.task.id}.sh", remote_host + f":/tmp/{self.task.id}.sh"],
196-
)
197-
subprocess.check_output(
198-
["ssh", remote_host, "nohup", "/bin/bash", f"/tmp/{self.task.id}.sh", ">", "/tmp/log", "2>", "/tmp/err"],
199-
)
200-
201-
self.pid = subprocess.check_output(
202-
["ssh", remote_host, "cat", f"/tmp/{self.task.id}.pid"], stderr=subprocess.DEVNULL
203-
).strip()
204-
log.info(
205-
"Started remote sniffer @ %s with (interface=%s, host=%s, dump path=%s, pid=%s)",
206-
remote_host,
207-
interface,
208-
host,
209-
file_path,
210-
self.pid,
211-
)
212-
subprocess.check_output(
213-
["ssh", remote_host, "rm", "-f", f"/tmp/{self.task.id}.pid", f"/tmp/{self.task.id}.sh"],
214-
)
194+
try:
195+
subprocess.check_output(
196+
["scp", "-q", f"/tmp/{self.task.id}.sh", remote_host + f":/tmp/{self.task.id}.sh"], timeout=30
197+
)
198+
subprocess.check_output(
199+
[
200+
"ssh",
201+
remote_host,
202+
"nohup",
203+
"/bin/bash",
204+
f"/tmp/{self.task.id}.sh",
205+
">",
206+
"/tmp/log",
207+
"2>",
208+
"/tmp/err",
209+
],
210+
timeout=30,
211+
)
212+
213+
self.pid = subprocess.check_output(
214+
["ssh", remote_host, "cat", f"/tmp/{self.task.id}.pid"], stderr=subprocess.DEVNULL, timeout=30
215+
).strip()
216+
log.info(
217+
"Started remote sniffer @ %s with (interface=%s, host=%s, dump path=%s, pid=%s)",
218+
remote_host,
219+
interface,
220+
host,
221+
file_path,
222+
self.pid,
223+
)
224+
subprocess.check_output(
225+
["ssh", remote_host, "rm", "-f", f"/tmp/{self.task.id}.pid", f"/tmp/{self.task.id}.sh"], timeout=30
226+
)
227+
except subprocess.TimeoutExpired:
228+
log.error("Timeout connecting to remote host %s", remote_host)
229+
except subprocess.CalledProcessError as e:
230+
log.error("Error connecting to remote host %s: %s", remote_host, e)
215231

216232
else:
217233
try:
@@ -220,7 +236,13 @@ def start(self):
220236
log.exception("Failed to start sniffer (interface=%s, host=%s, dump path=%s)", interface, host, file_path)
221237
return
222238

223-
log.info("Started sniffer with PID %d (interface=%s, host=%s, dump path=%s)", self.proc.pid, interface, host, file_path)
239+
log.info(
240+
"Started sniffer with PID %d (interface=%s, host=%s, dump path=%s)",
241+
self.proc.pid,
242+
interface,
243+
host,
244+
file_path,
245+
)
224246

225247
def stop(self):
226248
"""Stop sniffing.
@@ -235,22 +257,26 @@ def stop(self):
235257
remote_host = self.options.get("host", "")
236258
remote_args = ["ssh", remote_host, "kill", "-2", self.pid]
237259

238-
subprocess.check_output(remote_args)
260+
try:
261+
subprocess.check_output(remote_args, timeout=30)
239262

240-
file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(self.task.id), "dump.pcap")
241-
file_path2 = f"/tmp/tcp.dump.{self.task.id}"
263+
file_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(self.task.id), "dump.pcap")
264+
file_path2 = f"/tmp/tcp.dump.{self.task.id}"
242265

243-
subprocess.check_output(["scp", "-q", f"{remote_host}:{file_path2}", file_path])
244-
subprocess.check_output(["ssh", remote_host, "rm", "-f", file_path2])
266+
subprocess.check_output(["scp", "-q", f"{remote_host}:{file_path2}", file_path], timeout=300)
267+
subprocess.check_output(["ssh", remote_host, "rm", "-f", file_path2], timeout=30)
268+
except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
269+
log.error("Error stopping remote sniffer: %s", e)
245270
return
246271

247272
if self.proc and not self.proc.poll():
248273
if self.proc.args[0] == self.sudo_path and "-Z" in self.proc.args:
249274
# We must kill the child process that sudo spawned. We won't
250275
# have permission to kill the parent process because it's owned by root.
251276
try:
252-
pid = int(subprocess.check_output(["ps", "--ppid", str(self.proc.pid), "-o", "pid="]).decode())
253-
except (subprocess.CalledProcessError, TypeError, ValueError):
277+
output = subprocess.check_output(["ps", "--ppid", str(self.proc.pid), "-o", "pid="]).decode().strip()
278+
pid = int(output.split()[0])
279+
except (subprocess.CalledProcessError, TypeError, ValueError, IndexError):
254280
log.exception("Failed to get child pid of sudo process to stop the sniffer.")
255281
return
256282
term_func = functools.partial(os.kill, pid, signal.SIGTERM)
@@ -261,14 +287,14 @@ def stop(self):
261287
pid = self.proc.pid
262288
try:
263289
term_func()
264-
_, _ = self.proc.communicate()
290+
_, _ = self.proc.communicate(timeout=5)
265291
except Exception as e:
266292
log.error("Unable to stop the sniffer (first try) with pid %d: %s", pid, e)
267293
try:
268294
if not self.proc.poll():
269295
log.debug("Killing sniffer")
270296
kill_func()
271-
_, _ = self.proc.communicate()
297+
_, _ = self.proc.communicate(timeout=5)
272298
except OSError as e:
273299
log.debug("Error killing sniffer: %s, continuing", e)
274300
except Exception as e:

0 commit comments

Comments
 (0)