Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import pickle
import shlex
import subprocess
import sys
import time
from importlib.util import find_spec
from pathlib import Path

Expand Down Expand Up @@ -142,6 +144,40 @@ def _aiobotocore_anyio_backend_rce_payload(marker: Path) -> tuple[bytes, str]:
return payload, marker_content


def _assert_pickle_payload_executes_in_subprocess(
payload: bytes, marker: Path, marker_content: str, tmp_path: Path
) -> None:
payload_path = tmp_path / "payload.pkl"
payload_path.write_bytes(payload)
process = subprocess.Popen(
[
sys.executable,
"-c",
"import pickle, pathlib, sys; pickle.loads(pathlib.Path(sys.argv[1]).read_bytes())",
str(payload_path),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
deadline = time.monotonic() + 5
while process.poll() is None and time.monotonic() < deadline:
if marker.exists() and marker.read_text() == marker_content:
break
time.sleep(0.01)

if process.poll() is None:
process.terminate()
try:
_, stderr = process.communicate(timeout=1)
except subprocess.TimeoutExpired:
process.kill()
_, stderr = process.communicate()

assert marker.exists(), f"pickle payload did not execute: {stderr.strip()}"
assert marker.read_text() == marker_content


def _has_critical_call_graph_finding(report: PickleReport, module: str, name: str, sink: str) -> bool:
return any(
finding.severity == Severity.CRITICAL
Expand Down Expand Up @@ -233,12 +269,4 @@ def test_scan_bytes_blocks_aiobotocore_anyio_backend_rce(tmp_path: Path) -> None
)

assert not marker.exists()
result = pickle.loads(payload)
assert result == {
"access_key": "A",
"secret_key": "S",
"token": None,
"expiry_time": None,
"account_id": "1",
}
assert marker.read_text() == marker_content
_assert_pickle_payload_executes_in_subprocess(payload, marker, marker_content, tmp_path)
33 changes: 33 additions & 0 deletions tests/scanners/test_tar_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ def test_scan_tar_ignores_extensionless_runpy_near_match(self, tmp_path: Path) -

result = self.scanner.scan(str(archive_path))

assert result.success is True
assert not any(check.name == "Python Archive Member Security" for check in result.checks)

def test_scan_tar_allows_replaced_runpy_execution(self, tmp_path: Path) -> None:
Expand Down Expand Up @@ -877,6 +878,14 @@ def test_scan_tar_marks_unconfirmed_pe_pointer_inconclusive(self, tmp_path: Path
(b"import subprocess\nsubprocess.run(['echo'], check=False)\n", "S103", "subprocess.run"),
(b"import importlib\nimportlib.import_module('os')\n", "S107", "importlib.import_module"),
(b"import runpy\nrunpy.run_path('payload.py')\n", "S108", "runpy.run_path"),
(b"import ctypes\nctypes.CDLL(LIBRARY_PATH)\n", "S110", "ctypes.CDLL"),
(b"from ctypes import CDLL as load\nload(LIBRARY_PATH)\n", "S110", "ctypes.CDLL"),
(b"import webbrowser\nwebbrowser.open('https://example.invalid')\n", "S109", "webbrowser.open"),
(
b"from webbrowser import open_new_tab as launch\nlaunch('https://example.invalid')\n",
"S109",
"webbrowser.open_new_tab",
),
(b"eval('1 + 1')\n", "S104", "eval"),
(b"import pickle\npickle.loads(b'\\x80\\x04N.')\n", "S213", "pickle.loads"),
],
Expand All @@ -886,6 +895,7 @@ def test_scan_tar_python_member_emits_accurate_rule_code(
) -> None:
"""Each risk category must surface its own rule code (os.system as S101, etc.)."""
archive_path = tmp_path / "model_bundle.tar"
source = source.replace(b"LIBRARY_PATH", repr(str(tmp_path / "libpayload.so")).encode())

with tarfile.open(archive_path, "w") as archive:
info = tarfile.TarInfo("handler.py")
Expand All @@ -900,6 +910,29 @@ def test_scan_tar_python_member_emits_accurate_rule_code(
assert python_checks[0].rule_code == expected_rule_code
assert expected_call in python_checks[0].details["reason"]

@pytest.mark.parametrize(
"source",
[
b"from ctypes import CDLL as load\nload = len\nload([])\n",
b"import ctypes\nctypes.CDLL = len\nctypes.CDLL([])\n",
b"from webbrowser import open as launch\nlaunch = len\nlaunch([])\n",
b"import webbrowser\nwebbrowser.open = len\nwebbrowser.open([])\n",
],
)
def test_scan_tar_allows_shadowed_direct_python_member_primitives(self, tmp_path: Path, source: bytes) -> None:
"""Safe final bindings should not become ctypes or browser findings."""
archive_path = tmp_path / "model_bundle.tar"

with tarfile.open(archive_path, "w") as archive:
info = tarfile.TarInfo("handler.py")
info.size = len(source)
archive.addfile(info, tarfile.io.BytesIO(source)) # type: ignore[attr-defined]

result = self.scanner.scan(str(archive_path))

assert result.success is True
assert not any(check.name == "Python Archive Member Security" for check in result.checks)
Comment thread
mldangelo-oai marked this conversation as resolved.

def test_path_traversal_detection(self):
"""Test detection of path traversal attempts"""
with tempfile.NamedTemporaryFile(suffix=".tar", delete=False) as tmp:
Expand Down
41 changes: 41 additions & 0 deletions tests/scanners/test_torchserve_mar_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,9 +771,25 @@ def test_scan_detects_aliased_getattr_wrapped_handler_execution_primitive(
b"from runpy import run_path as run\ndef handle(data, context):\n return run('payload.py')\n",
"runpy.run_path",
),
(b"import ctypes\ndef handle(data, context):\n return ctypes.CDLL(LIBRARY_PATH)\n", "ctypes.CDLL"),
(
b"from ctypes import cdll as loader\ndef handle(data, context):\n"
b" return loader.LoadLibrary(LIBRARY_PATH)\n",
"ctypes.cdll.LoadLibrary",
),
(
b"import webbrowser\ndef handle(data, context):\n return webbrowser.open('https://example.invalid')\n",
"webbrowser.open",
),
(
b"from webbrowser import open_new_tab as launch\ndef handle(data, context):\n"
b" return launch('https://example.invalid')\n",
"webbrowser.open_new_tab",
),
],
)
def test_scan_detects_handler_execution_primitive(tmp_path: Path, handler_source: bytes, dangerous_name: str) -> None:
handler_source = handler_source.replace(b"LIBRARY_PATH", repr(str(tmp_path / "libpayload.so")).encode())
manifest = {"model": {"handler": "handler.py", "serializedFile": "weights.bin"}}
mar_path = _create_mar_archive(
tmp_path,
Expand Down Expand Up @@ -804,6 +820,31 @@ def test_scan_allows_replaced_runpy_handler_api(tmp_path: Path) -> None:

result = TorchServeMarScanner().scan(str(mar_path))

assert result.success is True
assert _failed_checks(result, "TorchServe Handler Static Analysis") == []


@pytest.mark.parametrize(
"handler_source",
[
b"from ctypes import CDLL as load\ndef handle(data, context):\n load = len\n return load([])\n",
b"import ctypes\ndef handle(data, context):\n ctypes.CDLL = len\n return ctypes.CDLL([])\n",
b"from webbrowser import open as launch\ndef handle(data, context):\n launch = len\n return launch([])\n",
b"import webbrowser\ndef handle(data, context):\n webbrowser.open = len\n return webbrowser.open([])\n",
],
)
def test_scan_allows_shadowed_direct_handler_primitives(tmp_path: Path, handler_source: bytes) -> None:
manifest = {"model": {"handler": "handler.py", "serializedFile": "weights.bin"}}
mar_path = _create_mar_archive(
tmp_path,
manifest=manifest,
entries={"handler.py": handler_source, "weights.bin": b"weights"},
filename="safe_shadowed_direct_handler.mar",
)

result = TorchServeMarScanner().scan(str(mar_path))

assert result.success is True
assert _failed_checks(result, "TorchServe Handler Static Analysis") == []
Comment thread
mldangelo-oai marked this conversation as resolved.


Expand Down
56 changes: 56 additions & 0 deletions tests/scanners/test_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,39 @@ def test_scan_zip_flags_runpy_execution_python_member(tmp_path: Path, source: st
assert python_checks[0].details["reason"] == f"high-risk calls: {dangerous_name}"


@pytest.mark.parametrize(
("source", "rule_code", "dangerous_name"),
[
("import ctypes\nctypes.CDLL(LIBRARY_PATH)\n", "S110", "ctypes.CDLL"),
("from ctypes import CDLL as load_library\nload_library(LIBRARY_PATH)\n", "S110", "ctypes.CDLL"),
("import webbrowser\nwebbrowser.open('https://example.invalid')\n", "S109", "webbrowser.open"),
(
"from webbrowser import open_new_tab as launch\nlaunch('https://example.invalid')\n",
"S109",
"webbrowser.open_new_tab",
),
],
)
def test_scan_zip_flags_direct_imported_python_member_primitives(
tmp_path: Path, source: str, rule_code: str, dangerous_name: str
) -> None:
archive_path = tmp_path / "model_bundle.zip"
source = source.replace("LIBRARY_PATH", repr(str(tmp_path / "libpayload.so")))
with zipfile.ZipFile(archive_path, "w") as archive:
archive.writestr("handler.py", source)

result = ZipScanner().scan(str(archive_path))

python_checks = [
check
for check in result.checks
if check.name == "Python Archive Member Security" and check.status == CheckStatus.FAILED
]
assert len(python_checks) == 1
assert python_checks[0].rule_code == rule_code
assert python_checks[0].details["reason"] == f"high-risk calls: {dangerous_name}"


def test_scan_zip_flags_webbrowser_and_ctypes_python_member(tmp_path: Path) -> None:
archive_path = tmp_path / "model_bundle.zip"
source = (
Expand Down Expand Up @@ -894,6 +927,7 @@ def test_scan_zip_honors_safe_dynamic_member_aliases_and_method_overwrites(tmp_p

result = ZipScanner().scan(str(archive_path))

assert result.success is True
assert not any(
check.name == "Python Archive Member Security" and check.status == CheckStatus.FAILED for check in result.checks
)
Expand Down Expand Up @@ -3482,6 +3516,28 @@ def test_scan_zip_preserves_safe_runpy_overwrite_before_conditional(tmp_path: Pa
)


@pytest.mark.parametrize(
"source",
[
"from ctypes import CDLL as load\nload = len\nload([])\n",
"import ctypes\nctypes.CDLL = len\nctypes.CDLL([])\n",
"from webbrowser import open as launch\nlaunch = len\nlaunch([])\n",
"import webbrowser\nwebbrowser.open = len\nwebbrowser.open([])\n",
],
)
def test_scan_zip_allows_shadowed_direct_python_member_primitives(tmp_path: Path, source: str) -> None:
archive_path = tmp_path / "model_bundle.zip"
with zipfile.ZipFile(archive_path, "w") as archive:
archive.writestr("handler.py", source)

result = ZipScanner().scan(str(archive_path))

assert result.success is True
assert not any(
check.name == "Python Archive Member Security" and check.status == CheckStatus.FAILED for check in result.checks
)
Comment thread
mldangelo-oai marked this conversation as resolved.


@pytest.mark.parametrize(
"source",
[
Expand Down
Loading