Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Bug Fixes

- treat Keras Lambda CVE fixed-boundary prereleases as vulnerable
- scan ONNX external data references in sparse initializers, tensor-valued attributes, and function defaults
- avoid false-positive process-launch findings for parsed framed Python string literals
- detect dangerous Python calls retrieved through module namespace dictionaries in ZIP and TAR members
Expand Down
76 changes: 56 additions & 20 deletions modelaudit/scanners/keras_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ def _has_get_file_reference(values: list[str]) -> bool:
_KERAS_WEIGHTS_ENTRY = "model.weights.h5"
_KERAS_RELEASE_VERSION_PATTERN = re.compile(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+_-]*)\s*$")
_KERAS_PRERELEASE_SUFFIX_PATTERN = re.compile(r"(?i)^(?:a|alpha|b|beta|c|rc|pre|preview|dev)")
_KERAS_LOCAL_VERSION_SUFFIX = r"\+[a-z0-9]+(?:[._-][a-z0-9]+)*"
_KERAS_FIXED_BOUNDARY_PRERELEASE_SUFFIX_PATTERN = re.compile(
rf"(?i)^[._-]?(?:(?:a|alpha|b|beta|c|rc|pre|preview)\d*"
rf"(?:[._-]?(?:post|rev|r)\d*)?(?:[._-]?dev\d*)?|dev\d*)(?:{_KERAS_LOCAL_VERSION_SUFFIX})?$"
)
_KERAS_FIXED_BOUNDARY_POST_OR_LOCAL_SUFFIX_PATTERN = re.compile(
rf"(?i)^(?:{_KERAS_LOCAL_VERSION_SUFFIX}|"
rf"[._-]?(?:post|rev|r)\d*(?:[._-]?dev\d*)?(?:{_KERAS_LOCAL_VERSION_SUFFIX})?)$"
)


def _classify_keras_fixed_boundary_suffix(suffix: str) -> bool | None:
"""Return whether a fixed-boundary suffix is a prerelease, or None when non-canonical."""
if not suffix:
return False
if _KERAS_FIXED_BOUNDARY_PRERELEASE_SUFFIX_PATTERN.fullmatch(suffix):
return True
if _KERAS_FIXED_BOUNDARY_POST_OR_LOCAL_SUFFIX_PATTERN.fullmatch(suffix):
return False
return None


def _redact_url_for_display(url: str) -> str:
Expand Down Expand Up @@ -802,7 +822,10 @@ def _scan_model_config(self, model_config: dict[str, Any], result: ScanResult) -
if is_lambda_layer:
self._check_lambda_layer(layer, result, layer_name)
keras_version = result.metadata.get("keras_version")
if isinstance(keras_version, str) and self._is_vulnerable_to_cve_2024_3660(keras_version):
vulnerability_status = (
self._is_vulnerable_to_cve_2024_3660(keras_version) if isinstance(keras_version, str) else None
)
if vulnerability_status is True:
# CVE-2024-3660: Lambda layers enable arbitrary code injection
result.add_check(
name="CVE-2024-3660: Lambda Layer Code Injection",
Expand All @@ -825,7 +848,7 @@ def _scan_model_config(self, model_config: dict[str, Any], result: ScanResult) -
},
why=get_cve_2024_3660_explanation("lambda_code_injection"),
)
elif isinstance(keras_version, str):
elif vulnerability_status is False:
result.add_check(
name="Lambda Version Risk Check",
passed=True,
Expand All @@ -837,21 +860,33 @@ def _scan_model_config(self, model_config: dict[str, Any], result: ScanResult) -
details={"layer_name": layer_name, "layer_class": "Lambda", "keras_version": keras_version},
)
else:
version_context = (
f"keras_version '{keras_version}' is non-canonical"
if isinstance(keras_version, str)
else "keras_version is unavailable"
)
result.add_check(
name="Lambda Risk (Version Unknown)",
passed=False,
message=(
f"Lambda layer '{layer_name}' detected but keras_version is unavailable; "
"cannot confidently attribute CVE-2024-3660 without version context"
f"Lambda layer '{layer_name}' detected but {version_context}; "
"cannot confidently attribute CVE-2024-3660 without reliable version context"
),
severity=IssueSeverity.WARNING,
location=f"{self.current_file_path} (layer: {layer_name})",
details={
"layer_name": layer_name,
"layer_class": "Lambda",
"keras_version": keras_version,
"version_parse_status": "unknown",
"cve_id": "CVE-2024-3660",
"cvss": 9.8,
"cwe": "CWE-94",
"description": "Lambda layer deserialization can enable arbitrary code injection.",
"remediation": "Remove Lambda layers or upgrade Keras to >= 2.13",
"affected_versions": "Keras < 2.13.0",
},
why=get_cve_2024_3660_explanation("lambda_code_injection"),
)
elif layer_class in self.suspicious_layer_types:
result.add_check(
Expand Down Expand Up @@ -1968,25 +2003,26 @@ def _check_lambda_layer(self, layer: dict[str, Any], result: ScanResult, layer_n
)

@staticmethod
def _is_vulnerable_to_cve_2024_3660(version: str) -> bool:
"""Return True for Keras versions lower than 2.13.0.
def _is_vulnerable_to_cve_2024_3660(version: str) -> bool | None:
"""Return True for Keras versions lower than 2.13.0, including prereleases of 2.13.0."""
version_match = _KERAS_RELEASE_VERSION_PATTERN.match(version)
if not version_match:
return None

Handles two-part versions (e.g. "2.10") by treating missing patch as 0.
"""
parts = version.split(".", 2)
if len(parts) < 2:
return False
try:
major = int(parts[0])
minor = int(parts[1])
patch = 0
if len(parts) == 3:
patch_digits = "".join(ch for ch in parts[2] if ch.isdigit())
if patch_digits:
patch = int(patch_digits)
return (major, minor, patch) < (2, 13, 0)
major = int(version_match.group(1))
minor = int(version_match.group(2))
patch = int(version_match.group(3) or 0)
suffix = (version_match.group(4) or "").strip().lower()

parsed = (major, minor, patch)
if parsed < (2, 13, 0):
return True
if parsed > (2, 13, 0):
return False
return _classify_keras_fixed_boundary_suffix(suffix)
except ValueError:
return False
return None

@staticmethod
def _is_vulnerable_to_cve_2025_12058(version: str) -> bool:
Expand Down
111 changes: 111 additions & 0 deletions tests/scanners/test_keras_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3550,6 +3550,117 @@ def test_no_cve_for_fixed_keras_version(self, tmp_path: Path) -> None:
cve_issues = [i for i in result.issues if "CVE-2024-3660" in i.message]
assert len(cve_issues) == 0

@pytest.mark.parametrize(
"prerelease_version",
["2.13.0a0", "2.13.0rc1", "2.13.0.dev0", "2.13.0_c1", "2.13.0-rc1", "2.13.0rc1+local"],
)
def test_lambda_cve_for_fixed_boundary_prereleases(self, tmp_path: Path, prerelease_version: str) -> None:
"""Prereleases of the fixed 2.13.0 boundary should remain CVE-attributed."""
scanner = KerasZipScanner()
encoded = base64.b64encode(b"lambda x: x * 2").decode()
config = {
"class_name": "Sequential",
"config": {
"layers": [
{
"class_name": "Lambda",
"name": "my_lambda",
"config": {"function": [encoded, None, None]},
}
]
},
}

result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=prerelease_version))

cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
assert len(cve_issues) == 1
assert cve_issues[0].severity == IssueSeverity.CRITICAL
assert cve_issues[0].details["keras_version"] == prerelease_version

@pytest.mark.parametrize("fixed_version", ["2.13.0", "2.13.0+local", "2.13.0.post1", "2.13.0_post1"])
def test_lambda_no_cve_for_fixed_boundary_final_local_or_post(
self,
tmp_path: Path,
fixed_version: str,
) -> None:
"""Final, local, and post fixed-boundary versions should not get CVE attribution."""
scanner = KerasZipScanner()
encoded = base64.b64encode(b"lambda x: x * 2").decode()
config = {
"class_name": "Sequential",
"config": {
"layers": [
{
"class_name": "Lambda",
"name": "my_lambda",
"config": {"function": [encoded, None, None]},
}
]
},
}

result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=fixed_version))

cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
assert cve_issues == []
version_checks = [check for check in result.checks if check.name == "Lambda Version Risk Check"]
assert len(version_checks) == 1
assert version_checks[0].status == CheckStatus.PASSED

@pytest.mark.parametrize("vulnerable_version", ["2.12.0_rc1", "2.12.0_post1"])
def test_lambda_cve_for_underscore_vulnerable_version(self, tmp_path: Path, vulnerable_version: str) -> None:
"""Underscore-separated metadata below the fixed boundary must remain vulnerable."""
scanner = KerasZipScanner()
encoded = base64.b64encode(b"lambda x: x * 2").decode()
config = {
"class_name": "Sequential",
"config": {
"layers": [
{
"class_name": "Lambda",
"name": "my_lambda",
"config": {"function": [encoded, None, None]},
}
]
},
}

result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=vulnerable_version))

cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
assert len(cve_issues) == 1
assert cve_issues[0].severity == IssueSeverity.CRITICAL
assert cve_issues[0].details["keras_version"] == vulnerable_version

@pytest.mark.parametrize("noncanonical_version", ["2.13.0rc1junk", "2.13.0preflight", "not-a-version"])
def test_lambda_noncanonical_version_warns_unknown(self, tmp_path: Path, noncanonical_version: str) -> None:
"""Malformed fixed-boundary suffixes must not produce CVE or clean-version false positives."""
scanner = KerasZipScanner()
encoded = base64.b64encode(b"lambda x: x * 2").decode()
config = {
"class_name": "Sequential",
"config": {
"layers": [
{
"class_name": "Lambda",
"name": "my_lambda",
"config": {"function": [encoded, None, None]},
}
]
},
}

result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=noncanonical_version))

cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
assert len(cve_issues) == 1
assert cve_issues[0].severity == IssueSeverity.WARNING
assert cve_issues[0].details["keras_version"] == noncanonical_version
assert cve_issues[0].details["version_parse_status"] == "unknown"
assert "non-canonical" in cve_issues[0].message
assert all(check.name != "Lambda Version Risk Check" for check in result.checks)

def test_cve_for_two_part_keras_version(self, tmp_path: Path) -> None:
"""Lambda in Keras 2.10 (two-part version) should be CVE-attributed."""
scanner = KerasZipScanner()
Expand Down
Loading