Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions conf/default/api.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,36 @@ rps = 1/s
#rpm = 10/m
mcp = no

# Pull TLS keylog material (tlsdump.log / sslkeys.log / master_keys.log).
# These are sensitive — they decrypt captured TLS flows — so they are gated
# separately from other downloads, even if the operator has enabled [taskpcap].
[tasktlskeys]
enabled = yes
auth_only = no
rps = 1/s
#rpm = 10/m
mcp = no

# Pull ETW JSON logs (dns_etw.json, network_etw.json, wmi_etw.json, amsi_etw/).
# Companion to [taskevtx] — these cover the ETW-sourced signals used for
# process→network attribution and script-content inspection.
[tasketw]
enabled = yes
auth_only = no
rps = 1/s
#rpm = 10/m
mcp = no

# Bulk directory zip endpoints — logs/, network/, memory/, selfextracted/.
# These can be large and expose a lot of artifacts at once; disable if you
# don't want operators pulling entire analysis trees over the API.
[taskbulkzip]
enabled = yes
auth_only = no
rps = 1/s
#rpm = 10/m
mcp = no

# Pull the dropped files from a specific task
[taskdropped]
enabled = yes
Expand Down
26 changes: 20 additions & 6 deletions lib/cuckoo/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,27 +135,35 @@ def is_text_file(file_info, destination_folder, buf, file_data=False):
return file_data.decode("latin-1")


def create_zip(files=False, folder=False, encrypted=False):
def create_zip(files=False, folder=False, encrypted=False, temp_file=False):
"""Utility function to create zip archive with file(s)
@param files: file or list of files
@param folder: path to folder to compress
@param encrypted: create password protected and AES encrypted file
@param temp_file: if True, returns a tempfile.NamedTemporaryFile instead of BytesIO
"""

if folder:
# To avoid when we have only folder argument
if not files:
files = []
files += [os.path.join(folder, file) for file in os.listdir(folder)]
for root, _, fnames in os.walk(folder):
for fname in fnames:
files.append(os.path.join(root, fname))

if not isinstance(files, list):
files = [files]

mem_zip = BytesIO()
if temp_file:
mem_zip = tempfile.NamedTemporaryFile(delete=True)
else:
mem_zip = BytesIO()

if encrypted and HAVE_PYZIPPER:
zipper = pyzipper.AESZipFile(mem_zip, "w", compression=pyzipper.ZIP_DEFLATED, encryption=pyzipper.WZ_AES)
else:
zipper = zipfile.ZipFile(mem_zip, "a", zipfile.ZIP_DEFLATED, False)
zipper = zipfile.ZipFile(mem_zip, "w" if temp_file else "a", zipfile.ZIP_DEFLATED, False)

with zipper as zf:
if encrypted:
zf.setpassword(zippwd)
Expand All @@ -164,8 +172,14 @@ def create_zip(files=False, folder=False, encrypted=False):
log.error("File does't exist: %s", file)
continue

parent_folder = os.path.dirname(file).rsplit(os.sep, 1)[-1]
path = os.path.join(parent_folder, os.path.basename(file))
if folder and file.startswith(folder):
rel_path = os.path.relpath(file, folder)
folder_basename = os.path.basename(os.path.normpath(folder))
path = os.path.join(folder_basename, rel_path)
else:
parent_folder = os.path.dirname(file).rsplit(os.sep, 1)[-1]
path = os.path.join(parent_folder, os.path.basename(file))

zf.write(file, path)
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zf.write(file, path) will follow symlinks by default. If any archived entry is a symlink (e.g., introduced via extracted artifacts), this can zip and expose the link target outside the intended folder. Consider skipping symlinks and/or using follow_symlinks=False plus path_safe()/Path.resolve() validation for each file before archiving.

Copilot uses AI. Check for mistakes.

