|
| 1 | +import logging |
| 2 | +import shutil |
| 3 | +import subprocess |
| 4 | +import tempfile |
| 5 | +from pathlib import Path |
| 6 | + |
| 7 | +from utils.tls import tlslog_to_sslkeylogfile |
| 8 | + |
| 9 | +EDITCAP = "editcap" |
| 10 | +EDITCAP_TIMEOUT = 60 |
| 11 | + |
| 12 | +log = logging.getLogger(__name__) |
| 13 | + |
| 14 | + |
| 15 | +def append_file_contents_to_file(file_with_contents: Path, append_to_file: Path): |
| 16 | + """Append the contents of one file to another file. |
| 17 | +
|
| 18 | + Args: |
| 19 | + file_with_contents: Path to the source file to read from |
| 20 | + append_to_file: Path to the destination file to append to |
| 21 | + """ |
| 22 | + with file_with_contents.open("r") as src, append_to_file.open("a+") as dst: |
| 23 | + dst.write(src.read()) |
| 24 | + |
| 25 | + |
| 26 | +def file_exists_not_empty(path: Path | None) -> bool: |
| 27 | + """Check if a file exists and is not empty. |
| 28 | +
|
| 29 | + Args: |
| 30 | + path: Path to the file to check, or None |
| 31 | +
|
| 32 | + Returns: |
| 33 | + True if the path is not None, the file exists, and has size > 0, False otherwise |
| 34 | + """ |
| 35 | + return bool(path and path.exists() and path.stat().st_size > 0) |
| 36 | + |
| 37 | + |
| 38 | +def generate_pcapng(sslkeylogfile_path: Path, pcap_path: Path, outfile: Path, timeout: int = EDITCAP_TIMEOUT): |
| 39 | + """Generate a pcapng file from a pcap file and SSL key log file using editcap. |
| 40 | +
|
| 41 | + Args: |
| 42 | + sslkeylogfile_path: Path to the SSL key log file containing TLS decryption keys |
| 43 | + pcap_path: Path to the input pcap file |
| 44 | + outfile: Path where the output pcapng file should be written |
| 45 | + timeout: Maximum time in seconds to wait for editcap to complete (default: EDITCAP_TIMEOUT) |
| 46 | +
|
| 47 | + Raises: |
| 48 | + EmptyPcapError: If the pcap file doesn't exist or is empty |
| 49 | + subprocess.CalledProcessError: If editcap exits with a non-zero status |
| 50 | + subprocess.TimeoutExpired: If editcap execution exceeds the timeout |
| 51 | + """ |
| 52 | + if not file_exists_not_empty(pcap_path): |
| 53 | + raise EmptyPcapError(pcap_path) |
| 54 | + cmd = [EDITCAP, "--inject-secrets", f"tls,{sslkeylogfile_path}", pcap_path, outfile] |
| 55 | + log.debug("generating pcapng with command '%s", cmd) |
| 56 | + subprocess.check_call(cmd, timeout=timeout) |
| 57 | + |
| 58 | + |
| 59 | +def _has_magic(file: str | Path, magic_numbers: tuple[int, ...]) -> bool: |
| 60 | + """Check if a file starts with one of the given magic numbers. |
| 61 | +
|
| 62 | + Args: |
| 63 | + file: Path to the file to check |
| 64 | + magic_numbers: Tuple of magic numbers to check for (as integers in big-endian) |
| 65 | +
|
| 66 | + Returns: |
| 67 | + True if the file starts with one of the magic numbers, False otherwise |
| 68 | +
|
| 69 | + Note: |
| 70 | + Magic numbers are read in big-endian byte order (the natural way to represent |
| 71 | + hex values). If you need to check files with different byte orders, include |
| 72 | + both byte order variations in the magic_numbers tuple. |
| 73 | + """ |
| 74 | + if not magic_numbers: |
| 75 | + return False |
| 76 | + |
| 77 | + max_magic = max(magic_numbers) |
| 78 | + magic_byte_len = (max_magic.bit_length() + 7) // 8 |
| 79 | + |
| 80 | + try: |
| 81 | + with open(file, "rb") as fd: |
| 82 | + magic_bytes = fd.read(magic_byte_len) |
| 83 | + # Return false if the file is too small to contain the magic number |
| 84 | + if len(magic_bytes) < magic_byte_len: |
| 85 | + return False |
| 86 | + |
| 87 | + magic_number = int.from_bytes(magic_bytes, byteorder="big") |
| 88 | + return magic_number in magic_numbers |
| 89 | + except (OSError, IOError): |
| 90 | + return False |
| 91 | + |
| 92 | + |
| 93 | +def is_pcap(file: str | Path) -> bool: |
| 94 | + """Check if a file is a PCAP file by checking its magic number. |
| 95 | +
|
| 96 | + PCAP files start with either 0xA1B2C3D4 (big-endian) or 0xD4C3B2A1 (little-endian). |
| 97 | + """ |
| 98 | + return _has_magic(file, (0xA1B2C3D4, 0xD4C3B2A1)) |
| 99 | + |
| 100 | + |
| 101 | +def is_pcapng(file: str | Path) -> bool: |
| 102 | + """Check if a file is a PCAPNG file by checking its magic number. |
| 103 | +
|
| 104 | + PCAPNG files start with 0x0A0D0D0A (Section Header Block magic). |
| 105 | + """ |
| 106 | + return _has_magic(file, (0x0A0D0D0A,)) |
| 107 | + |
| 108 | + |
| 109 | +class EmptyPcapError(Exception): |
| 110 | + """Exception raised when a pcap file is empty or doesn't exist.""" |
| 111 | + |
| 112 | + def __init__(self, pcap_path: Path): |
| 113 | + """Initialize the EmptyPcapError. |
| 114 | +
|
| 115 | + Args: |
| 116 | + pcap_path: Path to the empty or non-existent pcap file |
| 117 | + """ |
| 118 | + self.pcap_path = pcap_path |
| 119 | + super().__init__(f"pcap file is empty: {pcap_path}") |
| 120 | + |
| 121 | + |
| 122 | +class PcapToNg: |
| 123 | + """Combine a PCAP, TLS key log and SSL key log into a .pcapng file. |
| 124 | +
|
| 125 | + Requires the `editcap` executable.""" |
| 126 | + |
| 127 | + def __init__(self, pcap_path: str | Path, tlsdump_log: Path | str | None = None, sslkeys_log: Path | str | None = None): |
| 128 | + """Initialize the PcapToNg converter. |
| 129 | +
|
| 130 | + Args: |
| 131 | + pcap_path: Path to the source pcap file |
| 132 | + tlsdump_log: Optional path to the CAPEMON TLS dump log file |
| 133 | + sslkeys_log: Optional path to the SSLKEYLOGFILE format key log |
| 134 | + """ |
| 135 | + self.pcap_path = Path(pcap_path) |
| 136 | + self.pcapng_path = Path(f"{self.pcap_path}ng") |
| 137 | + self.tlsdump_log = Path(tlsdump_log) if tlsdump_log else None |
| 138 | + self.sslkeys_log = Path(sslkeys_log) if sslkeys_log else None |
| 139 | + |
| 140 | + def generate(self, outfile: Path | str | None = None): |
| 141 | + """Generate a pcapng file by combining the pcap with TLS/SSL key logs. |
| 142 | +
|
| 143 | + This method will: |
| 144 | + 1. Skip generation if the output already exists and is a valid pcapng |
| 145 | + 2. Combine TLS dump logs and SSL key logs into a temporary file |
| 146 | + 3. Use editcap to inject the TLS secrets into the pcap to create a pcapng |
| 147 | +
|
| 148 | + Args: |
| 149 | + outfile: Optional path where the pcapng should be written. |
| 150 | + If None, uses the pcap_path with 'ng' suffix. |
| 151 | +
|
| 152 | + Note: |
| 153 | + Errors are logged but not raised. The method returns silently if: |
| 154 | + - The output file already exists |
| 155 | + - The input pcap doesn't exist or is empty |
| 156 | + - editcap is not found in PATH |
| 157 | + - editcap execution fails |
| 158 | + """ |
| 159 | + if not outfile: |
| 160 | + outfile = self.pcapng_path |
| 161 | + elif isinstance(outfile, str): |
| 162 | + outfile = Path(outfile) |
| 163 | + |
| 164 | + if outfile.exists() and is_pcapng(outfile): |
| 165 | + log.debug('pcapng already exists, nothing to do "%s"', outfile) |
| 166 | + return |
| 167 | + |
| 168 | + if not self.pcap_path.exists(): |
| 169 | + log.debug('pcap not found, nothing to do "%s"', self.pcap_path) |
| 170 | + return |
| 171 | + |
| 172 | + if self.pcap_path.stat().st_size == 0: |
| 173 | + log.debug('pcap is empty, nothing to do "%s"', self.pcap_path) |
| 174 | + return |
| 175 | + |
| 176 | + if not shutil.which(EDITCAP): |
| 177 | + log.error("%s not in path and is required", EDITCAP) |
| 178 | + return |
| 179 | + |
| 180 | + failmsg = "failed to generate .pcapng" |
| 181 | + try: |
| 182 | + # Combine all TLS logs into a single file in a format that can be read by editcap |
| 183 | + with tempfile.NamedTemporaryFile("w", dir=self.pcap_path.parent, encoding="utf-8") as tmp_ssl_key_log: |
| 184 | + tmp_ssl_key_log_path = Path(tmp_ssl_key_log.name) |
| 185 | + # Write CAPEMON keys |
| 186 | + if file_exists_not_empty(self.tlsdump_log): |
| 187 | + log.debug("writing tlsdump.log to temp key log file") |
| 188 | + tlslog_to_sslkeylogfile(self.tlsdump_log, tmp_ssl_key_log_path) |
| 189 | + # Write SSLKEYLOGFILE keys |
| 190 | + if file_exists_not_empty(self.sslkeys_log): |
| 191 | + log.debug("writing SSLKEYLOGFILE to temp key log file") |
| 192 | + append_file_contents_to_file(self.sslkeys_log, tmp_ssl_key_log_path) |
| 193 | + generate_pcapng(tmp_ssl_key_log_path, self.pcap_path, outfile) |
| 194 | + except subprocess.CalledProcessError as exc: |
| 195 | + log.error("%s: editcap exited with code: %d", failmsg, exc.returncode) |
| 196 | + except subprocess.TimeoutExpired: |
| 197 | + log.error("%s: editcap reached timeout", failmsg) |
| 198 | + except (OSError, EmptyPcapError) as exc: |
| 199 | + log.error("%s: %s", failmsg, exc) |
0 commit comments