diff --git a/README.md b/README.md index ecd9a056..f29bef6d 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,19 @@ export IPINFO_API_KEY="your_token_here" export OPENCAGE_API_KEY="your_token_here" ``` +The extraction heuristic validator encrypts internal IP regex patterns by default. To preserve this security default, set: + +```bash +export S3_LOG_EXTRACTION_PASSWORD="your_encryption_password" +``` + +On trusted internal processing servers where you want to skip decryption overhead, you can disable encrypted regex handling and provide the plaintext regex directly: + +```bash +export S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX="false" +export S3_LOG_EXTRACTION_DROGON_IP_REGEX="" +``` + To update the region codes and their coordinates: ```bash diff --git a/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py b/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py index 14fc6851..ea23c946 100644 --- a/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py +++ b/src/s3_log_extraction/validate/_extraction_heuristic_pre_validator.py @@ -1,4 +1,5 @@ import hashlib +import os import pathlib import subprocess @@ -29,7 +30,7 @@ def __hash__(self) -> int: # TODO: parallelize def __init__(self): - self.DROGON_IP_REGEX = decrypt_bytes(encrypted_data=DROGON_IP_REGEX_ENCRYPTED) + self.DROGON_IP_REGEX = _get_drogon_ip_regex() # TODO: does this hold after bundling? self._relative_awk_script_path = ( @@ -58,3 +59,25 @@ def _run_validation(self, file_path: pathlib.Path) -> None: f"stderr: {result.stderr}\n" ) raise RuntimeError(message) + + +def _get_drogon_ip_regex() -> str: + encrypt_ip_regex = os.environ.get("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "true").lower() not in { + "0", + "false", + "no", + "off", + } + + if encrypt_ip_regex: + return decrypt_bytes(encrypted_data=DROGON_IP_REGEX_ENCRYPTED).decode("utf-8") + + drogon_ip_regex = os.environ.get("S3_LOG_EXTRACTION_DROGON_IP_REGEX") + if drogon_ip_regex is None: + message = ( + "Environment variable `S3_LOG_EXTRACTION_DROGON_IP_REGEX` is required when " + "`S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX` is disabled." + ) + raise EnvironmentError(message) + + return drogon_ip_regex diff --git a/tests/test_extraction_heuristic_pre_validator.py b/tests/test_extraction_heuristic_pre_validator.py new file mode 100644 index 00000000..8ab3c1ff --- /dev/null +++ b/tests/test_extraction_heuristic_pre_validator.py @@ -0,0 +1,76 @@ +import importlib.util +import pathlib +import sys +import types + +import pytest + + +def _load_extraction_heuristic_module(): + source_root = pathlib.Path(__file__).parent.parent / "src" / "s3_log_extraction" + + for name in ( + "s3_log_extraction", + "s3_log_extraction.validate", + "s3_log_extraction.utils", + "s3_log_extraction._regex", + "s3_log_extraction.utils.encryption", + "s3_log_extraction.validate._base_validator", + "s3_log_extraction.validate._extraction_heuristic_pre_validator", + ): + sys.modules.pop(name, None) + + package = types.ModuleType("s3_log_extraction") + package.__path__ = [str(source_root)] # type: ignore[attr-defined] + sys.modules["s3_log_extraction"] = package + + validate_package = types.ModuleType("s3_log_extraction.validate") + validate_package.__path__ = [str(source_root / "validate")] # type: ignore[attr-defined] + sys.modules["s3_log_extraction.validate"] = validate_package + + utils_package = types.ModuleType("s3_log_extraction.utils") + utils_package.__path__ = [str(source_root / "utils")] # type: ignore[attr-defined] + sys.modules["s3_log_extraction.utils"] = utils_package + + for module_name, relative_path in ( + ("s3_log_extraction._regex", "_regex.py"), + ("s3_log_extraction.utils.encryption", "utils/encryption.py"), + ("s3_log_extraction.validate._base_validator", "validate/_base_validator.py"), + ( + "s3_log_extraction.validate._extraction_heuristic_pre_validator", + "validate/_extraction_heuristic_pre_validator.py", + ), + ): + spec = importlib.util.spec_from_file_location(module_name, source_root / relative_path) + module = importlib.util.module_from_spec(spec) + assert spec is not None and spec.loader is not None + sys.modules[module_name] = module + spec.loader.exec_module(module) + + return sys.modules["s3_log_extraction.validate._extraction_heuristic_pre_validator"] + + +def test_drogon_ip_regex_uses_encryption_by_default(monkeypatch: pytest.MonkeyPatch) -> None: + module = _load_extraction_heuristic_module() + monkeypatch.delenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", raising=False) + monkeypatch.setattr(module, "decrypt_bytes", lambda *, encrypted_data: b"decoded-regex") + + assert module._get_drogon_ip_regex() == "decoded-regex" + + +def test_drogon_ip_regex_uses_plaintext_override_when_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + module = _load_extraction_heuristic_module() + monkeypatch.setenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "false") + monkeypatch.setenv("S3_LOG_EXTRACTION_DROGON_IP_REGEX", "plain-regex") + monkeypatch.setattr(module, "decrypt_bytes", lambda *, encrypted_data: b"should-not-be-used") + + assert module._get_drogon_ip_regex() == "plain-regex" + + +def test_drogon_ip_regex_requires_plaintext_value_when_encryption_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + module = _load_extraction_heuristic_module() + monkeypatch.setenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "false") + monkeypatch.delenv("S3_LOG_EXTRACTION_DROGON_IP_REGEX", raising=False) + + with pytest.raises(EnvironmentError, match="S3_LOG_EXTRACTION_DROGON_IP_REGEX"): + module._get_drogon_ip_regex()