diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7e4541c8a..5fe943a28 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Bug Fixes
 
+- treat Keras Lambda CVE fixed-boundary prereleases as vulnerable
 - scan ONNX external data references in sparse initializers, tensor-valued attributes, and function defaults
 - avoid false-positive process-launch findings for parsed framed Python string literals
 - detect dangerous Python calls retrieved through module namespace dictionaries in ZIP and TAR members
diff --git a/modelaudit/scanners/keras_zip_scanner.py b/modelaudit/scanners/keras_zip_scanner.py
index e25df012c..84d085e07 100644
--- a/modelaudit/scanners/keras_zip_scanner.py
+++ b/modelaudit/scanners/keras_zip_scanner.py
@@ -118,6 +118,26 @@ def _has_get_file_reference(values: list[str]) -> bool:
 _KERAS_WEIGHTS_ENTRY = "model.weights.h5"
 _KERAS_RELEASE_VERSION_PATTERN = re.compile(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?([A-Za-z0-9.+_-]*)\s*$")
 _KERAS_PRERELEASE_SUFFIX_PATTERN = re.compile(r"(?i)^(?:a|alpha|b|beta|c|rc|pre|preview|dev)")
+_KERAS_LOCAL_VERSION_SUFFIX = r"\+[a-z0-9]+(?:[._-][a-z0-9]+)*"
+_KERAS_FIXED_BOUNDARY_PRERELEASE_SUFFIX_PATTERN = re.compile(
+    rf"(?i)^[._-]?(?:(?:a|alpha|b|beta|c|rc|pre|preview)\d*"
+    rf"(?:[._-]?(?:post|rev|r)\d*)?(?:[._-]?dev\d*)?|dev\d*)(?:{_KERAS_LOCAL_VERSION_SUFFIX})?$"
+)
+_KERAS_FIXED_BOUNDARY_POST_OR_LOCAL_SUFFIX_PATTERN = re.compile(
+    rf"(?i)^(?:{_KERAS_LOCAL_VERSION_SUFFIX}|"
+    rf"[._-]?(?:post|rev|r)\d*(?:[._-]?dev\d*)?(?:{_KERAS_LOCAL_VERSION_SUFFIX})?)$"
+)
+
+
+def _classify_keras_fixed_boundary_suffix(suffix: str) -> bool | None:
+    """Return whether a fixed-boundary suffix is a prerelease, or None when non-canonical."""
+    if not suffix:
+        return False
+    if _KERAS_FIXED_BOUNDARY_PRERELEASE_SUFFIX_PATTERN.fullmatch(suffix):
+        return True
+    if _KERAS_FIXED_BOUNDARY_POST_OR_LOCAL_SUFFIX_PATTERN.fullmatch(suffix):
+        return False
+    return None
 
 
 def _redact_url_for_display(url: str) -> str:
@@ -802,7 +822,10 @@ def _scan_model_config(self, model_config: dict[str, Any], result: ScanResult) -
             if is_lambda_layer:
                 self._check_lambda_layer(layer, result, layer_name)
                 keras_version = result.metadata.get("keras_version")
-                if isinstance(keras_version, str) and self._is_vulnerable_to_cve_2024_3660(keras_version):
+                vulnerability_status = (
+                    self._is_vulnerable_to_cve_2024_3660(keras_version) if isinstance(keras_version, str) else None
+                )
+                if vulnerability_status is True:
                     # CVE-2024-3660: Lambda layers enable arbitrary code injection
                     result.add_check(
                         name="CVE-2024-3660: Lambda Layer Code Injection",
@@ -825,7 +848,7 @@ def _scan_model_config(self, model_config: dict[str, Any], result: ScanResult) -
                         },
                         why=get_cve_2024_3660_explanation("lambda_code_injection"),
                     )
-                elif isinstance(keras_version, str):
+                elif vulnerability_status is False:
                     result.add_check(
                         name="Lambda Version Risk Check",
                         passed=True,
@@ -837,21 +860,33 @@ def _scan_model_config(self, model_config: dict[str, Any], result: ScanResult) -
                         details={"layer_name": layer_name, "layer_class": "Lambda", "keras_version": keras_version},
                     )
                 else:
