From 706f2e28ce43e73e5fb2784830a1467df575c36e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 18:51:06 +0000 Subject: [PATCH 1/2] Initial plan From ab3834588c8856c29f9a0200ca62b968a33424a7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 18:56:36 +0000 Subject: [PATCH 2/2] Make encrypted IP regex handling secure by default with opt-out --- README.md | 13 ++++ .../_extraction_heuristic_pre_validator.py | 25 +++++- ...test_extraction_heuristic_pre_validator.py | 76 +++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 tests/test_extraction_heuristic_pre_validator.py diff --git a/README.md b/README.md index ecd9a056..f29bef6d 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,19 @@ export IPINFO_API_KEY="your_token_here" export OPENCAGE_API_KEY="your_token_here" ``` +By default, the extraction heuristic validator keeps its internal IP regex pattern encrypted and decrypts it at runtime. 
To keep this secure default, provide the encryption password: + +```bash +export S3_LOG_EXTRACTION_PASSWORD="your_encryption_password" +``` + +On trusted internal processing servers where you want to skip decryption overhead, you can disable encrypted regex handling and provide the plaintext regex directly: + +```bash +export S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX="false" +export S3_LOG_EXTRACTION_DROGON_IP_REGEX="your_plaintext_ip_regex_here" +``` + To update the region codes and their coordinates: ```bash diff --git a/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py b/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py index 14fc6851..ea23c946 100644 --- a/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py +++ b/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py @@ -1,4 +1,5 @@ import hashlib +import os import pathlib import subprocess @@ -29,7 +30,7 @@ def __hash__(self) -> int: # TODO: parallelize def __init__(self): - self.DROGON_IP_REGEX = decrypt_bytes(encrypted_data=DROGON_IP_REGEX_ENCRYPTED) + self.DROGON_IP_REGEX = _get_drogon_ip_regex() # TODO: does this hold after bundling? self._relative_awk_script_path = ( @@ -58,3 +59,25 @@ def _run_validation(self, file_path: pathlib.Path) -> None: f"stderr: {result.stderr}\n" ) raise RuntimeError(message) + + +def _get_drogon_ip_regex() -> str: + encrypt_ip_regex = os.environ.get("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "true").lower() not in { + "0", + "false", + "no", + "off", + } + + if encrypt_ip_regex: + return decrypt_bytes(encrypted_data=DROGON_IP_REGEX_ENCRYPTED).decode("utf-8") + + drogon_ip_regex = os.environ.get("S3_LOG_EXTRACTION_DROGON_IP_REGEX") + if drogon_ip_regex is None: + message = ( + "Environment variable `S3_LOG_EXTRACTION_DROGON_IP_REGEX` is required when " + "`S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX` is disabled." 
+ ) + raise EnvironmentError(message) + + return drogon_ip_regex diff --git a/tests/test_extraction_heuristic_pre_validator.py b/tests/test_extraction_heuristic_pre_validator.py new file mode 100644 index 00000000..8ab3c1ff --- /dev/null +++ b/tests/test_extraction_heuristic_pre_validator.py @@ -0,0 +1,76 @@ +import importlib.util +import pathlib +import sys +import types + +import pytest + + +def _load_extraction_heuristic_module(): + source_root = pathlib.Path(__file__).parent.parent / "src" / "s3_log_extraction" + + for name in ( + "s3_log_extraction", + "s3_log_extraction.validate", + "s3_log_extraction.utils", + "s3_log_extraction._regex", + "s3_log_extraction.utils.encryption", + "s3_log_extraction.validate._base_validator", + "s3_log_extraction.validate._extraction_heuristic_pre_validator", + ): + sys.modules.pop(name, None) + + package = types.ModuleType("s3_log_extraction") + package.__path__ = [str(source_root)] # type: ignore[attr-defined] + sys.modules["s3_log_extraction"] = package + + validate_package = types.ModuleType("s3_log_extraction.validate") + validate_package.__path__ = [str(source_root / "validate")] # type: ignore[attr-defined] + sys.modules["s3_log_extraction.validate"] = validate_package + + utils_package = types.ModuleType("s3_log_extraction.utils") + utils_package.__path__ = [str(source_root / "utils")] # type: ignore[attr-defined] + sys.modules["s3_log_extraction.utils"] = utils_package + + for module_name, relative_path in ( + ("s3_log_extraction._regex", "_regex.py"), + ("s3_log_extraction.utils.encryption", "utils/encryption.py"), + ("s3_log_extraction.validate._base_validator", "validate/_base_validator.py"), + ( + "s3_log_extraction.validate._extraction_heuristic_pre_validator", + "validate/_extraction_heuristic_pre_validator.py", + ), + ): + spec = importlib.util.spec_from_file_location(module_name, source_root / relative_path) + module = importlib.util.module_from_spec(spec) + assert spec is not None and spec.loader is not None 
+ sys.modules[module_name] = module + spec.loader.exec_module(module) + + return sys.modules["s3_log_extraction.validate._extraction_heuristic_pre_validator"] + + +def test_drogon_ip_regex_uses_encryption_by_default(monkeypatch: pytest.MonkeyPatch) -> None: + module = _load_extraction_heuristic_module() + monkeypatch.delenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", raising=False) + monkeypatch.setattr(module, "decrypt_bytes", lambda *, encrypted_data: b"decoded-regex") + + assert module._get_drogon_ip_regex() == "decoded-regex" + + +def test_drogon_ip_regex_uses_plaintext_override_when_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + module = _load_extraction_heuristic_module() + monkeypatch.setenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "false") + monkeypatch.setenv("S3_LOG_EXTRACTION_DROGON_IP_REGEX", "plain-regex") + monkeypatch.setattr(module, "decrypt_bytes", lambda *, encrypted_data: b"should-not-be-used") + + assert module._get_drogon_ip_regex() == "plain-regex" + + +def test_drogon_ip_regex_requires_plaintext_value_when_encryption_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + module = _load_extraction_heuristic_module() + monkeypatch.setenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "false") + monkeypatch.delenv("S3_LOG_EXTRACTION_DROGON_IP_REGEX", raising=False) + + with pytest.raises(EnvironmentError, match="S3_LOG_EXTRACTION_DROGON_IP_REGEX"): + module._get_drogon_ip_regex()