From a15fcd864c855b954ea120f929c2b60303c9759d Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 00:35:02 -0400 Subject: [PATCH 1/2] fix: fail closed on StringLookup external vocab metadata --- CHANGELOG.md | 1 + modelaudit/scanners/keras_zip_scanner.py | 11 +++++ tests/scanners/test_keras_zip_scanner.py | 57 +++++++++++++++++++++++- 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e4541c8a..5b19710c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - redact compound credential names and malformed userinfo URLs in scanner evidence - restrict JFrog credential forwarding to explicitly trusted HTTPS hosts - block weight distribution `torch.load` on PyTorch prerelease and unknown versions before deserialization +- fail closed when fixed-version Keras ZIP metadata cannot verify external `StringLookup` vocabulary paths - treat prereleases of fixed Keras ZIP CVE-2026-1669 versions as vulnerable - detect external references in weights-only Keras HDF5 layouts without Keras metadata - restrict JFrog credential forwarding to explicitly trusted HTTPS hosts diff --git a/modelaudit/scanners/keras_zip_scanner.py b/modelaudit/scanners/keras_zip_scanner.py index e25df012c..7ed627ef8 100644 --- a/modelaudit/scanners/keras_zip_scanner.py +++ b/modelaudit/scanners/keras_zip_scanner.py @@ -116,6 +116,9 @@ def _has_get_file_reference(values: list[str]) -> bool: _KERAS_METADATA_ENTRY = "metadata.json" _KERAS_METADATA_MAX_BYTES = 10 * 1024 * 1024 _KERAS_WEIGHTS_ENTRY = "model.weights.h5" +_KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON = ( + "keras_zip_stringlookup_external_vocabulary_metadata_inconclusive" +) _KERAS_RELEASE_VERSION_PATTERN = re.compile(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+_-]*)\s*$") _KERAS_PRERELEASE_SUFFIX_PATTERN = re.compile(r"(?i)^(?:a|alpha|b|beta|c|rc|pre|preview|dev)") @@ -1456,6 +1459,14 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca if isinstance(keras_version, str): details["keras_version"] = keras_version + details["metadata_only_assessment"] = True + details["parse_status"] = "metadata_non_vulnerable" + details["analysis_incomplete"] = True + details["scan_outcome_reason"] = _KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON + self._mark_inconclusive_scan_result( + result, + _KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON, + ) result.add_check( name="StringLookup External Vocabulary Metadata Check", passed=False, diff --git a/tests/scanners/test_keras_zip_scanner.py b/tests/scanners/test_keras_zip_scanner.py index d7059bb3b..1ce40e2a2 100644 --- a/tests/scanners/test_keras_zip_scanner.py +++ b/tests/scanners/test_keras_zip_scanner.py @@ -1523,9 +1523,12 @@ def test_stringlookup_inline_vocabulary_list_stays_clean(self, tmp_path: Path) - issue for issue in result.issues if issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) ] assert noisy_issues == [] + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 0 + assert not audit_result.issues - def test_stringlookup_external_vocabulary_path_is_passing_on_fixed_keras(self, tmp_path: Path) -> None: - """Fixed-version metadata from the archive is inconclusive, but should not emit warning noise.""" + def test_stringlookup_external_vocabulary_path_is_inconclusive_on_fixed_keras(self, tmp_path: Path) -> None: + """Fixed-version archive metadata is inconclusive and must fail closed operationally.""" scanner = KerasZipScanner() external_vocab_path = tmp_path / "vocab.txt" external_vocab_path.write_text("token\n", encoding="utf-8") @@ -1544,15 +1547,65 @@ def test_stringlookup_external_vocabulary_path_is_passing_on_fixed_keras(self, t model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.12.0") result = scanner.scan(str(model_path)) + reason = keras_zip_scanner_module._KERAS_STRINGLOOKUP_EXTERNAL_VOCABULARY_INCONCLUSIVE_REASON cve_checks = [check for check in result.checks if check.details.get("cve_id") == "CVE-2025-12058"] assert len(cve_checks) == 1 assert cve_checks[0].name == "StringLookup External Vocabulary Metadata Check" assert cve_checks[0].status == CheckStatus.FAILED assert cve_checks[0].severity == IssueSeverity.INFO + assert cve_checks[0].details["analysis_incomplete"] is True + assert cve_checks[0].details["scan_outcome_reason"] == reason assert "metadata-only assessment is inconclusive" in cve_checks[0].message + assert result.success is False + assert result.metadata["scan_outcome"] == INCONCLUSIVE_SCAN_OUTCOME + assert reason in result.metadata["scan_outcome_reasons"] assert not any(issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) for issue in result.issues) + audit_result = scan_model_directory_or_file(str(model_path)) + metadata = audit_result.file_metadata[str(model_path)] + assert determine_exit_code(audit_result) == 2 + assert metadata.get("scan_outcome") == INCONCLUSIVE_SCAN_OUTCOME + assert reason in metadata.get("scan_outcome_reasons") + assert not any( + issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) for issue in audit_result.issues + ) + + def test_stringlookup_external_vocabulary_path_unknown_version_is_warning_exit1(self, tmp_path: Path) -> None: + """Missing Keras version context should remain a warning-level security decision.""" + scanner = KerasZipScanner() + external_vocab_path = tmp_path / "vocab.txt" + external_vocab_path.write_text("token\n", encoding="utf-8") + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": str(external_vocab_path)}, + }, + ], + }, + } + model_path = tmp_path / "model.keras" + with zipfile.ZipFile(model_path, "w") as zf: + zf.writestr("config.json", json.dumps(config)) + + result = scanner.scan(str(model_path)) + + risk_checks = [ + check for check in result.checks if check.name == "StringLookup External Vocabulary Risk (Version Unknown)" + ] + assert len(risk_checks) == 1 + assert risk_checks[0].status == CheckStatus.FAILED + assert risk_checks[0].severity == IssueSeverity.WARNING + assert result.has_warnings is True + assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME + + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 1 + def test_stringlookup_windows_home_relative_path_is_detected(self, tmp_path: Path) -> None: """Windows-style home-relative vocabulary paths should be normalized and detected.""" scanner = KerasZipScanner() From 6b521ef08c60b9225059a6556375da66640b904a Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 13:01:17 +0000 Subject: [PATCH 2/2] fix: harden keras stringlookup metadata handling --- CHANGELOG.md | 2 +- modelaudit/scanners/keras_zip_scanner.py | 96 ++++++++++++++---------- tests/scanners/test_keras_zip_scanner.py | 86 ++++++++++++++++++++- 3 files changed, 144 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b19710c5..c726a2806 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,7 +39,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - redact compound credential names and malformed userinfo URLs in scanner evidence - restrict JFrog credential forwarding to explicitly trusted HTTPS hosts - block weight distribution `torch.load` on PyTorch prerelease and unknown versions before deserialization -- fail closed when fixed-version Keras ZIP metadata cannot verify external `StringLookup` vocabulary paths +- fail closed on unverified Keras ZIP `StringLookup` vocabulary paths and redact remote URL evidence - treat prereleases of fixed Keras ZIP CVE-2026-1669 versions as vulnerable - detect external references in weights-only Keras HDF5 layouts without Keras metadata - restrict JFrog credential forwarding to explicitly trusted HTTPS hosts diff --git a/modelaudit/scanners/keras_zip_scanner.py b/modelaudit/scanners/keras_zip_scanner.py index 7ed627ef8..9e2f34071 100644 --- a/modelaudit/scanners/keras_zip_scanner.py +++ b/modelaudit/scanners/keras_zip_scanner.py @@ -94,7 +94,6 @@ re.IGNORECASE, ) _URL_SCHEME_PATTERN = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://") -_WINDOWS_ABSOLUTE_PATH_PATTERN = re.compile(r"^(?:[a-zA-Z]:[\\/]|\\\\)") _KERAS_CONFIG_ENTRY = "config.json" _KERAS_CONFIG_MAX_BYTES = 10 * 1024 * 1024 @@ -121,6 +120,14 @@ def _has_get_file_reference(values: list[str]) -> bool: ) _KERAS_RELEASE_VERSION_PATTERN = re.compile(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+_-]*)\s*$") _KERAS_PRERELEASE_SUFFIX_PATTERN = re.compile(r"(?i)^(?:a|alpha|b|beta|c|rc|pre|preview|dev)") +_KERAS_LOCAL_SUFFIX = r"\+[a-z0-9]+(?:[._-][a-z0-9]+)*" +_KERAS_FIXED_BOUNDARY_PRERELEASE_SUFFIX_PATTERN = re.compile( + rf"(?i)^[._-]?(?:(?:a|alpha|b|beta|c|rc|pre|preview)\d*" + rf"(?:[._-]?(?:post|rev|r)\d*)?(?:[._-]?dev\d*)?|dev\d*)(?:{_KERAS_LOCAL_SUFFIX})?$" +) +_KERAS_FIXED_BOUNDARY_POST_OR_LOCAL_SUFFIX_PATTERN = re.compile( + rf"(?i)^(?:{_KERAS_LOCAL_SUFFIX}|[._-]?(?:post|rev|r)\d*(?:[._-]?dev\d*)?(?:{_KERAS_LOCAL_SUFFIX})?)$" +) def _redact_url_for_display(url: str) -> str: @@ -141,6 +148,17 @@ def _redact_url_for_display(url: str) -> str: return urlunsplit((parsed.scheme, netloc, parsed.path, "", "")) +def _classify_keras_fixed_boundary_suffix(suffix: str) -> bool | None: + """Return whether a fixed-boundary suffix is a prerelease, or None when malformed.""" + if not suffix: + return False + if _KERAS_FIXED_BOUNDARY_PRERELEASE_SUFFIX_PATTERN.fullmatch(suffix): + return True + if _KERAS_FIXED_BOUNDARY_POST_OR_LOCAL_SUFFIX_PATTERN.fullmatch(suffix): + return False + return None + + try: import h5py @@ -1421,15 +1439,18 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca return vocabulary = layer_config.get("vocabulary") - if not self._is_external_stringlookup_vocabulary(vocabulary): + if not isinstance(vocabulary, str) or not self._is_external_stringlookup_vocabulary(vocabulary): return keras_version = result.metadata.get("keras_version") + display_vocabulary = ( + _redact_url_for_display(vocabulary) if _URL_SCHEME_PATTERN.match(vocabulary.strip()) else vocabulary + ) location = f"{self.current_file_path} (layer: {layer_name})" details = { "layer_name": layer_name, "layer_class": "StringLookup", - "vocabulary": vocabulary, + "vocabulary": display_vocabulary, "cve_id": "CVE-2025-12058", "cvss": 5.9, "cwe": "CWE-502, CWE-918", @@ -1438,16 +1459,20 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca ".keras archive is loaded." ), "remediation": "Upgrade Keras to >= 3.12.0 and avoid loading models with external vocabulary paths.", + "affected_versions": "Keras < 3.12.0", } + vulnerability_status = ( + self._is_vulnerable_to_cve_2025_12058(keras_version) if isinstance(keras_version, str) else None + ) - if isinstance(keras_version, str) and self._is_vulnerable_to_cve_2025_12058(keras_version): + if vulnerability_status is True: details["keras_version"] = keras_version result.add_check( name="CVE-2025-12058: StringLookup External Vocabulary Path", passed=False, message=( f"CVE-2025-12058: StringLookup layer '{layer_name}' in Keras {keras_version} references " - f"external vocabulary path '{vocabulary}', which can expose local files or trigger SSRF " + f"external vocabulary path '{display_vocabulary}', which can expose local files or trigger SSRF " "during model loading" ), severity=IssueSeverity.WARNING, @@ -1457,7 +1482,7 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca ) return - if isinstance(keras_version, str): + if vulnerability_status is False: details["keras_version"] = keras_version details["metadata_only_assessment"] = True details["parse_status"] = "metadata_non_vulnerable" @@ -1471,7 +1496,7 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca name="StringLookup External Vocabulary Metadata Check", passed=False, message=( - f"StringLookup layer '{layer_name}' references external vocabulary path '{vocabulary}', " + f"StringLookup layer '{layer_name}' references external vocabulary path '{display_vocabulary}', " f"and archive metadata reports Keras {keras_version} outside the known CVE-2025-12058 " "vulnerable range (<3.12.0), but metadata-only assessment is inconclusive without runtime " "verification" @@ -1479,40 +1504,32 @@ def _check_stringlookup_vocabulary_path(self, layer: dict[str, Any], result: Sca severity=IssueSeverity.INFO, location=location, details=details, + why=get_cve_2025_12058_explanation("stringlookup_external_vocabulary"), ) return + if isinstance(keras_version, str): + details["keras_version"] = keras_version + version_context = f"keras_version '{keras_version}' is non-canonical" + else: + version_context = "keras_version is unavailable" result.add_check( name="StringLookup External Vocabulary Risk (Version Unknown)", passed=False, message=( - f"StringLookup layer '{layer_name}' references external vocabulary path '{vocabulary}', but " - "keras_version is unavailable; cannot confidently attribute CVE-2025-12058 without version context" + f"StringLookup layer '{layer_name}' references external vocabulary path '{display_vocabulary}', but " + f"{version_context}; cannot confidently attribute CVE-2025-12058 without version context" ), severity=IssueSeverity.WARNING, location=location, - details=details | {"affected_versions": "Keras < 3.12.0"}, + details=details, + why=get_cve_2025_12058_explanation("stringlookup_external_vocabulary"), ) @staticmethod def _is_external_stringlookup_vocabulary(vocabulary: Any) -> bool: - """Return True only for scalar vocabulary strings that clearly point outside the archive.""" - if not isinstance(vocabulary, str): - return False - - candidate = vocabulary.strip() - if not candidate: - return False - - normalized = candidate.replace("\\", "/") - return ( - bool(_URL_SCHEME_PATTERN.match(candidate)) - or candidate.startswith("/") - or normalized.startswith("~/") - or bool(_WINDOWS_ABSOLUTE_PATH_PATTERN.match(candidate)) - or normalized.startswith("../") - or "/../" in normalized - ) + """Return whether Keras interprets the vocabulary value as an external path.""" + return isinstance(vocabulary, str) and bool(vocabulary.strip()) def _check_embedded_hdf5_weights_external_references(self, archive: zipfile.ZipFile, result: ScanResult) -> None: """Detect CVE-2026-1669 external HDF5 references inside embedded .keras weights.""" @@ -2000,27 +2017,30 @@ def _is_vulnerable_to_cve_2024_3660(version: str) -> bool: return False @staticmethod - def _is_vulnerable_to_cve_2025_12058(version: str) -> bool: - """Return True for Keras versions lower than 3.12.0, including prereleases of 3.12.0.""" - version_match = re.match(r"^(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+-]*)$", version.strip()) + def _is_vulnerable_to_cve_2025_12058(version: str) -> bool | None: + """Classify Keras versions lower than 3.12.0, including fixed-boundary prereleases.""" + version_match = _KERAS_RELEASE_VERSION_PATTERN.match(version) if not version_match: - return False + return None try: major = int(version_match.group(1)) minor = int(version_match.group(2)) patch = int(version_match.group(3) or 0) suffix = (version_match.group(4) or "").strip().lower() + except ValueError: + return None - parsed = (major, minor, patch) - if parsed < (3, 12, 0): - return True - if parsed > (3, 12, 0): - return False + is_prerelease = _classify_keras_fixed_boundary_suffix(suffix) + if is_prerelease is None: + return None - return bool(re.search(r"(?:^|[.\-])(dev|rc|a|b|alpha|beta|pre|preview)\d*", suffix)) - except ValueError: + parsed = (major, minor, patch) + if parsed < (3, 12, 0): + return True + if parsed > (3, 12, 0): return False + return is_prerelease @staticmethod def _is_vulnerable_to_cve_2026_1669(version: str) -> bool: diff --git a/tests/scanners/test_keras_zip_scanner.py b/tests/scanners/test_keras_zip_scanner.py index 1ce40e2a2..76cd12deb 100644 --- a/tests/scanners/test_keras_zip_scanner.py +++ b/tests/scanners/test_keras_zip_scanner.py @@ -1499,6 +1499,58 @@ def test_stringlookup_remote_vocabulary_url_triggers_cve_2025_12058(self, tmp_pa assert any(check.details.get("cve_id") == "CVE-2025-12058" for check in result.checks) + def test_stringlookup_remote_vocabulary_url_redacts_credentials(self, tmp_path: Path) -> None: + """Remote vocabulary findings should not preserve credentials or query secrets.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": "https://user:secret@example.com/vocab.txt?token=sensitive#fragment"}, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.11.3") + result = scanner.scan(str(model_path)) + + cve_checks = [check for check in result.checks if check.details.get("cve_id") == "CVE-2025-12058"] + assert len(cve_checks) == 1 + assert cve_checks[0].details["vocabulary"] == "https://example.com/vocab.txt" + serialized_result = json.dumps({"details": cve_checks[0].details, "message": cve_checks[0].message}) + assert "user" not in serialized_result + assert "secret" not in serialized_result + assert "sensitive" not in serialized_result + assert "fragment" not in serialized_result + + def test_stringlookup_relative_vocabulary_path_triggers_cve_2025_12058(self, tmp_path: Path) -> None: + """Scalar relative vocabulary paths should be treated as external files.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": "vocab.txt"}, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.11.3") + result = scanner.scan(str(model_path)) + + assert any( + check.details.get("cve_id") == "CVE-2025-12058" and check.status == CheckStatus.FAILED + for check in result.checks + ) + def test_stringlookup_inline_vocabulary_list_stays_clean(self, tmp_path: Path) -> None: """Inline StringLookup vocabularies are benign and should not emit warnings.""" scanner = KerasZipScanner() @@ -1648,7 +1700,7 @@ def test_stringlookup_prerelease_versions_treated_as_vulnerable(self, tmp_path: }, } - for keras_version in ("3.12.0a0", "3.12.0rc1", "3.12.0.dev0"): + for keras_version in ("3.12.0a0", "3.12.0rc1", "3.12.0_c1", "3.12.0.dev0"): model_path = create_configured_keras_zip(tmp_path, config, keras_version=keras_version) result = scanner.scan(str(model_path)) @@ -1657,6 +1709,38 @@ def test_stringlookup_prerelease_versions_treated_as_vulnerable(self, tmp_path: assert cve_checks[0].status == CheckStatus.FAILED assert cve_checks[0].severity == IssueSeverity.WARNING + def test_stringlookup_noncanonical_version_is_warning_exit1(self, tmp_path: Path) -> None: + """Malformed Keras metadata should not be treated as a verified fixed version.""" + scanner = KerasZipScanner() + config = { + "class_name": "Sequential", + "config": { + "layers": [ + { + "class_name": "StringLookup", + "name": "string_lookup", + "config": {"vocabulary": "vocab.txt"}, + }, + ], + }, + } + + model_path = create_configured_keras_zip(tmp_path, config, keras_version="3.12.0rc1junk") + result = scanner.scan(str(model_path)) + + risk_checks = [ + check for check in result.checks if check.name == "StringLookup External Vocabulary Risk (Version Unknown)" + ] + assert len(risk_checks) == 1 + assert risk_checks[0].status == CheckStatus.FAILED + assert risk_checks[0].severity == IssueSeverity.WARNING + assert risk_checks[0].details["keras_version"] == "3.12.0rc1junk" + assert "non-canonical" in risk_checks[0].message + assert result.metadata.get("scan_outcome") != INCONCLUSIVE_SCAN_OUTCOME + + audit_result = scan_model_directory_or_file(str(model_path)) + assert determine_exit_code(audit_result) == 1 + def test_custom_registered_objects(self, tmp_path: Path) -> None: """Test detection of custom registered objects.""" scanner = KerasZipScanner()