+                    version_context = (
+                        f"keras_version '{keras_version}' is non-canonical"
+                        if isinstance(keras_version, str)
+                        else "keras_version is unavailable"
+                    )
                     result.add_check(
                         name="Lambda Risk (Version Unknown)",
                         passed=False,
                         message=(
-                            f"Lambda layer '{layer_name}' detected but keras_version is unavailable; "
-                            "cannot confidently attribute CVE-2024-3660 without version context"
+                            f"Lambda layer '{layer_name}' detected but {version_context}; "
+                            "cannot confidently attribute CVE-2024-3660 without reliable version context"
                         ),
                         severity=IssueSeverity.WARNING,
                         location=f"{self.current_file_path} (layer: {layer_name})",
                         details={
                             "layer_name": layer_name,
                             "layer_class": "Lambda",
+                            "keras_version": keras_version,
+                            "version_parse_status": "unknown",
                             "cve_id": "CVE-2024-3660",
+                            "cvss": 9.8,
+                            "cwe": "CWE-94",
+                            "description": "Lambda layer deserialization can enable arbitrary code injection.",
+                            "remediation": "Remove Lambda layers or upgrade Keras to >= 2.13",
                             "affected_versions": "Keras < 2.13.0",
                         },
+                        why=get_cve_2024_3660_explanation("lambda_code_injection"),
                     )
             elif layer_class in self.suspicious_layer_types:
                 result.add_check(
@@ -1968,25 +2003,26 @@ def _check_lambda_layer(self, layer: dict[str, Any], result: ScanResult, layer_n
             )
 
     @staticmethod
-    def _is_vulnerable_to_cve_2024_3660(version: str) -> bool:
-        """Return True for Keras versions lower than 2.13.0.
+    def _is_vulnerable_to_cve_2024_3660(version: str) -> bool | None:
+        """Return True for Keras versions lower than 2.13.0, including prereleases of 2.13.0."""
+        version_match = _KERAS_RELEASE_VERSION_PATTERN.match(version)
+        if not version_match:
+            return None
 
-        Handles two-part versions (e.g. "2.10") by treating missing patch as 0.
-        """
-        parts = version.split(".", 2)
-        if len(parts) < 2:
-            return False
         try:
-            major = int(parts[0])
-            minor = int(parts[1])
-            patch = 0
-            if len(parts) == 3:
-                patch_digits = "".join(ch for ch in parts[2] if ch.isdigit())
-                if patch_digits:
-                    patch = int(patch_digits)
-            return (major, minor, patch) < (2, 13, 0)
+            major = int(version_match.group(1))
+            minor = int(version_match.group(2))
+            patch = int(version_match.group(3) or 0)
+            suffix = (version_match.group(4) or "").strip().lower()
+
+            parsed = (major, minor, patch)
+            if parsed < (2, 13, 0):
+                return True
+            if parsed > (2, 13, 0):
+                return False
+            return _classify_keras_fixed_boundary_suffix(suffix)
         except ValueError:
-            return False
+            return None
 
     @staticmethod
     def _is_vulnerable_to_cve_2025_12058(version: str) -> bool:
diff --git a/tests/scanners/test_keras_zip_scanner.py b/tests/scanners/test_keras_zip_scanner.py
index d7059bb3b..f6958f4cf 100644
--- a/tests/scanners/test_keras_zip_scanner.py
+++ b/tests/scanners/test_keras_zip_scanner.py
@@ -3550,6 +3550,117 @@ def test_no_cve_for_fixed_keras_version(self, tmp_path: Path) -> None:
         cve_issues = [i for i in result.issues if "CVE-2024-3660" in i.message]
         assert len(cve_issues) == 0
 
+    @pytest.mark.parametrize(
+        "prerelease_version",
+        ["2.13.0a0", "2.13.0rc1", "2.13.0.dev0", "2.13.0_c1", "2.13.0-rc1", "2.13.0rc1+local"],
+    )
+    def test_lambda_cve_for_fixed_boundary_prereleases(self, tmp_path: Path, prerelease_version: str) -> None:
+        """Prereleases of the fixed 2.13.0 boundary should remain CVE-attributed."""
+        scanner = KerasZipScanner()
+        encoded = base64.b64encode(b"lambda x: x * 2").decode()
+        config = {
+            "class_name": "Sequential",
+            "config": {
+                "layers": [
+                    {
+                        "class_name": "Lambda",
+                        "name": "my_lambda",
+                        "config": {"function": [encoded, None, None]},
+                    }
+                ]
+            },
+        }
+
+        result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=prerelease_version))
+
+        cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
+        assert len(cve_issues) == 1
+        assert cve_issues[0].severity == IssueSeverity.CRITICAL
+        assert cve_issues[0].details["keras_version"] == prerelease_version
+
+    @pytest.mark.parametrize("fixed_version", ["2.13.0", "2.13.0+local", "2.13.0.post1", "2.13.0_post1"])
+    def test_lambda_no_cve_for_fixed_boundary_final_local_or_post(
+        self,
+        tmp_path: Path,
+        fixed_version: str,
+    ) -> None:
+        """Final, local, and post fixed-boundary versions should not get CVE attribution."""
+        scanner = KerasZipScanner()
+        encoded = base64.b64encode(b"lambda x: x * 2").decode()
+        config = {
+            "class_name": "Sequential",
+            "config": {
+                "layers": [
+                    {
+                        "class_name": "Lambda",
+                        "name": "my_lambda",
+                        "config": {"function": [encoded, None, None]},
+                    }
+                ]
+            },
+        }
+
+        result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=fixed_version))
+
+        cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
+        assert cve_issues == []
+        version_checks = [check for check in result.checks if check.name == "Lambda Version Risk Check"]
+        assert len(version_checks) == 1
+        assert version_checks[0].status == CheckStatus.PASSED
+
+    @pytest.mark.parametrize("vulnerable_version", ["2.12.0_rc1", "2.12.0_post1"])
+    def test_lambda_cve_for_underscore_vulnerable_version(self, tmp_path: Path, vulnerable_version: str) -> None:
+        """Underscore-separated metadata below the fixed boundary must remain vulnerable."""
+        scanner = KerasZipScanner()
+        encoded = base64.b64encode(b"lambda x: x * 2").decode()
+        config = {
+            "class_name": "Sequential",
+            "config": {
+                "layers": [
+                    {
+                        "class_name": "Lambda",
+                        "name": "my_lambda",
+                        "config": {"function": [encoded, None, None]},
+                    }
+                ]
+            },
+        }
+
+        result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=vulnerable_version))
+
+        cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
+        assert len(cve_issues) == 1
+        assert cve_issues[0].severity == IssueSeverity.CRITICAL
+        assert cve_issues[0].details["keras_version"] == vulnerable_version
+
+    @pytest.mark.parametrize("noncanonical_version", ["2.13.0rc1junk", "2.13.0preflight", "not-a-version"])
+    def test_lambda_noncanonical_version_warns_unknown(self, tmp_path: Path, noncanonical_version: str) -> None:
+        """Malformed fixed-boundary suffixes must not produce CVE or clean-version false positives."""
+        scanner = KerasZipScanner()
+        encoded = base64.b64encode(b"lambda x: x * 2").decode()
+        config = {
+            "class_name": "Sequential",
+            "config": {
+                "layers": [
+                    {
+                        "class_name": "Lambda",
+                        "name": "my_lambda",
+                        "config": {"function": [encoded, None, None]},
+                    }
+                ]
+            },
+        }
+
+        result = scanner.scan(self._make_keras_zip(config, tmp_path, keras_version=noncanonical_version))
+
+        cve_issues = [issue for issue in result.issues if issue.details.get("cve_id") == "CVE-2024-3660"]
+        assert len(cve_issues) == 1
+        assert cve_issues[0].severity == IssueSeverity.WARNING
+        assert cve_issues[0].details["keras_version"] == noncanonical_version
+        assert cve_issues[0].details["version_parse_status"] == "unknown"
+        assert "non-canonical" in cve_issues[0].message
+        assert all(check.name != "Lambda Version Risk Check" for check in result.checks)
+
     def test_cve_for_two_part_keras_version(self, tmp_path: Path) -> None:
         """Lambda in Keras 2.10 (two-part version) should be CVE-attributed."""
         scanner = KerasZipScanner()