diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e4541c8a..65e04e69d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Bug Fixes +- resolve simple NeMo Hydra `_target_` interpolations before callable classification and fail closed on unresolved dynamic selectors - scan ONNX external data references in sparse initializers, tensor-valued attributes, and function defaults - avoid false-positive process-launch findings for parsed framed Python string literals - detect dangerous Python calls retrieved through module namespace dictionaries in ZIP and TAR members diff --git a/modelaudit/scanners/nemo_scanner.py b/modelaudit/scanners/nemo_scanner.py index a10fcbba1..9f305d685 100644 --- a/modelaudit/scanners/nemo_scanner.py +++ b/modelaudit/scanners/nemo_scanner.py @@ -160,6 +160,11 @@ "vars", ) _TARGET_TOKEN_RE = re.compile(r"__import__|[A-Z]+(?=[A-Z][a-z0-9]|[0-9_]|$)|[A-Z]?[a-z0-9]+") +_HYDRA_INTERPOLATION_RE = re.compile(r"\$\{([^{}]+)\}") +_HYDRA_PATH_TOKEN_RE = re.compile(r"[^\.\[\]]+") +_HYDRA_LIST_INDEX_RE = re.compile(r"-?\d+") +_HYDRA_INTERPOLATION_MAX_DEPTH = 8 +_HYDRA_REFERENCE_MISSING = object() CVE_2025_23304_ID = "CVE-2025-23304" CVE_2025_23304_CVSS = 7.6 @@ -213,6 +218,182 @@ def _find_suspicious_safe_prefixed_target_pattern(target: str) -> str | None: return None +def _hydra_interpolation_matches(value: str) -> list[re.Match[str]]: + matches: list[re.Match[str]] = [] + for match in _HYDRA_INTERPOLATION_RE.finditer(value): + preceding_backslashes = 0 + cursor = match.start() - 1 + while cursor >= 0 and value[cursor] == "\\": + preceding_backslashes += 1 + cursor -= 1 + if preceding_backslashes % 2 == 0: + matches.append(match) + return matches + + +def _hydra_path_tokens(path: str) -> list[str] | None: + tokens: list[str] = [] + cursor = 0 + expect_token = True + while cursor < len(path): + if path[cursor] == ".": + if expect_token: + return None + cursor += 1 + expect_token = True + continue + if path[cursor] == "[": + close = path.find("]", cursor + 1) + if close < 0: + return None + token = path[cursor + 1 : close] + if not token or any(character in token for character in ".[]"): + return None + tokens.append(token) + cursor = close + 1 + expect_token = False + continue + if not expect_token: + return None + match = _HYDRA_PATH_TOKEN_RE.match(path, cursor) + if match is None: + return None + tokens.append(match.group(0)) + cursor = match.end() + expect_token = False + return None if expect_token and tokens else tokens + + +def _render_hydra_path(tokens: list[str]) -> str: + return ".".join(tokens) + + +def _hydra_reference_path(container_path: str, reference: str) -> str | None: + reference = reference.strip() + if not reference or ":" in reference: + return None + if not reference.startswith("."): + tokens = _hydra_path_tokens(reference) + return None if not tokens else _render_hydra_path(tokens) + + leading_dots = len(reference) - len(reference.lstrip(".")) + suffix = reference[leading_dots:] + if not suffix: + return None + + base_tokens = _hydra_path_tokens(container_path) + suffix_tokens = _hydra_path_tokens(suffix) + parent_hops = leading_dots - 1 + if base_tokens is None or not suffix_tokens or parent_hops > len(base_tokens): + return None + if parent_hops: + del base_tokens[-parent_hops:] + return _render_hydra_path([*base_tokens, *suffix_tokens]) + + +def _hydra_reference_container_path(path: str) -> str | None: + tokens = _hydra_path_tokens(path) + if not tokens: + return None + return _render_hydra_path(tokens[:-1]) + + +def _lookup_hydra_path(root_config: Any, path: str) -> Any: + tokens = _hydra_path_tokens(path) + if tokens is None: + return _HYDRA_REFERENCE_MISSING + + value = root_config + for token in tokens: + if isinstance(value, list): + if _HYDRA_LIST_INDEX_RE.fullmatch(token) is None: + return _HYDRA_REFERENCE_MISSING + index = int(token) + if not -len(value) <= index < len(value): + return _HYDRA_REFERENCE_MISSING + value = value[index] + continue + if not isinstance(value, dict): + return _HYDRA_REFERENCE_MISSING + if token in value: + value = value[token] + continue + if _HYDRA_LIST_INDEX_RE.fullmatch(token) is None or int(token) not in value: + return _HYDRA_REFERENCE_MISSING + value = value[int(token)] + return value + + +def _resolve_hydra_target_interpolation( + target: str, + root_config: Any, + container_path: str, +) -> tuple[str, tuple[str, ...]]: + """Resolve simple OmegaConf-style references inside a Hydra target.""" + return _resolve_hydra_interpolation_value(target, root_config, container_path, frozenset(), 0) + + +def _resolve_hydra_interpolation_value( + value: str, + root_config: Any, + container_path: str, + resolving_paths: frozenset[str], + depth: int, +) -> tuple[str, tuple[str, ...]]: + matches = _hydra_interpolation_matches(value) + if not matches: + return value, () + if depth >= _HYDRA_INTERPOLATION_MAX_DEPTH: + return value, ("",) + + unresolved_references: list[str] = [] + parts: list[str] = [] + last_end = 0 + for match in matches: + parts.append(value[last_end : match.start()]) + reference = match.group(1).strip() + reference_path = _hydra_reference_path(container_path, reference) + resolved = ( + _HYDRA_REFERENCE_MISSING if reference_path is None else _lookup_hydra_path(root_config, reference_path) + ) + if ( + resolved is _HYDRA_REFERENCE_MISSING + or resolved is None + or isinstance(resolved, dict | list) + or reference_path in resolving_paths + ): + unresolved_references.append(reference) + parts.append(match.group(0)) + elif isinstance(resolved, str): + assert reference_path is not None + referenced_container = _hydra_reference_container_path(reference_path) + if referenced_container is None: + unresolved_references.append(reference) + parts.append(match.group(0)) + else: + nested_value, nested_unresolved = _resolve_hydra_interpolation_value( + resolved, + root_config, + referenced_container, + resolving_paths | {reference_path}, + depth + 1, + ) + unresolved_references.extend(nested_unresolved) + parts.append(nested_value) + else: + parts.append(str(resolved)) + last_end = match.end() + + if unresolved_references: + return value, tuple(unresolved_references) + + parts.append(value[last_end:]) + resolved_value = "".join(parts) + if resolved_value == value: + return value, () + return _resolve_hydra_interpolation_value(resolved_value, root_config, container_path, resolving_paths, depth + 1) + + def _is_runtime_truthy(value: Any) -> bool: """Return whether a Hydra argument would be truthy when passed to Python.""" return bool(value) @@ -2128,8 +2309,12 @@ def _check_hydra_targets( archive_path: str, result: ScanResult, path_prefix: str = "", + root_config: Any | None = None, ) -> None: """Recursively check _target_ values in Hydra config.""" + if root_config is None: + root_config = config + if isinstance(config, list): for index, item in enumerate(config): if isinstance(item, dict | list): @@ -2139,6 +2324,7 @@ def _check_hydra_targets( archive_path, result, f"{path_prefix}[{index}]" if path_prefix else f"[{index}]", + root_config, ) return @@ -2149,9 +2335,18 @@ def _check_hydra_targets( current_path = f"{path_prefix}.{key}" if path_prefix else key if key == "_target_" and isinstance(value, str): - self._evaluate_target(value, current_path, config_name, archive_path, result, config) + self._evaluate_target( + value, + current_path, + config_name, + archive_path, + result, + config, + root_config, + path_prefix, + ) elif isinstance(value, dict | list): - self._check_hydra_targets(value, config_name, archive_path, result, current_path) + self._check_hydra_targets(value, config_name, archive_path, result, current_path, root_config) def _evaluate_target( self, @@ -2161,26 +2356,46 @@ def _evaluate_target( archive_path: str, result: ScanResult, target_config: dict[str, Any] | None = None, + root_config: Any | None = None, + container_path: str = "", ) -> None: """Evaluate a single _target_ value for dangerous patterns.""" + raw_target = target + if root_config is not None and _hydra_interpolation_matches(target): + target, unresolved_references = _resolve_hydra_target_interpolation(target, root_config, container_path) + if unresolved_references: + self._add_unresolved_interpolated_target_check( + raw_target, + unresolved_references, + config_path, + config_name, + archive_path, + result, + ) + return + # Check against known dangerous targets (always flag, even if safe prefix) if target in _DANGEROUS_TARGETS or (target == "numpy.load" and self._numpy_load_allows_pickle(target_config)): + details: dict[str, Any] = { + "target": target, + "config_path": config_path, + "config_file": config_name, + "cve_id": CVE_2025_23304_ID, + "cvss": CVE_2025_23304_CVSS, + "cwe": CVE_2025_23304_CWE, + "description": CVE_2025_23304_DESCRIPTION, + "remediation": CVE_2025_23304_REMEDIATION, + } + if raw_target != target: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name=f"{CVE_2025_23304_ID}: Dangerous Hydra _target_", passed=False, message=(f"{CVE_2025_23304_ID}: Dangerous _target_ '{target}' at {config_path} in {config_name}"), severity=IssueSeverity.CRITICAL, location=f"{archive_path}:{config_name}", - details={ - "target": target, - "config_path": config_path, - "config_file": config_name, - "cve_id": CVE_2025_23304_ID, - "cvss": CVE_2025_23304_CVSS, - "cwe": CVE_2025_23304_CWE, - "description": CVE_2025_23304_DESCRIPTION, - "remediation": CVE_2025_23304_REMEDIATION, - }, + details=details, why=( f"The _target_ field '{target}' in this NeMo " f"config specifies a dangerous Python callable. " @@ -2202,35 +2417,86 @@ def _evaluate_target( config_name, archive_path, result, + raw_target=raw_target if raw_target != target else None, ) return + details = {"target": target, "config_path": config_path} + if raw_target != target: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name="Hydra _target_ Safety Check", passed=True, message=(f"Safe _target_ '{target}' at {config_path} in {config_name}"), location=f"{archive_path}:{config_name}", - details={"target": target, "config_path": config_path}, + details=details, ) return # Check for suspicious patterns in target (only for non-safe targets) pattern = _find_suspicious_target_pattern(target) if pattern is not None: - self._add_suspicious_target_check(target, pattern, config_path, config_name, archive_path, result) + self._add_suspicious_target_check( + target, + pattern, + config_path, + config_name, + archive_path, + result, + raw_target=raw_target if raw_target != target else None, + ) return # Unknown target - flag for review + details = { + "target": target, + "config_path": config_path, + "config_file": config_name, + } + if raw_target != target: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name="Hydra _target_ Review", passed=False, message=(f"Unknown _target_ '{target}' at {config_path} in {config_name} - requires manual review"), severity=IssueSeverity.INFO, location=f"{archive_path}:{config_name}", + details=details, + ) + + def _add_unresolved_interpolated_target_check( + self, + target: str, + unresolved_references: tuple[str, ...], + config_path: str, + config_name: str, + archive_path: str, + result: ScanResult, + ) -> None: + result.add_check( + name=f"{CVE_2025_23304_ID}: Unresolved Hydra _target_ Interpolation", + passed=False, + message=( + f"{CVE_2025_23304_ID}: Unresolved interpolated _target_ '{target}' at {config_path} in {config_name}" + ), + severity=IssueSeverity.CRITICAL, + location=f"{archive_path}:{config_name}", details={ "target": target, + "unresolved_references": list(unresolved_references), "config_path": config_path, "config_file": config_name, + "cve_id": CVE_2025_23304_ID, + "cvss": CVE_2025_23304_CVSS, + "cwe": CVE_2025_23304_CWE, + "description": CVE_2025_23304_DESCRIPTION, + "remediation": CVE_2025_23304_REMEDIATION, }, + why=( + "Hydra and OmegaConf resolve _target_ interpolation before instantiation. " + "ModelAudit could not resolve this dynamic callable selector statically, so it fails closed." + ), ) def _add_suspicious_target_check( @@ -2241,7 +2507,22 @@ def _add_suspicious_target_check( config_name: str, archive_path: str, result: ScanResult, + raw_target: str | None = None, ) -> None: + details: dict[str, Any] = { + "target": target, + "pattern": pattern, + "config_path": config_path, + "config_file": config_name, + "cve_id": CVE_2025_23304_ID, + "cvss": CVE_2025_23304_CVSS, + "cwe": CVE_2025_23304_CWE, + "description": CVE_2025_23304_DESCRIPTION, + "remediation": CVE_2025_23304_REMEDIATION, + } + if raw_target is not None: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name=f"{CVE_2025_23304_ID}: Suspicious Hydra _target_", passed=False, @@ -2252,17 +2533,7 @@ def _add_suspicious_target_check( ), severity=IssueSeverity.CRITICAL, location=f"{archive_path}:{config_name}", - details={ - "target": target, - "pattern": pattern, - "config_path": config_path, - "config_file": config_name, - "cve_id": CVE_2025_23304_ID, - "cvss": CVE_2025_23304_CVSS, - "cwe": CVE_2025_23304_CWE, - "description": CVE_2025_23304_DESCRIPTION, - "remediation": CVE_2025_23304_REMEDIATION, - }, + details=details, ) @staticmethod diff --git a/tests/scanners/test_nemo_scanner.py b/tests/scanners/test_nemo_scanner.py index 0b0810def..6555953d6 100644 --- a/tests/scanners/test_nemo_scanner.py +++ b/tests/scanners/test_nemo_scanner.py @@ -3453,6 +3453,251 @@ def test_skops_load_target_is_review_only(self, tmp_path: Path) -> None: assert len(review_checks) == 1 assert review_checks[0].severity == IssueSeverity.INFO + @pytest.mark.parametrize( + ("config", "raw_target"), + [ + ( + { + "callable_module": "os", + "callable_leaf": "system", + "model": {"_target_": "${callable_module}.${callable_leaf}", "command": "id"}, + }, + "${callable_module}.${callable_leaf}", + ), + ( + { + "model": { + "callable_module": "os", + "callable_leaf": "system", + "_target_": "${.callable_module}.${.callable_leaf}", + "command": "id", + }, + }, + "${.callable_module}.${.callable_leaf}", + ), + ], + ) + def test_interpolated_target_resolves_before_dangerous_check( + self, + tmp_path: Path, + config: dict[str, Any], + raw_target: str, + ) -> None: + """Hydra interpolation that resolves to a dangerous callable must not fall through to INFO review.""" + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + cve_checks = [ + check + for check in result.checks + if check.name == "CVE-2025-23304: Dangerous Hydra _target_" and check.details.get("target") == "os.system" + ] + assert len(cve_checks) == 1 + assert cve_checks[0].status == CheckStatus.FAILED + assert cve_checks[0].severity == IssueSeverity.CRITICAL + assert cve_checks[0].details["raw_target"] == raw_target + assert cve_checks[0].details["target_resolution"] == "hydra_interpolation" + assert not any( + check.name == "Hydra _target_ Review" and check.details.get("target") == raw_target + for check in result.checks + ) + + @pytest.mark.parametrize("raw_target", ["${targets.0}", "${targets[0]}"]) + def test_interpolated_safe_list_target_remains_safe(self, tmp_path: Path, raw_target: str) -> None: + """OmegaConf list indices support both dot and bracket notation.""" + config = { + "targets": ["nemo.collections.nlp.models.TextClassification"], + "model": {"_target_": raw_target}, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert not any(check.name.startswith("CVE-2025-23304") for check in result.checks) + assert any( + check.name == "Hydra _target_ Safety Check" + and check.details.get("target") == "nemo.collections.nlp.models.TextClassification" + for check in result.checks + ) + + def test_interpolated_target_resolves_every_parent_hop(self, tmp_path: Path) -> None: + """Each leading dot after the first must move one level toward the root.""" + config = { + "target": "os.system", + "outer": { + "target": "nemo.Model", + "inner": {"_target_": "${...target}", "command": "id"}, + }, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert any( + check.name == "CVE-2025-23304: Dangerous Hydra _target_" + and check.details.get("target") == "os.system" + and check.details.get("raw_target") == "${...target}" + for check in result.checks + ) + assert not any( + check.name == "Hydra _target_ Safety Check" and check.details.get("target") == "nemo.Model" + for check in result.checks + ) + + def test_nested_interpolated_target_uses_referenced_container(self, tmp_path: Path) -> None: + """Nested relative selectors resolve from the referenced node, not the _target_ node.""" + config = { + "targets": {"danger": "os.system", "chosen": "${.danger}"}, + "model": {"danger": "nemo.Model", "_target_": "${targets.chosen}", "command": "id"}, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert any( + check.name == "CVE-2025-23304: Dangerous Hydra _target_" + and check.details.get("target") == "os.system" + and check.details.get("raw_target") == "${targets.chosen}" + for check in result.checks + ) + assert not any( + check.name == "Hydra _target_ Safety Check" and check.details.get("target") == "nemo.Model" + for check in result.checks + ) + + def test_nested_interpolated_path_selects_dangerous_target(self, tmp_path: Path) -> None: + """Simple nested bracket selectors must retain dangerous-target classification.""" + config = { + "selected_target": "dangerous", + "targets": {"dangerous": "os.system"}, + "model": {"_target_": "${targets[${selected_target}]}", "command": "id"}, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert any( + check.name == "CVE-2025-23304: Dangerous Hydra _target_" + and check.details.get("target") == "os.system" + and check.details.get("raw_target") == "${targets[${selected_target}]}" + for check in result.checks + ) + + def test_escaped_interpolated_target_is_not_resolved(self, tmp_path: Path) -> None: + """An escaped OmegaConf interpolation marker is literal text, not a callable selector.""" + raw_target = r"\${callable}" + config = {"callable": "os.system", "model": {"_target_": raw_target}} + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert not any(check.name.startswith("CVE-2025-23304") for check in result.checks) + assert any( + check.name == "Hydra _target_ Review" and check.details.get("target") == raw_target + for check in result.checks + ) + + def test_interpolated_dangerous_target_fails_aggregate_scan(self, tmp_path: Path) -> None: + """The MA-DSS-C191 payload should produce aggregate exit 1 instead of a clean scan.""" + config = { + "callable_module": "os", + "callable_leaf": "system", + "model": {"_target_": "${callable_module}.${callable_leaf}", "command": "id"}, + } + path = _create_nemo_file(tmp_path, config) + + result = scan_model_directory_or_file(str(path), config={"cache_scan_results": False}) + + assert determine_exit_code(result) == 1 + assert any( + issue.severity == IssueSeverity.CRITICAL + and issue.details.get("target") == "os.system" + and issue.details.get("raw_target") == "${callable_module}.${callable_leaf}" + for issue in result.issues + ) + + def test_unresolved_interpolated_target_fails_closed(self, tmp_path: Path) -> None: + """Unsupported Hydra resolvers are dynamic callable selectors, not safe unknown targets.""" + config = {"model": {"_target_": "${oc.env:NEMO_TARGET}", "command": "id"}} + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + unresolved_checks = [ + check for check in result.checks if check.name == "CVE-2025-23304: Unresolved Hydra _target_ Interpolation" + ] + assert len(unresolved_checks) == 1 + assert unresolved_checks[0].status == CheckStatus.FAILED + assert unresolved_checks[0].severity == IssueSeverity.CRITICAL + assert unresolved_checks[0].details["target"] == "${oc.env:NEMO_TARGET}" + assert unresolved_checks[0].details["unresolved_references"] == ["oc.env:NEMO_TARGET"] + assert not any(check.name == "Hydra _target_ Review" for check in result.checks) + + @pytest.mark.parametrize( + ("config", "expected_target"), + [ + ( + { + "safe_target": "nemo.collections.nlp.models.TextClassification", + "model": {"_target_": "${safe_target}"}, + }, + "nemo.collections.nlp.models.TextClassification", + ), + ( + { + "loader_target": "torch.utils.data.DataLoader", + "loader": {"_target_": "${loader_target}", "batch_size": 4}, + }, + "torch.utils.data.DataLoader", + ), + ], + ) + def test_interpolated_safe_target_remains_safe( + self, + tmp_path: Path, + config: dict[str, Any], + expected_target: str, + ) -> None: + """Simple interpolation should not create false positives when it resolves to an allowlisted target.""" + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert not any(check.name.startswith("CVE-2025-23304") for check in result.checks) + assert any( + check.name == "Hydra _target_ Safety Check" + and check.status == CheckStatus.PASSED + and check.details.get("target") == expected_target + and check.details.get("target_resolution") == "hydra_interpolation" + for check in result.checks + ) + + def test_torch_cpp_extension_load_inline_fails_aggregate_scan(self, tmp_path: Path) -> None: + """The MA-DSS-C194 target should not be accepted through a broad torch.utils prefix.""" + target = "torch.utils.cpp_extension.load_inline" + config = { + "model": { + "_target_": target, + "name": "audit_inline_ext", + "cpp_sources": ["#include \nint answer() { return 42; }"], + "functions": ["answer"], + }, + } + path = _create_nemo_file(tmp_path, config) + + result = scan_model_directory_or_file(str(path), config={"cache_scan_results": False}) + + assert determine_exit_code(result) == 1 + assert any( + issue.severity == IssueSeverity.CRITICAL and issue.details.get("target") == target + for issue in result.issues + ) + assert not any( + check.name == "Hydra _target_ Safety Check" and check.details.get("target") == target + for check in result.checks + ) + def test_torch_load_target_fails_aggregate_scan(self, tmp_path: Path) -> None: """A NeMo config using torch.load should produce a security failure, not exit 0.""" config = {"model": {"_target_": "torch.load", "f": "payload.pt"}}