mem_zip.seek(0)
Expand Down
4 changes: 4 additions & 0 deletions web/apiv2/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@
re_path(r"^tasks/get/procmemory/(?P<task_id>\d+)/(?P<pid>\d{1,5})/$", views.tasks_procmemory),
re_path(r"^tasks/get/fullmemory/(?P<task_id>\d+)/$", views.tasks_fullmemory),
re_path(r"^tasks/get/pcap/(?P<task_id>\d+)/$", views.tasks_pcap),
re_path(r"^tasks/get/pcap/(?P<task_id>\d+)/(?P<variant>\w+)/$", views.tasks_pcap_variant),
re_path(r"^tasks/get/tlspcap/(?P<task_id>\d+)/$", views.tasks_tlspcap),
re_path(r"^tasks/get/keys/(?P<task_id>\d+)/(?P<kind>\w+)/$", views.tasks_keys),
re_path(r"^tasks/get/etw/(?P<task_id>\d+)/(?P<kind>\w+)/$", views.tasks_etw),
re_path(r"^tasks/get/bulkzip/(?P<task_id>\d+)/(?P<folder>\w+)/$", views.tasks_bulkzip),
re_path(r"^tasks/get/evtx/(?P<task_id>\d+)/$", views.tasks_evtx),
re_path(r"^tasks/get/dropped/(?P<task_id>\d+)/$", views.tasks_dropped),
re_path(r"^tasks/get/selfextracted/(?P<task_id>\d+)/$", views.tasks_selfextracted),
Expand Down
263 changes: 245 additions & 18 deletions web/apiv2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import socket
import subprocess
import sys
import tempfile
import zipfile
from datetime import datetime, timedelta
from io import BytesIO
Expand Down Expand Up @@ -1653,34 +1654,260 @@ def tasks_pcap(request, task_id):
return Response(resp)


@csrf_exempt
@api_view(["GET"])
def tasks_tlspcap(request, task_id):
if not apiconf.tasktlspcap.get("enabled"):
resp = {"error": True, "error_value": "TLS PCAP download API is disabled"}
return Response(resp)
def _resolve_task_id(task_id, enabled_key, check_tlp=True):
"""Shared preamble for artifact-download endpoints.

Returns ((task_id, None)) on success or ((None, Response(error))) on failure.
`enabled_key` names the apiconf section that gates the endpoint; callers
that want to share a gate (e.g. all pcap variants under [taskpcap]) reuse
the same key. TLP:RED checks are skipped only for endpoints that need
to serve regardless (none at present)."""
Comment on lines +1663 to +1664
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _resolve_task_id docstring says TLP:RED checks are skipped only for endpoints that need it ("none at present"), but tasks_tlspcap() calls it with check_tlp=False. Either update the docstring (and rationale) or enable the TLP check here for consistency with other download endpoints.

Suggested change
the same key. TLP:RED checks are skipped only for endpoints that need
to serve regardless (none at present)."""
the same key. By default, TLP:RED tasks are blocked; callers may pass
`check_tlp=False` for endpoints that intentionally need to bypass that
restriction."""

Copilot uses AI. Check for mistakes.
section = getattr(apiconf, enabled_key, None)
if section is not None and not section.get("enabled"):
return None, Response({"error": True, "error_value": "%s download API is disabled" % enabled_key})
check = validate_task(task_id)
if check["error"]:
return Response(check)

return None, Response(check)
if check_tlp and (check.get("tlp") or "").lower() == "red":
return None, Response({"error": True, "error_value": "Task has a TLP of RED"})
rtid = check.get("rtid", 0)
if rtid:
task_id = rtid
return task_id, None


srcfile = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, "polarproxy", "tls.pcap")
def _serve_analysis_file(task_id, rel_path, download_name, content_type="application/octet-stream"):
"""Stream `<analysis>/<rel_path>` back as an attachment. Returns a Response
object (either a StreamingHttpResponse for success, or a JSON error)."""
srcfile = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, rel_path)
if not os.path.normpath(srcfile).startswith(ANALYSIS_BASE_PATH):
return render(request, "error.html", {"error": f"File not found: {os.path.basename(srcfile)}"})
if path_exists(srcfile):
fname = "%s_tls.pcap" % task_id
resp = StreamingHttpResponse(FileWrapper(open(srcfile, "rb"), 8096), content_type="application/vnd.tcpdump.pcap")
resp["Content-Length"] = os.path.getsize(srcfile)
resp["Content-Disposition"] = "attachment; filename=" + fname
return Response({"error": True, "error_value": "Invalid path"})
if not path_exists(srcfile) or os.path.getsize(srcfile) == 0:
return Response({"error": True, "error_value": f"{os.path.basename(rel_path)} does not exist"})
Comment on lines +1682 to +1686
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path validation uses normpath(...).startswith(ANALYSIS_BASE_PATH), which does not account for symlinks. If an analysis artifact path is a symlink to a file outside the analysis directory, this check would still pass and could expose host files. Consider validating with path_safe()/Path.resolve() and/or rejecting symlinks before opening the file.

