Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- redact compound credential names and malformed userinfo URLs in scanner evidence
- restrict JFrog credential forwarding to explicitly trusted HTTPS hosts
- block weight distribution `torch.load` on PyTorch prerelease and unknown versions before deserialization
- fail closed on unverified Keras ZIP `StringLookup` vocabulary paths and redact remote URL evidence
- fail closed when cloud directory metadata cannot be read for every listed object
- treat prereleases of fixed Keras ZIP CVE-2026-1669 versions as vulnerable
- fail closed on interpolation-bearing NeMo Hydra `_target_` selectors whose resolved callable cannot be verified
Expand Down
106 changes: 52 additions & 54 deletions modelaudit/scanners/keras_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import zipfile
from pathlib import Path
from typing import Any, ClassVar
from urllib.parse import urlsplit, urlunsplit

from modelaudit.detectors.network_comm import redact_url_for_finding
from modelaudit.detectors.suspicious_symbols import (
SUSPICIOUS_CONFIG_PROPERTIES,
SUSPICIOUS_LAYER_TYPES,
Expand Down Expand Up @@ -95,7 +95,6 @@
re.IGNORECASE,
)
_URL_SCHEME_PATTERN = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://")
_WINDOWS_ABSOLUTE_PATH_PATTERN = re.compile(r"^(?:[a-zA-Z]:[\\/]|\\\\)")
_KERAS_CONFIG_ENTRY = "config.json"
_KERAS_CONFIG_MAX_BYTES = 10 * 1024 * 1024

Expand All @@ -117,6 +116,9 @@ def _has_get_file_reference(values: list[str]) -> bool:
_KERAS_METADATA_ENTRY = "metadata.json"
_KERAS_METADATA_MAX_BYTES = 10 * 1024 * 1024
_KERAS_WEIGHTS_ENTRY = "model.weights.h5"
_KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON = (
"keras_zip_stringlookup_external_vocabulary_metadata_inconclusive"
)
_KERAS_RELEASE_VERSION_PATTERN = re.compile(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+_-]*)\s*$")
_KERAS_PRERELEASE_SUFFIX_PATTERN = re.compile(
r"(?i)^[._-]?(?:"
Expand All @@ -134,21 +136,7 @@ def _has_get_file_reference(values: list[str]) -> bool:


def _redact_url_for_display(url: str) -> str:
try:
parsed = urlsplit(url)
port = parsed.port
except ValueError:
return "[invalid-url]"

if not parsed.scheme or not parsed.hostname:
return "[invalid-url]"

hostname = parsed.hostname
if ":" in hostname and not hostname.startswith("["):
hostname = f"[{hostname}]"

netloc = f"{hostname}:{port}" if port is not None else hostname
return urlunsplit((parsed.scheme, netloc, parsed.path, "", ""))
return redact_url_for_finding(url)


try:
Expand Down Expand Up @@ -1700,15 +1688,18 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca
return

vocabulary = layer_config.get("vocabulary")
if not self._is_external_stringlookup_vocabulary(vocabulary):
if not isinstance(vocabulary, str) or not self._is_external_stringlookup_vocabulary(vocabulary):
return

keras_version = result.metadata.get("keras_version")
display_vocabulary = (
_redact_url_for_display(vocabulary) if _URL_SCHEME_PATTERN.match(vocabulary.strip()) else vocabulary
)
location = f"{self.current_file_path} (layer: {layer_name})"
details = {
"layer_name": layer_name,
"layer_class": "StringLookup",
"vocabulary": vocabulary,
"vocabulary": display_vocabulary,
"cve_id": "CVE-2025-12058",
"cvss": 5.9,
"cwe": "CWE-502, CWE-918",
Expand All @@ -1717,16 +1708,20 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca
".keras archive is loaded."
),
"remediation": "Upgrade Keras to >= 3.12.0 and avoid loading models with external vocabulary paths.",
"affected_versions": "Keras < 3.12.0",
}
vulnerability_status = (
self._is_vulnerable_to_cve_2025_12058(keras_version) if isinstance(keras_version, str) else None
)

if isinstance(keras_version, str) and self._is_vulnerable_to_cve_2025_12058(keras_version):
if vulnerability_status is True:
details["keras_version"] = keras_version
result.add_check(
name="CVE-2025-12058: StringLookup External Vocabulary Path",
passed=False,
message=(
f"CVE-2025-12058: StringLookup layer '{layer_name}' in Keras {keras_version} references "
f"external vocabulary path '{vocabulary}', which can expose local files or trigger SSRF "
f"external vocabulary path '{display_vocabulary}', which can expose local files or trigger SSRF "
"during model loading"
),
severity=IssueSeverity.WARNING,
Expand All @@ -1736,54 +1731,54 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca
)
return

