Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,19 @@ export IPINFO_API_KEY="your_token_here"
export OPENCAGE_API_KEY="your_token_here"
```

The extraction heuristic validator keeps its internal IP regex pattern encrypted and decrypts it at runtime by default. To preserve this security default, set:

```bash
export S3_LOG_EXTRACTION_PASSWORD="your_encryption_password"
```

On trusted internal processing servers where you want to skip decryption overhead, you can disable encrypted regex handling and provide the plaintext regex directly:

```bash
export S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX="false"
export S3_LOG_EXTRACTION_DROGON_IP_REGEX="<plaintext regex>"
```

To update the region codes and their coordinates:

```bash
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import os
import pathlib
import subprocess

Expand Down Expand Up @@ -29,7 +30,7 @@ def __hash__(self) -> int:

# TODO: parallelize
def __init__(self):
self.DROGON_IP_REGEX = decrypt_bytes(encrypted_data=DROGON_IP_REGEX_ENCRYPTED)
self.DROGON_IP_REGEX = _get_drogon_ip_regex()

# TODO: does this hold after bundling?
self._relative_awk_script_path = (
Expand Down Expand Up @@ -58,3 +59,25 @@ def _run_validation(self, file_path: pathlib.Path) -> None:
f"stderr: {result.stderr}\n"
)
raise RuntimeError(message)


