diff --git a/analyzer/windows/tests/test_analyzer.py b/analyzer/windows/tests/test_analyzer.py index 353b6401a68..bbe274844c4 100644 --- a/analyzer/windows/tests/test_analyzer.py +++ b/analyzer/windows/tests/test_analyzer.py @@ -977,9 +977,8 @@ def test_handle_process(self, mock_process): # TODO add a couple of mocks random_pid = random.randint(1, 99999999) random_tid = random.randint(1, 9999999) - suspended = 1 - data = bytes(f"{suspended}:{random_pid},{random_tid}".encode()) - # This produces something like b"1:910271,1819029" + data = bytes(f"{random_pid},{random_tid}".encode()) + # This produces something like b"910271,1819029" with patch("analyzer.INJECT_LIST", []): self.pipe_handler._handle_process(data=data) self.assertEqual(1, len(analyzer.INJECT_LIST)) diff --git a/conf/default/web.conf.default b/conf/default/web.conf.default index 5140fd03414..60a44c83bc6 100644 --- a/conf/default/web.conf.default +++ b/conf/default/web.conf.default @@ -223,3 +223,6 @@ packages = [yara_detail] enabled = no + +[pcap_ng] +enabled = no diff --git a/lib/cuckoo/common/pcap_utils.py b/lib/cuckoo/common/pcap_utils.py new file mode 100644 index 00000000000..75302a59850 --- /dev/null +++ b/lib/cuckoo/common/pcap_utils.py @@ -0,0 +1,199 @@ +import logging +import shutil +import subprocess +import tempfile +from pathlib import Path + +from utils.tls import tlslog_to_sslkeylogfile + +EDITCAP = "editcap" +EDITCAP_TIMEOUT = 60 + +log = logging.getLogger(__name__) + + +def append_file_contents_to_file(file_with_contents: Path, append_to_file: Path): + """Append the contents of one file to another file. + + Args: + file_with_contents: Path to the source file to read from + append_to_file: Path to the destination file to append to + """ + with file_with_contents.open("r") as src, append_to_file.open("a+") as dst: + dst.write(src.read()) + + +def file_exists_not_empty(path: Path | None) -> bool: + """Check if a file exists and is not empty. + + Args: + path: Path to the file to check, or None + + Returns: + True if the path is not None, the file exists, and has size > 0, False otherwise + """ + return bool(path and path.exists() and path.stat().st_size > 0) + + +def generate_pcapng(sslkeylogfile_path: Path, pcap_path: Path, outfile: Path, timeout: int = EDITCAP_TIMEOUT): + """Generate a pcapng file from a pcap file and SSL key log file using editcap. + + Args: + sslkeylogfile_path: Path to the SSL key log file containing TLS decryption keys + pcap_path: Path to the input pcap file + outfile: Path where the output pcapng file should be written + timeout: Maximum time in seconds to wait for editcap to complete (default: EDITCAP_TIMEOUT) + + Raises: + EmptyPcapError: If the pcap file doesn't exist or is empty + subprocess.CalledProcessError: If editcap exits with a non-zero status + subprocess.TimeoutExpired: If editcap execution exceeds the timeout + """ + if not file_exists_not_empty(pcap_path): + raise EmptyPcapError(pcap_path) + cmd = [EDITCAP, "--inject-secrets", f"tls,{sslkeylogfile_path}", pcap_path, outfile] + log.debug("generating pcapng with command '%s", cmd) + subprocess.check_call(cmd, timeout=timeout) + + +def _has_magic(file: str | Path, magic_numbers: tuple[int, ...]) -> bool: + """Check if a file starts with one of the given magic numbers. + + Args: + file: Path to the file to check + magic_numbers: Tuple of magic numbers to check for (as integers in big-endian) + + Returns: + True if the file starts with one of the magic numbers, False otherwise + + Note: + Magic numbers are read in big-endian byte order (the natural way to represent + hex values). If you need to check files with different byte orders, include + both byte order variations in the magic_numbers tuple. + """ + if not magic_numbers: + return False + + max_magic = max(magic_numbers) + magic_byte_len = (max_magic.bit_length() + 7) // 8 + + try: + with open(file, "rb") as fd: + magic_bytes = fd.read(magic_byte_len) + # Return false if the file is too small to contain the magic number + if len(magic_bytes) < magic_byte_len: + return False + + magic_number = int.from_bytes(magic_bytes, byteorder="big") + return magic_number in magic_numbers + except (OSError, IOError): + return False + + +def is_pcap(file: str | Path) -> bool: + """Check if a file is a PCAP file by checking its magic number. + + PCAP files start with either 0xA1B2C3D4 (big-endian) or 0xD4C3B2A1 (little-endian). + """ + return _has_magic(file, (0xA1B2C3D4, 0xD4C3B2A1)) + + +def is_pcapng(file: str | Path) -> bool: + """Check if a file is a PCAPNG file by checking its magic number. + + PCAPNG files start with 0x0A0D0D0A (Section Header Block magic). + """ + return _has_magic(file, (0x0A0D0D0A,)) + + +class EmptyPcapError(Exception): + """Exception raised when a pcap file is empty or doesn't exist.""" + + def __init__(self, pcap_path: Path): + """Initialize the EmptyPcapError. + + Args: + pcap_path: Path to the empty or non-existent pcap file + """ + self.pcap_path = pcap_path + super().__init__(f"pcap file is empty: {pcap_path}") + + +class PcapToNg: + """Combine a PCAP, TLS key log and SSL key log into a .pcapng file. + + Requires the `editcap` executable.""" + + def __init__(self, pcap_path: str | Path, tlsdump_log: Path | str | None = None, sslkeys_log: Path | str | None = None): + """Initialize the PcapToNg converter. + + Args: + pcap_path: Path to the source pcap file + tlsdump_log: Optional path to the CAPEMON TLS dump log file + sslkeys_log: Optional path to the SSLKEYLOGFILE format key log + """ + self.pcap_path = Path(pcap_path) + self.pcapng_path = Path(f"{self.pcap_path}ng") + self.tlsdump_log = Path(tlsdump_log) if tlsdump_log else None + self.sslkeys_log = Path(sslkeys_log) if sslkeys_log else None + + def generate(self, outfile: Path | str | None = None): + """Generate a pcapng file by combining the pcap with TLS/SSL key logs. + + This method will: + 1. Skip generation if the output already exists and is a valid pcapng + 2. Combine TLS dump logs and SSL key logs into a temporary file + 3. Use editcap to inject the TLS secrets into the pcap to create a pcapng + + Args: + outfile: Optional path where the pcapng should be written. + If None, uses the pcap_path with 'ng' suffix. + + Note: + Errors are logged but not raised. The method returns silently if: + - The output file already exists + - The input pcap doesn't exist or is empty + - editcap is not found in PATH + - editcap execution fails + """ + if not outfile: + outfile = self.pcapng_path + elif isinstance(outfile, str): + outfile = Path(outfile) + + if outfile.exists() and is_pcapng(outfile): + log.debug('pcapng already exists, nothing to do "%s"', outfile) + return + + if not self.pcap_path.exists(): + log.debug('pcap not found, nothing to do "%s"', self.pcap_path) + return + + if self.pcap_path.stat().st_size == 0: + log.debug('pcap is empty, nothing to do "%s"', self.pcap_path) + return + + if not shutil.which(EDITCAP): + log.error("%s not in path and is required", EDITCAP) + return + + failmsg = "failed to generate .pcapng" + try: + # Combine all TLS logs into a single file in a format that can be read by editcap + with tempfile.NamedTemporaryFile("w", dir=self.pcap_path.parent, encoding="utf-8") as tmp_ssl_key_log: + tmp_ssl_key_log_path = Path(tmp_ssl_key_log.name) + # Write CAPEMON keys + if file_exists_not_empty(self.tlsdump_log): + log.debug("writing tlsdump.log to temp key log file") + tlslog_to_sslkeylogfile(self.tlsdump_log, tmp_ssl_key_log_path) + # Write SSLKEYLOGFILE keys + if file_exists_not_empty(self.sslkeys_log): + log.debug("writing SSLKEYLOGFILE to temp key log file") + append_file_contents_to_file(self.sslkeys_log, tmp_ssl_key_log_path) + generate_pcapng(tmp_ssl_key_log_path, self.pcap_path, outfile) + except subprocess.CalledProcessError as exc: + log.error("%s: editcap exited with code: %d", failmsg, exc.returncode) + except subprocess.TimeoutExpired: + log.error("%s: editcap reached timeout", failmsg) + except (OSError, EmptyPcapError) as exc: + log.error("%s: %s", failmsg, exc) diff --git a/modules/processing/pcapng.py b/modules/processing/pcapng.py index f41e288cb97..f33fb5ec621 100644 --- a/modules/processing/pcapng.py +++ b/modules/processing/pcapng.py @@ -1,86 +1,33 @@ import logging -import os -import shutil -import subprocess -import tempfile from lib.cuckoo.common.abstracts import Processing from lib.cuckoo.common.objects import File -from lib.cuckoo.common.path_utils import path_exists -from utils.tls import tlslog_to_sslkeylogfile - -EDITCAP = "editcap" -EDITCAP_TIMEOUT = 60 +from lib.cuckoo.common.pcap_utils import PcapToNg, file_exists_not_empty, is_pcapng +from pathlib import Path log = logging.getLogger(__name__) class PcapNg(Processing): - """Injects TLS keys into a .pcap, resulting in a .pcapng file. - - Requires the `editcap` executable.""" + """Generate a pcapng file during processing.""" key = "pcapng" - def set_path(self, analysis_path): + def set_path(self, analysis_path: str) -> None: """Set paths. @param analysis_path: analysis folder path. """ super().set_path(analysis_path) - # The file CAPE Monitor logs TLS keys to - self.tlsdump_log = os.path.join(self.analysis_path, "tlsdump", "tlsdump.log") - # The file logged to by libraries that support the SSLKEYLOGFILE env var - self.sslkeys_log = os.path.join(self.analysis_path, "aux/sslkeylogfile", "sslkeys.log") - self.pcapng_path = self.pcap_path + "ng" - - def run(self): - retval = {} - - if not path_exists(self.pcap_path): - log.debug('pcap not found, nothing to do "%s"', self.pcap_path) - return retval - - if os.path.getsize(self.pcap_path) == 0: - log.debug('pcap is empty, nothing to do "%s"', self.pcap_path) - return retval - - if not shutil.which(EDITCAP): - log.error("%s not in path and is required", EDITCAP) - return retval - - try: - failmsg = "failed to generate .pcapng" - tls_dir = os.path.dirname(self.tlsdump_log) - # Combine all TLS logs into a single file in a format that can be read by editcap - with tempfile.NamedTemporaryFile("w", dir=tls_dir, encoding="utf-8") as dest_ssl_key_log: - # Write CAPEMON keys - if self.file_exists_not_empty(self.tlsdump_log): - log.debug("writing tlsdump.log to temp key log file") - tlslog_to_sslkeylogfile(self.tlsdump_log, dest_ssl_key_log.name) - # Write SSLKEYLOGFILE keys - if self.file_exists_not_empty(self.sslkeys_log): - log.debug("writing SSLKEYLOGFILE to temp key log file") - self.append_file_contents_to_file(self.sslkeys_log, dest_ssl_key_log.name) - self.generate_pcapng(dest_ssl_key_log.name) - retval = {"sha256": File(self.pcapng_path).get_sha256()} - except subprocess.CalledProcessError as exc: - log.error("%s: editcap exited with code: %d", failmsg, exc.returncode) - except subprocess.TimeoutExpired: - log.error("%s: editcap reached timeout", failmsg) - except OSError as exc: - log.error("%s: %s", failmsg, exc) - - return retval - - def file_exists_not_empty(self, path): - return bool(path_exists(path) and os.path.getsize(path) > 0) - - def append_file_contents_to_file(self, file_with_contents, append_to_file): - with open(file_with_contents, "r") as src, open(append_to_file, "a+") as dst: - dst.write(src.read()) - - def generate_pcapng(self, sslkeylogfile_path): - # ToDo bail if file is empty - cmd = [EDITCAP, "--inject-secrets", "tls," + sslkeylogfile_path, self.pcap_path, self.pcapng_path] - log.debug("generating pcapng with command '%s", cmd) - subprocess.check_call(cmd, timeout=EDITCAP_TIMEOUT) + self.tlsdump_log = Path(analysis_path) / "tlsdump" / "tlsdump.log" + self.sslkeys_log = Path(analysis_path) / "aux" / "sslkeylogfile" / "sslkeys.log" + self.pcapng_path = Path(self.pcap_path + "ng") + + def run(self) -> dict[str, str | None]: + PcapToNg(self.pcap_path, self.tlsdump_log, self.sslkeys_log).generate(self.pcapng_path) + if not file_exists_not_empty(self.pcapng_path): + log.warning("pcapng file was not created: %s", self.pcapng_path) + return {} + if not is_pcapng(self.pcapng_path): + log.warning("generated pcapng file is not valid: %s", self.pcapng_path) + return {} + return {"sha256": File(self.pcapng_path).get_sha256()} diff --git a/web/analysis/views.py b/web/analysis/views.py index 898b05edf55..db28ee4b10d 100644 --- a/web/analysis/views.py +++ b/web/analysis/views.py @@ -28,6 +28,7 @@ sys.path.append(settings.CUCKOO_PATH) +from lib.cuckoo.common.pcap_utils import PcapToNg import modules.processing.network as network from lib.cuckoo.common.config import Config from lib.cuckoo.common.constants import ANALYSIS_BASE_PATH, CUCKOO_ROOT @@ -1852,8 +1853,14 @@ def file(request, category, task_id, dlfile): path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcap") cd = "application/vnd.tcpdump.pcap" elif category == "pcapng": - file_name += ".pcapng" + analysis_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id) + pcap_path = os.path.join(analysis_path, "dump.pcap") + tls_log_path = os.path.join(analysis_path, "tlsdump", "tlsdump.log") + ssl_key_log_path = os.path.join(analysis_path, "aux", "sslkeylogfile", "sslkeys.log") path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "dump.pcapng") + pcapng = PcapToNg(pcap_path, tls_log_path, ssl_key_log_path) + pcapng.generate(path) + file_name += ".pcapng" cd = "application/vnd.tcpdump.pcap" elif category == "debugger_log": path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "debugger", str(dlfile) + ".log") diff --git a/web/templates/analysis/network/index.html b/web/templates/analysis/network/index.html index f32a3e1b3b4..e3bc6349b3d 100644 --- a/web/templates/analysis/network/index.html +++ b/web/templates/analysis/network/index.html @@ -2,8 +2,8 @@
PCAP - {% if pcapng.sha256 %} - PCAP-NG + {% if config.pcap_ng %} + PCAP-NG {% endif %} PCAP {% if tlskeys_exists %}