Copilot uses AI. Check for mistakes.
resp = StreamingHttpResponse(FileWrapper(open(srcfile, "rb"), 8192), content_type=content_type)
resp["Content-Length"] = os.path.getsize(srcfile)
resp["Content-Disposition"] = f"attachment; filename={task_id}_{download_name}"
return resp


def _zip_paths(task_id, pairs, download_name):
    """Zip a set of files into a temporary archive and stream it as a download.

    `pairs` is an iterable of (archive_name, absolute_path) tuples. Sources
    that do not exist or are empty are skipped. The attachment is named
    `<task_id>_<download_name>`.

    Returns a StreamingHttpResponse carrying the zip, or a JSON error
    Response when none of the sources could be added. If archiving or
    response construction raises, the temporary file is closed (and thereby
    removed, since it is created with delete=True) before the exception
    propagates.
    """
    # Disk-backed rather than BytesIO so the archive is not held in memory.
    buf = tempfile.NamedTemporaryFile(delete=True)
    try:
        written = 0
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            for arcname, p in pairs:
                if path_exists(p) and os.path.getsize(p) > 0:
                    zf.write(p, arcname)
                    written += 1
        if not written:
            buf.close()
            return Response({"error": True, "error_value": "No artifacts available for this task"})
        # Measure the finished archive, then rewind so streaming starts at byte 0.
        buf.seek(0, os.SEEK_END)
        size = buf.tell()
        buf.seek(0)
        # NOTE(review): the handle is intentionally left open here; presumably
        # the response/FileWrapper closes it once streaming ends — confirm.
        resp = StreamingHttpResponse(FileWrapper(buf, 8192), content_type="application/zip")
        resp["Content-Length"] = size
        resp["Content-Disposition"] = f"attachment; filename={task_id}_{download_name}"
        return resp
    except Exception:
        # Do not leak the temp file / descriptor when a source disappears or
        # becomes unreadable between the size check and the write.
        buf.close()
        raise


def _serve_folder_zip(task_id, rel_folder, download_name, empty_msg=None):
"""Encrypt-zip an entire directory under the analysis dir and stream it.
Uses `create_zip` (password = ZIP_PWD) for parity with tasks_dropped /
tasks_payloadfiles. Returns a Response with a JSON error if the folder
doesn't exist or is empty."""
srcdir = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, rel_folder)
if not os.path.normpath(srcdir).startswith(ANALYSIS_BASE_PATH):
return Response({"error": True, "error_value": "Invalid path"})
if not path_exists(srcdir) or not os.listdir(srcdir):
return Response({"error": True, "error_value": empty_msg or f"No {rel_folder} artifacts for task {task_id}"})
mem_zip = create_zip(folder=srcdir, encrypted=True, temp_file=True)
if mem_zip is False:
return Response({"error": True, "error_value": "Can't create zip archive"})
Comment on lines +1717 to +1727
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_serve_folder_zip() assumes encrypted=True yields an AES-encrypted archive, but create_zip() only encrypts when pyzipper is available; otherwise it silently produces an unencrypted zip. If these endpoints must always be encrypted for safety, consider explicitly checking HAVE_PYZIPPER and returning a clear error when it's missing (or adjust the messaging/docs accordingly).