def _get_drogon_ip_regex() -> str:
    """
    Resolve the Drogon IP regex, either by decryption or from a plaintext environment override.

    Encrypted handling is the default: unless `S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX` is set to one of
    `0`, `false`, `no` or `off` (case-insensitive, surrounding whitespace ignored), the regex is
    obtained by decrypting `DROGON_IP_REGEX_ENCRYPTED` and decoding the result as UTF-8.

    When encrypted handling is disabled, the regex is read verbatim from
    `S3_LOG_EXTRACTION_DROGON_IP_REGEX`.

    Returns
    -------
    str
        The regex pattern as text.

    Raises
    ------
    EnvironmentError
        If encrypted handling is disabled and `S3_LOG_EXTRACTION_DROGON_IP_REGEX` is unset or empty.
    """
    disabling_values = {"0", "false", "no", "off"}

    # Strip before comparing so values such as " false " (easy to produce from quoted shell
    # exports or .env files) are honoured instead of silently leaving encryption enabled.
    toggle = os.environ.get("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "true")
    encrypt_ip_regex = toggle.strip().lower() not in disabling_values

    if encrypt_ip_regex:
        return decrypt_bytes(encrypted_data=DROGON_IP_REGEX_ENCRYPTED).decode("utf-8")

    # The regex itself is used exactly as provided (no stripping), but an empty value is
    # treated the same as a missing one: an empty pattern is never a meaningful IP regex.
    drogon_ip_regex = os.environ.get("S3_LOG_EXTRACTION_DROGON_IP_REGEX")
    if not drogon_ip_regex:
        message = (
            "Environment variable `S3_LOG_EXTRACTION_DROGON_IP_REGEX` is required when "
            "`S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX` is disabled."
        )
        raise EnvironmentError(message)

    return drogon_ip_regex
76 changes: 76 additions & 0 deletions tests/test_extraction_heuristic_pre_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import importlib.util
import pathlib
import sys
import types

import pytest


def _load_extraction_heuristic_module():
    """
    Load the extraction heuristic pre-validator module directly from its source files.

    Any previously imported copies of the relevant `s3_log_extraction` modules are dropped from
    `sys.modules`, bare package objects (carrying only a `__path__`) are registered for
    `s3_log_extraction`, `s3_log_extraction.validate` and `s3_log_extraction.utils`, and then the
    individual source files are executed in dependency order.

    NOTE(review): the bare package objects presumably exist so the real package `__init__` files
    are not executed during the test - confirm against the package layout.

    Returns
    -------
    types.ModuleType
        The freshly executed `s3_log_extraction.validate._extraction_heuristic_pre_validator` module.

    Raises
    ------
    ImportError
        If an import spec (or its loader) cannot be created for one of the source files.
    """
    source_root = pathlib.Path(__file__).parent.parent / "src" / "s3_log_extraction"

    # Forget earlier imports so every call starts from a fresh module state.
    for name in (
        "s3_log_extraction",
        "s3_log_extraction.validate",
        "s3_log_extraction.utils",
        "s3_log_extraction._regex",
        "s3_log_extraction.utils.encryption",
        "s3_log_extraction.validate._base_validator",
        "s3_log_extraction.validate._extraction_heuristic_pre_validator",
    ):
        sys.modules.pop(name, None)

    package = types.ModuleType("s3_log_extraction")
    package.__path__ = [str(source_root)]  # type: ignore[attr-defined]
    sys.modules["s3_log_extraction"] = package

    validate_package = types.ModuleType("s3_log_extraction.validate")
    validate_package.__path__ = [str(source_root / "validate")]  # type: ignore[attr-defined]
    sys.modules["s3_log_extraction.validate"] = validate_package

    utils_package = types.ModuleType("s3_log_extraction.utils")
    utils_package.__path__ = [str(source_root / "utils")]  # type: ignore[attr-defined]
    sys.modules["s3_log_extraction.utils"] = utils_package

    for module_name, relative_path in (
        ("s3_log_extraction._regex", "_regex.py"),
        ("s3_log_extraction.utils.encryption", "utils/encryption.py"),
        ("s3_log_extraction.validate._base_validator", "validate/_base_validator.py"),
        (
            "s3_log_extraction.validate._extraction_heuristic_pre_validator",
            "validate/_extraction_heuristic_pre_validator.py",
        ),
    ):
        spec = importlib.util.spec_from_file_location(module_name, source_root / relative_path)
        # Validate the spec BEFORE it is used; checking after `module_from_spec` would only
        # ever surface as an opaque AttributeError on `None`.
        if spec is None or spec.loader is None:
            message = f"Unable to create an import spec for `{module_name}` from `{relative_path}`."
            raise ImportError(message)

        module = importlib.util.module_from_spec(spec)
        # Register before executing so imports between these modules resolve to this instance.
        sys.modules[module_name] = module
        spec.loader.exec_module(module)

    return sys.modules["s3_log_extraction.validate._extraction_heuristic_pre_validator"]


def test_drogon_ip_regex_uses_encryption_by_default(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the encryption toggle unset, the regex is the UTF-8 decoded output of `decrypt_bytes`."""
    validator_module = _load_extraction_heuristic_module()
    monkeypatch.delenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", raising=False)

    def _stub_decrypt(*, encrypted_data):
        # Stands in for the real decryption helper so the test needs no key material.
        return b"decoded-regex"

    monkeypatch.setattr(validator_module, "decrypt_bytes", _stub_decrypt)

    resolved_regex = validator_module._get_drogon_ip_regex()
    assert resolved_regex == "decoded-regex"


def test_drogon_ip_regex_uses_plaintext_override_when_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
    """With encryption disabled, the regex is taken verbatim from the plaintext environment variable."""
    validator_module = _load_extraction_heuristic_module()

    environment = {
        "S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX": "false",
        "S3_LOG_EXTRACTION_DROGON_IP_REGEX": "plain-regex",
    }
    for variable, value in environment.items():
        monkeypatch.setenv(variable, value)

    def _stub_decrypt(*, encrypted_data):
        # Returns a sentinel that would make the final assertion fail if it were ever used.
        return b"should-not-be-used"

    monkeypatch.setattr(validator_module, "decrypt_bytes", _stub_decrypt)

    resolved_regex = validator_module._get_drogon_ip_regex()
    assert resolved_regex == "plain-regex"


def test_drogon_ip_regex_requires_plaintext_value_when_encryption_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
    """With encryption disabled and no plaintext regex provided, an `EnvironmentError` is raised."""
    validator_module = _load_extraction_heuristic_module()
    monkeypatch.setenv("S3_LOG_EXTRACTION_ENCRYPT_IP_REGEX", "false")
    monkeypatch.delenv("S3_LOG_EXTRACTION_DROGON_IP_REGEX", raising=False)

    # The error message must name the missing variable so the misconfiguration is actionable.
    expected_failure = pytest.raises(EnvironmentError, match="S3_LOG_EXTRACTION_DROGON_IP_REGEX")
    with expected_failure:
        validator_module._get_drogon_ip_regex()
Loading