if isinstance(keras_version, str):
if vulnerability_status is False:
details["keras_version"] = keras_version
details["metadata_only_assessment"] = True
details["parse_status"] = "metadata_non_vulnerable"
details["analysis_incomplete"] = True
details["scan_outcome_reason"] = _KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON
self._mark_inconclusive_scan_result(
result,
_KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON,
)
result.add_check(
name="StringLookup External Vocabulary Metadata Check",
passed=False,
message=(
f"StringLookup layer '{layer_name}' references external vocabulary path '{vocabulary}', "
f"StringLookup layer '{layer_name}' references external vocabulary path '{display_vocabulary}', "
f"and archive metadata reports Keras {keras_version} outside the known CVE-2025-12058 "
"vulnerable range (<3.12.0), but metadata-only assessment is inconclusive without runtime "
"verification"
),
severity=IssueSeverity.INFO,
location=location,
details=details,
why=get_cve_2025_12058_explanation("stringlookup_external_vocabulary"),
)
return

if isinstance(keras_version, str):
details["keras_version"] = keras_version
version_context = f"keras_version '{keras_version}' is non-canonical"
else:
version_context = "keras_version is unavailable"
result.add_check(
name="StringLookup External Vocabulary Risk (Version Unknown)",
passed=False,
message=(
f"StringLookup layer '{layer_name}' references external vocabulary path '{vocabulary}', but "
"keras_version is unavailable; cannot confidently attribute CVE-2025-12058 without version context"
f"StringLookup layer '{layer_name}' references external vocabulary path '{display_vocabulary}', but "
f"{version_context}; cannot confidently attribute CVE-2025-12058 without version context"
),
severity=IssueSeverity.WARNING,
location=location,
details=details | {"affected_versions": "Keras < 3.12.0"},
details=details,
why=get_cve_2025_12058_explanation("stringlookup_external_vocabulary"),
)

@staticmethod
def _is_external_stringlookup_vocabulary(vocabulary: Any) -> bool:
"""Return True only for scalar vocabulary strings that clearly point outside the archive."""
if not isinstance(vocabulary, str):
return False

candidate = vocabulary.strip()
if not candidate:
return False

normalized = candidate.replace("\\", "/")
return (
bool(_URL_SCHEME_PATTERN.match(candidate))
or candidate.startswith("/")
or normalized.startswith("~/")
or bool(_WINDOWS_ABSOLUTE_PATH_PATTERN.match(candidate))
or normalized.startswith("../")
or "/../" in normalized
)
"""Return whether Keras interprets the vocabulary value as an external path."""
return isinstance(vocabulary, str) and bool(vocabulary.strip())

def _check_embedded_hdf5_weights_external_references(self, archive: zipfile.ZipFile, result: ScanResult) -> None:
"""Detect CVE-2026-1669 external HDF5 references inside embedded .keras weights."""
Expand Down Expand Up @@ -2304,27 +2299,30 @@ def _classify_keras_release_suffix(suffix: str) -> bool | None:
return None

@staticmethod
def _is_vulnerable_to_cve_2025_12058(version: str) -> bool:
"""Return True for Keras versions lower than 3.12.0, including prereleases of 3.12.0."""
version_match = re.match(r"^(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+-]*)$", version.strip())
def _is_vulnerable_to_cve_2025_12058(version: str) -> bool | None:
"""Classify Keras versions lower than 3.12.0, including fixed-boundary prereleases."""
version_match = _KERAS_RELEASE_VERSION_PATTERN.match(version)
if not version_match:
return False
return None

try:
major = int(version_match.group(1))
minor = int(version_match.group(2))
patch = int(version_match.group(3) or 0)
suffix = (version_match.group(4) or "").strip().lower()
except ValueError:
return None

parsed = (major, minor, patch)
if parsed < (3, 12, 0):
return True
if parsed > (3, 12, 0):
return False
parsed = (major, minor, patch)
if parsed < (3, 12, 0):
return True

return bool(re.search(r"(?:^|[.\-])(dev|rc|a|b|alpha|beta|pre|preview)\d*", suffix))
except ValueError:
suffix_status = KerasZipScanner._classify_keras_release_suffix(suffix)
if suffix_status is None:
return None
if parsed > (3, 12, 0):
return False
return suffix_status

@staticmethod
def _is_vulnerable_to_cve_2026_1669(version: str) -> bool | None:
Expand Down
Loading
Loading