Suggested change
Uses `create_zip` (password = ZIP_PWD) for parity with tasks_dropped /
tasks_payloadfiles. Returns a Response with a JSON error if the folder
doesn't exist or is empty."""
srcdir = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, rel_folder)
if not os.path.normpath(srcdir).startswith(ANALYSIS_BASE_PATH):
return Response({"error": True, "error_value": "Invalid path"})
if not path_exists(srcdir) or not os.listdir(srcdir):
return Response({"error": True, "error_value": empty_msg or f"No {rel_folder} artifacts for task {task_id}"})
mem_zip = create_zip(folder=srcdir, encrypted=True, temp_file=True)
if mem_zip is False:
return Response({"error": True, "error_value": "Can't create zip archive"})
Uses AES encryption with `pyzipper` (password = ZIP_PWD). Returns a
Response with a JSON error if the folder doesn't exist, is empty, or the
archive cannot be created."""
srcdir = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, rel_folder)
if not os.path.normpath(srcdir).startswith(ANALYSIS_BASE_PATH):
return Response({"error": True, "error_value": "Invalid path"})
if not path_exists(srcdir) or not os.listdir(srcdir):
return Response({"error": True, "error_value": empty_msg or f"No {rel_folder} artifacts for task {task_id}"})
mem_zip = tempfile.NamedTemporaryFile(delete=True)
written = 0
try:
with pyzipper.AESZipFile(mem_zip, "w", compression=pyzipper.ZIP_DEFLATED, encryption=pyzipper.WZ_AES) as zf:
zf.setpassword(ZIP_PWD.encode())
for root, _, files in os.walk(srcdir):
for filename in files:
filepath = os.path.join(root, filename)
if path_exists(filepath) and os.path.getsize(filepath) > 0:
arcname = os.path.relpath(filepath, srcdir)
zf.write(filepath, arcname)
written += 1
except Exception:
mem_zip.close()
return Response({"error": True, "error_value": "Can't create zip archive"})
if not written:
mem_zip.close()
return Response({"error": True, "error_value": empty_msg or f"No {rel_folder} artifacts for task {task_id}"})

Copilot uses AI. Check for mistakes.
mem_zip.seek(0, os.SEEK_END)
size = mem_zip.tell()
mem_zip.seek(0)
resp = StreamingHttpResponse(FileWrapper(mem_zip, 8192), content_type="application/zip")
resp["Content-Length"] = size
resp["Content-Disposition"] = f"attachment; filename={task_id}_{download_name}"
return resp


@csrf_exempt
@api_view(["GET"])
def tasks_tlspcap(request, task_id):
"""Back-compat endpoint: originally served PolarProxy's tls.pcap. We've
since moved to SSLproxy + GoGoRoboCap which produces dump_decrypted.pcap;
prefer that, but fall back to the legacy path for old analyses."""
task_id, err = _resolve_task_id(task_id, "tasktlspcap", check_tlp=False)
if err:
return err

decrypted = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, "dump_decrypted.pcap")
legacy = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id, "polarproxy", "tls.pcap")
for srcfile, fname in ((decrypted, "dump_decrypted.pcap"), (legacy, "tls.pcap")):
if not os.path.normpath(srcfile).startswith(ANALYSIS_BASE_PATH):
continue
if path_exists(srcfile) and os.path.getsize(srcfile) > 0:
resp = StreamingHttpResponse(
FileWrapper(open(srcfile, "rb"), 8096), content_type="application/vnd.tcpdump.pcap"
)
resp["Content-Length"] = os.path.getsize(srcfile)
resp["Content-Disposition"] = f"attachment; filename={task_id}_{fname}"
return resp
Comment on lines +1756 to +1758
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tasks_tlspcap is described as a back-compat endpoint, but it changes the downloaded filename from the previous *_tls.pcap to *_{dump_decrypted.pcap|tls.pcap}. Clients that key off the attachment name may break; consider keeping the historical filename (e.g., always <task>_tls.pcap) while serving the preferred content.

Copilot uses AI. Check for mistakes.
return Response({"error": True, "error_value": "TLS PCAP does not exist"})


# Lookup tables for the consolidated dispatcher endpoints. Handlers check the
# URL-supplied selector against these whitelists before touching the
# filesystem, so the URL parameter can't be used to probe paths outside the
# analysis dir.
#
# Each dict maps selector -> (path relative to the analysis dir, download
# name); the download name is always the last path component.

_PCAP_VARIANTS = {
    selector: (os.path.join(*parts), parts[-1])
    for selector, parts in (
        ("decrypted", ("dump_decrypted.pcap",)),
        ("mixed", ("dump_mixed.pcap",)),
        ("sslproxy", ("sslproxy", "sslproxy.pcap")),
    )
}

_KEY_SOURCES = {
    selector: (os.path.join(*parts), parts[-1])
    for selector, parts in (
        ("tls", ("tlsdump", "tlsdump.log")),
        ("ssl", ("aux", "sslkeylogfile", "sslkeys.log")),
        ("master", ("sslproxy", "master_keys.log")),
    )
}

_ETW_JSON_SOURCES = {
    selector: (os.path.join(*parts), parts[-1])
    for selector, parts in (
        ("dns", ("aux", "dns_etw.json")),
        ("network", ("aux", "network_etw.json")),
        ("wmi", ("aux", "wmi_etw.json")),
    )
}

# Analysis subdirectories that may be requested as a whole-folder zip.
_BULKZIP_FOLDERS = {
    "logs",
    "memory",
    "network",
    "selfextracted",
}


def _pcapng_response(task_id):
    """Generate a PCAPNG for the task on the fly and stream it as a download.

    The conversion is delegated to `PcapToNg`, which is given the task's
    dump.pcap plus the tlsdump.log and sslkeys.log keylog paths. Output goes
    to a per-request temporary file so concurrent callers do not race on a
    shared path inside the analysis dir.

    Returns a StreamingHttpResponse named `<task_id>_dump.pcapng`, or a JSON
    error Response when the helper cannot be imported, dump.pcap is missing,
    or the conversion yields no output. Exceptions raised by the conversion
    propagate to the caller. The temporary file is unlinked on every path.
    """
    try:
        from lib.cuckoo.common.pcap_utils import PcapToNg
    except ImportError:
        return Response({"error": True, "error_value": "PCAPNG conversion helper unavailable"})

    adir = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id)
    pcap_path = os.path.join(adir, "dump.pcap")
    if not path_exists(pcap_path):
        return Response({"error": True, "error_value": "dump.pcap does not exist"})
    tls_log_path = os.path.join(adir, "tlsdump", "tlsdump.log")
    ssl_key_log_path = os.path.join(adir, "aux", "sslkeylogfile", "sslkeys.log")

    # Only the unique path is needed; the converter writes to it by name.
    tmp = tempfile.NamedTemporaryFile(prefix=f"{task_id}_pcapng_", suffix=".pcapng", delete=False)
    tmp.close()
    try:
        PcapToNg(pcap_path, tls_log_path, ssl_key_log_path).generate(tmp.name)
        if not path_exists(tmp.name) or os.path.getsize(tmp.name) == 0:
            return Response({"error": True, "error_value": "PCAPNG generation failed"})
        size = os.path.getsize(tmp.name)
        fd = open(tmp.name, "rb")
    finally:
        # Runs on success, on the "generation failed" return, and on errors,
        # so the delete=False temp file is never left behind. On success the
        # already-open fd keeps the data readable after the path is removed,
        # and the kernel reclaims the inode once the fd is closed.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass

    try:
        resp = StreamingHttpResponse(FileWrapper(fd, 8192), content_type="application/x-pcapng")
        resp["Content-Length"] = size
        resp["Content-Disposition"] = f"attachment; filename={task_id}_dump.pcapng"
    except Exception:
        # Nothing will ever consume the stream, so release the descriptor now.
        fd.close()
        raise
    return resp

else:
resp = {"error": True, "error_value": "TLS PCAP does not exist"}
return Response(resp)

def _pcapzip_response(task_id):
    """Bundle every pcap variant of the task into a single `pcaps.zip`.

    Candidates are the original, decrypted and mixed dumps plus the raw and
    cleaned sslproxy captures. The list is handed to `_zip_paths`, which
    drops entries that are missing or empty, so consumers only receive what
    was actually produced.
    """
    analysis_dir = os.path.join(CUCKOO_ROOT, "storage", "analyses", "%s" % task_id)
    # (name inside the archive, path components below the analysis dir)
    layout = (
        ("dump.pcap", ("dump.pcap",)),
        ("dump_decrypted.pcap", ("dump_decrypted.pcap",)),
        ("dump_mixed.pcap", ("dump_mixed.pcap",)),
        ("sslproxy.pcap", ("sslproxy", "sslproxy.pcap")),
        ("sslproxy_clean.pcap", ("sslproxy", "sslproxy_clean.pcap")),
    )
    pairs = [(arcname, os.path.join(analysis_dir, *parts)) for arcname, parts in layout]
    return _zip_paths(task_id, pairs, "pcaps.zip")


@csrf_exempt
@api_view(["GET"])
def tasks_pcap_variant(request, task_id, variant):
    """Serve an alternate PCAP artifact for <task_id>.

    `variant` is matched case-insensitively against
    {decrypted, mixed, sslproxy, zip, pcapng}; anything else yields a JSON
    error. Access is gated by the "taskpcap" key via `_resolve_task_id`.
    The bare tasks/get/pcap/<id>/ route remains for back-compat with
    existing callers (serves dump.pcap).
    """
    task_id, err = _resolve_task_id(task_id, "taskpcap")
    if err:
        return err

    selector = (variant or "").lower()
    # Generated artifacts first; neither name is a key of _PCAP_VARIANTS.
    if selector == "zip":
        return _pcapzip_response(task_id)
    if selector == "pcapng":
        return _pcapng_response(task_id)

    entry = _PCAP_VARIANTS.get(selector)
    if entry is None:
        return Response({"error": True, "error_value": f"Unknown pcap variant: {variant}"})
    rel_path, fname = entry
    return _serve_analysis_file(task_id, rel_path, fname, content_type="application/vnd.tcpdump.pcap")


@csrf_exempt
@api_view(["GET"])
def tasks_keys(request, task_id, kind):
    """Serve TLS keylog material for <task_id>.

    `kind` is matched case-insensitively against {tls, ssl, master}, each a
    different hook source (tls: MockSSL -> tlsdump.log; ssl: bcrypt/NCrypt ->
    aux/sslkeylogfile/sslkeys.log; master: SSLproxy -> master_keys.log).
    All three are NSS-format keylogs and are served as text/plain. Access is
    gated by the "tasktlskeys" key via `_resolve_task_id`.
    """
    task_id, err = _resolve_task_id(task_id, "tasktlskeys")
    if err:
        return err

    source = _KEY_SOURCES.get((kind or "").lower())
    if source is None:
        return Response({"error": True, "error_value": f"Unknown keys kind: {kind}"})
    rel_path, fname = source
    return _serve_analysis_file(task_id, rel_path, fname, content_type="text/plain")


@csrf_exempt
@api_view(["GET"])
def tasks_etw(request, task_id, kind):
    """Serve ETW telemetry for <task_id>.

    `kind` is matched case-insensitively. dns / network / wmi each return
    the corresponding JSON file as application/x-ndjson; amsi returns a zip
    of the aux/amsi_etw directory. Any other value yields a JSON error.
    Access is gated by the "tasketw" key via `_resolve_task_id`.
    """
    task_id, err = _resolve_task_id(task_id, "tasketw")
    if err:
        return err

    selector = (kind or "").lower()
    # "amsi" is a directory rather than a single file, so it is zipped.
    if selector == "amsi":
        return _serve_folder_zip(task_id, os.path.join("aux", "amsi_etw"), "amsi_etw.zip")

    source = _ETW_JSON_SOURCES.get(selector)
    if source is None:
        return Response({"error": True, "error_value": f"Unknown etw kind: {kind}"})
    rel_path, fname = source
    return _serve_analysis_file(task_id, rel_path, fname, content_type="application/x-ndjson")


@csrf_exempt
@api_view(["GET"])
def tasks_bulkzip(request, task_id, folder):
    """Zip and stream a whole analysis subdirectory of <task_id>.

    `folder` is matched case-insensitively against the whitelist
    {logs, network, memory, selfextracted}; anything else yields a JSON
    error. Archiving is delegated to `_serve_folder_zip`, which is meant to
    AES-encrypt with ZIP_PWD for parity with tasks_dropped /
    tasks_payloadfiles / tasks_procdumpfiles. Access is gated by the
    "taskbulkzip" key via `_resolve_task_id`.
    """
    task_id, err = _resolve_task_id(task_id, "taskbulkzip")
    if err:
        return err

    subdir = (folder or "").lower()
    if subdir in _BULKZIP_FOLDERS:
        return _serve_folder_zip(task_id, subdir, f"{subdir}.zip")
    return Response({"error": True, "error_value": f"Unknown bulkzip folder: {folder}"})


@csrf_exempt
Expand Down
Loading