diff --git a/CHANGELOG.md b/CHANGELOG.md index 0bc60b316..e1605ec2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - redact compound credential names and malformed userinfo URLs in scanner evidence - restrict JFrog credential forwarding to explicitly trusted HTTPS hosts - block weight distribution `torch.load` on PyTorch prerelease and unknown versions before deserialization +- fail closed on unverified Keras ZIP `StringLookup` vocabulary paths and redact remote URL evidence - fail closed when cloud directory metadata cannot be read for every listed object - treat prereleases of fixed Keras ZIP CVE-2026-1669 versions as vulnerable - fail closed on interpolation-bearing NeMo Hydra `_target_` selectors whose resolved callable cannot be verified diff --git a/modelaudit/scanners/keras_zip_scanner.py b/modelaudit/scanners/keras_zip_scanner.py index 8f153451c..bd4a91317 100644 --- a/modelaudit/scanners/keras_zip_scanner.py +++ b/modelaudit/scanners/keras_zip_scanner.py @@ -8,8 +8,8 @@ import zipfile from pathlib import Path from typing import Any, ClassVar -from urllib.parse import urlsplit, urlunsplit +from modelaudit.detectors.network_comm import redact_url_for_finding from modelaudit.detectors.suspicious_symbols import ( SUSPICIOUS_CONFIG_PROPERTIES, SUSPICIOUS_LAYER_TYPES, @@ -95,7 +95,6 @@ re.IGNORECASE, ) _URL_SCHEME_PATTERN = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://") -_WINDOWS_ABSOLUTE_PATH_PATTERN = re.compile(r"^(?:[a-zA-Z]:[\\/]|\\\\)") _KERAS_CONFIG_ENTRY = "config.json" _KERAS_CONFIG_MAX_BYTES = 10 * 1024 * 1024 @@ -117,6 +116,9 @@ def _has_get_file_reference(values: list[str]) -> bool: _KERAS_METADATA_ENTRY = "metadata.json" _KERAS_METADATA_MAX_BYTES = 10 * 1024 * 1024 _KERAS_WEIGHTS_ENTRY = "model.weights.h5" +_KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON = ( + "keras_zip_stringlookup_external_vocabulary_metadata_inconclusive" +) _KERAS_RELEASE_VERSION_PATTERN = re.compile(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+_-]*)\s*$") _KERAS_PRERELEASE_SUFFIX_PATTERN = re.compile( r"(?i)^[._-]?(?:" @@ -134,21 +136,7 @@ def _has_get_file_reference(values: list[str]) -> bool: def _redact_url_for_display(url: str) -> str: - try: - parsed = urlsplit(url) - port = parsed.port - except ValueError: - return "[invalid-url]" - - if not parsed.scheme or not parsed.hostname: - return "[invalid-url]" - - hostname = parsed.hostname - if ":" in hostname and not hostname.startswith("["): - hostname = f"[{hostname}]" - - netloc = f"{hostname}:{port}" if port is not None else hostname - return urlunsplit((parsed.scheme, netloc, parsed.path, "", "")) + return redact_url_for_finding(url) try: @@ -1700,15 +1688,18 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca return vocabulary = layer_config.get("vocabulary") - if not self._is_external_stringlookup_vocabulary(vocabulary): + if not isinstance(vocabulary, str) or not self._is_external_stringlookup_vocabulary(vocabulary): return keras_version = result.metadata.get("keras_version") + display_vocabulary = ( + _redact_url_for_display(vocabulary) if _URL_SCHEME_PATTERN.match(vocabulary.strip()) else vocabulary + ) location = f"{self.current_file_path} (layer: {layer_name})" details = { "layer_name": layer_name, "layer_class": "StringLookup", - "vocabulary": vocabulary, + "vocabulary": display_vocabulary, "cve_id": "CVE-2025-12058", "cvss": 5.9, "cwe": "CWE-502, CWE-918", @@ -1717,16 +1708,20 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca ".keras archive is loaded." ), "remediation": "Upgrade Keras to >= 3.12.0 and avoid loading models with external vocabulary paths.", + "affected_versions": "Keras < 3.12.0", } + vulnerability_status = ( + self._is_vulnerable_to_cve_2025_12058(keras_version) if isinstance(keras_version, str) else None + ) - if isinstance(keras_version, str) and self._is_vulnerable_to_cve_2025_12058(keras_version): + if vulnerability_status is True: details["keras_version"] = keras_version result.add_check( name="CVE-2025-12058: StringLookup External Vocabulary Path", passed=False, message=( f"CVE-2025-12058: StringLookup layer '{layer_name}' in Keras {keras_version} references " - f"external vocabulary path '{vocabulary}', which can expose local files or trigger SSRF " + f"external vocabulary path '{display_vocabulary}', which can expose local files or trigger SSRF " "during model loading" ), severity=IssueSeverity.WARNING, @@ -1736,13 +1731,21 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca ) return - if isinstance(keras_version, str): + if vulnerability_status is False: details["keras_version"] = keras_version + details["metadata_only_assessment"] = True + details["parse_status"] = "metadata_non_vulnerable" + details["analysis_incomplete"] = True + details["scan_outcome_reason"] = _KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON + self._mark_inconclusive_scan_result( + result, + _KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON, + ) result.add_check( name="StringLookup External Vocabulary Metadata Check", passed=False, message=( - f"StringLookup layer '{layer_name}' references external vocabulary path '{vocabulary}', " + f"StringLookup layer '{layer_name}' references external vocabulary path '{display_vocabulary}', " f"and archive metadata reports Keras {keras_version} outside the known CVE-2025-12058 " "vulnerable range (<3.12.0), but metadata-only assessment is inconclusive without runtime " "verification" @@ -1750,40 +1753,32 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca severity=IssueSeverity.INFO, location=location, details=details, + why=get_cve_2025_12058_explanation("stringlookup_external_vocabulary"), ) return + if isinstance(keras_version, str): + details["keras_version"] = keras_version + version_context = f"keras_version '{keras_version}' is non-canonical" + else: + version_context = "keras_version is unavailable" result.add_check( name="StringLookup External Vocabulary Risk (Version Unknown)", passed=False, message=( - f"StringLookup layer '{layer_name}' references external vocabulary path '{vocabulary}', but " - "keras_version is unavailable; cannot confidently attribute CVE-2025-12058 without version context" + f"StringLookup layer '{layer_name}' references external vocabulary path '{display_vocabulary}', but " + f"{version_context}; cannot confidently attribute CVE-2025-12058 without version context" ), severity=IssueSeverity.WARNING, location=location, - details=details | {"affected_versions": "Keras < 3.12.0"}, + details=details, + why=get_cve_2025_12058_explanation("stringlookup_external_vocabulary"), ) @staticmethod def _is_external_stringlookup_vocabulary(vocabulary: Any) -> bool: - """Return True only for scalar vocabulary strings that clearly point outside the archive.""" - if not isinstance(vocabulary, str): - return False - - candidate = vocabulary.strip() - if not candidate: - return False - - normalized = candidate.replace("\\", "/") - return ( - bool(_URL_SCHEME_PATTERN.match(candidate)) - or candidate.startswith("/") - or normalized.startswith("~/") - or bool(_WINDOWS_ABSOLUTE_PATH_PATTERN.match(candidate)) - or normalized.startswith("../") - or "/../" in normalized - ) + """Return whether Keras interprets the vocabulary value as an external path.""" + return isinstance(vocabulary, str) and bool(vocabulary.strip()) def _check_embedded_hdf5_weights_external_references(self, archive: zipfile.ZipFile, result: ScanResult) -> None: """Detect CVE-2026-1669 external HDF5 references inside embedded .keras weights.""" @@ -2304,27 +2299,30 @@ def _classify_keras_release_suffix(suffix: str) -> bool | None: return None @staticmethod - def _is_vulnerable_to_cve_2025_12058(version: str) -> bool: - """Return True for Keras versions lower than 3.12.0, including prereleases of 3.12.0.""" - version_match = re.match(r"^(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+-]*)$", version.strip()) + def _is_vulnerable_to_cve_2025_12058(version: str) -> bool | None: + """Classify Keras versions lower than 3.12.0, including fixed-boundary prereleases.""" + version_match = _KERAS_RELEASE_VERSION_PATTERN.match(version) if not version_match: - return False + return None try: major = int(version_match.group(1)) minor = int(version_match.group(2)) patch = int(version_match.group(3) or 0) suffix = (version_match.group(4) or "").strip().lower() + except ValueError: + return None - parsed = (major, minor, patch) - if parsed < (3, 12, 0): - return True - if parsed > (3, 12, 0): - return False + parsed = (major, minor, patch) + if parsed < (3, 12, 0): + return True - return bool(re.search(r"(?:^|[.\-])(dev|rc|a|b|alpha|beta|pre|preview)\d*", suffix)) - except ValueError: + suffix_status = KerasZipScanner._classify_keras_release_suffix(suffix) + if suffix_status is None: + return None + if parsed > (3, 12, 0): return False + return suffix_status @staticmethod def _is_vulnerable_to_cve_2026_1669(version: str) -> bool | None: diff --git a/tests/scanners/test_keras_zip_scanner.py b/tests/scanners/test_keras_zip_scanner.py index 5ef205e7c..b93c13edc 100644 --- a/tests/scanners/test_keras_zip_scanner.py +++ b/tests/scanners/test_keras_zip_scanner.py @@ -1560,6 +1560,63 @@ def test_stringlookup_remote_vocabulary_url_triggers_cve_2025_12058(self, tmp_pa assert any(check.details.get("cve_id") == "CVE-2025-12058" for check in result.checks) + def test_stringlookup_remote_vocabulary_url_redacts_credentials(self, tmp_path: Path) -> None: + """Remote vocabulary findings should not preserve credentials or query secrets.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": { + "vocabulary": ( + "https://user:secret@example.com/download/" + "api_key=sk-secret-value/vocab.txt?token=sensitive#fragment" + ) + }, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.11.3") + result = scanner.scan(str(model_path)) + + cve_checks = [check for check in result.checks if check.details.get("cve_id") == "CVE-2025-12058"] + assert len(cve_checks) == 1 + assert cve_checks[0].details["vocabulary"] == "https://example.com/download//vocab.txt" + serialized_result = json.dumps({"details": cve_checks[0].details, "message": cve_checks[0].message}) + assert "user" not in serialized_result + assert "secret" not in serialized_result + assert "sensitive" not in serialized_result + assert "fragment" not in serialized_result + + def test_stringlookup_relative_vocabulary_path_triggers_cve_2025_12058(self, tmp_path: Path) -> None: + """Scalar relative vocabulary paths should be treated as external files.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": "vocab.txt"}, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.11.3") + result = scanner.scan(str(model_path)) + + assert any( + check.details.get("cve_id") == "CVE-2025-12058" and check.status == CheckStatus.FAILED + for check in result.checks + ) + def test_stringlookup_inline_vocabulary_list_stays_clean(self, tmp_path: Path) -> None: """Inline StringLookup vocabularies are benign and should not emit warnings.""" scanner = KerasZipScanner() @@ -1584,9 +1641,20 @@ def test_stringlookup_inline_vocabulary_list_stays_clean(self, tmp_path: Path) - issue for issue in result.issues if issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) ] assert noisy_issues == [] + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 0 + assert not audit_result.issues - def test_stringlookup_external_vocabulary_path_is_passing_on_fixed_keras(self, tmp_path: Path) -> None: - """Fixed-version metadata from the archive is inconclusive, but should not emit warning noise.""" + @pytest.mark.parametrize( + "keras_version", + ["3.12.0", "3.12.0+local", "3.12.0.post1.dev0", "3.12.1.dev0"], + ) + def test_stringlookup_external_vocabulary_path_is_inconclusive_on_fixed_keras( + self, + tmp_path: Path, + keras_version: str, + ) -> None: + """Fixed-version archive metadata is inconclusive and must fail closed operationally.""" scanner = KerasZipScanner() external_vocab_path = tmp_path / "vocab.txt" external_vocab_path.write_text("token\n", encoding="utf-8") @@ -1603,17 +1671,67 @@ def test_stringlookup_external_vocabulary_path_is_passing_on_fixed_keras(self, t }, } - model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.12.0") + model_path = create_configured_keras_zip(tmp_path, config, keras_version=keras_version) result = scanner.scan(str(model_path)) + reason = keras_zip_scanner_module._KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON cve_checks = [check for check in result.checks if check.details.get("cve_id") == "CVE-2025-12058"] assert len(cve_checks) == 1 assert cve_checks[0].name == "StringLookup External Vocabulary Metadata Check" assert cve_checks[0].status == CheckStatus.FAILED assert cve_checks[0].severity == IssueSeverity.INFO + assert cve_checks[0].details["analysis_incomplete"] is True + assert cve_checks[0].details["scan_outcome_reason"] == reason assert "metadata-only assessment is inconclusive" in cve_checks[0].message + assert result.success is False + assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME + assert reason in result.metadata["scan_outcome_reasons"] assert not any(issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) for issue in result.issues) + audit_result = scan_model_directory_or_file(str(model_path)) + metadata = audit_result.file_metadata[str(model_path)] + assert determine_exit_code(audit_result) == 2 + assert metadata.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME + assert reason in metadata.get("scan_outcome_reasons") + assert not any( + issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) for issue in audit_result.issues + ) + + def test_stringlookup_external_vocabulary_path_unknown_version_is_warning_exit1(self, tmp_path: Path) -> None: + """Missing Keras version context should remain a warning-level security decision.""" + scanner = KerasZipScanner() + external_vocab_path = tmp_path / "vocab.txt" + external_vocab_path.write_text("token\n", encoding="utf-8") + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": str(external_vocab_path)}, + }, + ], + }, + } + model_path = tmp_path / "model.keras" + with zipfile.ZipFile(model_path, "w") as zf: + zf.writestr("config.json", json.dumps(config)) + + result = scanner.scan(str(model_path)) + + risk_checks = [ + check for check in result.checks if check.name == "StringLookup External Vocabulary Risk (Version Unknown)" + ] + assert len(risk_checks) == 1 + assert risk_checks[0].status == CheckStatus.FAILED + assert risk_checks[0].severity == IssueSeverity.WARNING + assert result.has_warnings is True + assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME + + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 1 + def test_stringlookup_windows_home_relative_path_is_detected(self, tmp_path: Path) -> None: """Windows-style home-relative vocabulary paths should be normalized and detected.""" scanner = KerasZipScanner() @@ -1656,7 +1774,7 @@ def test_stringlookup_prerelease_versions_treated_as_vulnerable(self, tmp_path: }, } - for keras_version in ("3.12.0a0", "3.12.0rc1", "3.12.0.dev0"): + for keras_version in ("3.12.0a0", "3.12.0rc1", "3.12.0_c1", "3.12.0.dev0"): model_path = create_configured_keras_zip(tmp_path, config, keras_version=keras_version) result = scanner.scan(str(model_path)) @@ -1665,6 +1783,67 @@ def test_stringlookup_prerelease_versions_treated_as_vulnerable(self, tmp_path: assert cve_checks[0].status == CheckStatus.FAILED assert cve_checks[0].severity == IssueSeverity.WARNING + def test_stringlookup_noncanonical_version_is_warning_exit1(self, tmp_path: Path) -> None: + """Malformed Keras metadata should not be treated as a verified fixed version.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": "vocab.txt"}, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.12.0rc1junk") + result = scanner.scan(str(model_path)) + + risk_checks = [ + check for check in result.checks if check.name == "StringLookup External Vocabulary Risk (Version Unknown)" + ] + assert len(risk_checks) == 1 + assert risk_checks[0].status == CheckStatus.FAILED + assert risk_checks[0].severity == IssueSeverity.WARNING + assert risk_checks[0].details["keras_version"] == "3.12.0rc1junk" + assert "non-canonical" in risk_checks[0].message + assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME + + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 1 + + def test_stringlookup_noncanonical_pre_fix_version_keeps_cve_attribution(self, tmp_path: Path) -> None: + """A malformed suffix cannot erase a release tuple that is definitely below the fix.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": "vocab.txt"}, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.11.3rc1junk") + result = scanner.scan(str(model_path)) + + cve_checks = [check for check in result.checks if check.name.startswith("CVE-2025-12058:")] + assert len(cve_checks) == 1 + assert cve_checks[0].status == CheckStatus.FAILED + assert cve_checks[0].severity == IssueSeverity.WARNING + assert cve_checks[0].details["keras_version"] == "3.11.3rc1junk" + assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME + + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 1 + def test_custom_registered_objects(self, tmp_path: Path) -> None: """Test detection of custom registered objects.""" scanner = KerasZipScanner()