From bf81544369b5b7b263472bf06c5c55958989f926 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 00:53:34 -0400 Subject: [PATCH 1/2] fix: resolve NeMo Hydra target interpolation --- CHANGELOG.md | 1 + modelaudit/scanners/nemo_scanner.py | 249 +++++++++++++++++++++++++--- tests/scanners/test_nemo_scanner.py | 150 +++++++++++++++++ 3 files changed, 375 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e4541c8a..65e04e69d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Bug Fixes +- resolve simple NeMo Hydra `_target_` interpolations before callable classification and fail closed on unresolved dynamic selectors - scan ONNX external data references in sparse initializers, tensor-valued attributes, and function defaults - avoid false-positive process-launch findings for parsed framed Python string literals - detect dangerous Python calls retrieved through module namespace dictionaries in ZIP and TAR members diff --git a/modelaudit/scanners/nemo_scanner.py b/modelaudit/scanners/nemo_scanner.py index a10fcbba1..b68cb01ec 100644 --- a/modelaudit/scanners/nemo_scanner.py +++ b/modelaudit/scanners/nemo_scanner.py @@ -160,6 +160,10 @@ "vars", ) _TARGET_TOKEN_RE = re.compile(r"__import__|[A-Z]+(?=[A-Z][a-z0-9]|[0-9_]|$)|[A-Z]?[a-z0-9]+") +_HYDRA_INTERPOLATION_RE = re.compile(r"\$\{([^{}]+)\}") +_HYDRA_PATH_TOKEN_RE = re.compile(r"([^\.\[\]]+)|\[(\d+)\]") +_HYDRA_INTERPOLATION_MAX_DEPTH = 8 +_HYDRA_REFERENCE_MISSING = object() CVE_2025_23304_ID = "CVE-2025-23304" CVE_2025_23304_CVSS = 7.6 @@ -213,6 +217,111 @@ def _find_suspicious_safe_prefixed_target_pattern(target: str) -> str | None: return None +def _hydra_reference_path(container_path: str, reference: str) -> str | None: + reference = reference.strip() + if not reference or ":" in reference: + return None + if not reference.startswith("."): + return reference + + base_parts = [part for part in container_path.split(".") if part] + relative = reference + while relative.startswith(".."): + if base_parts: + base_parts.pop() + relative = relative[2:] + if relative.startswith("."): + relative = relative[1:] + if relative.startswith("."): + relative = relative[1:] + if relative: + base_parts.append(relative) + return ".".join(base_parts) + + +def _hydra_path_tokens(path: str) -> list[str | int] | None: + tokens: list[str | int] = [] + cursor = 0 + while cursor < len(path): + if path[cursor] == ".": + cursor += 1 + continue + match = _HYDRA_PATH_TOKEN_RE.match(path, cursor) + if match is None: + return None + if match.group(2) is not None: + tokens.append(int(match.group(2))) + else: + tokens.append(match.group(1)) + cursor = match.end() + return tokens + + +def _lookup_hydra_path(root_config: Any, path: str) -> Any: + tokens = _hydra_path_tokens(path) + if tokens is None: + return _HYDRA_REFERENCE_MISSING + + value = root_config + for token in tokens: + if isinstance(token, int): + if not isinstance(value, list) or token >= len(value): + return _HYDRA_REFERENCE_MISSING + value = value[token] + continue + if not isinstance(value, dict) or token not in value: + return _HYDRA_REFERENCE_MISSING + value = value[token] + return value + + +def _resolve_hydra_target_interpolation( + target: str, + root_config: Any, + container_path: str, +) -> tuple[str, tuple[str, ...]]: + """Resolve simple OmegaConf-style references inside a Hydra target.""" + if _HYDRA_INTERPOLATION_RE.search(target) is None: + return target, () + + current = target + seen = {current} + for _ in range(_HYDRA_INTERPOLATION_MAX_DEPTH): + matches = list(_HYDRA_INTERPOLATION_RE.finditer(current)) + if not matches: + return current, () + + unresolved_references: list[str] = [] + parts: list[str] = [] + last_end = 0 + for match in matches: + parts.append(current[last_end : match.start()]) + reference = match.group(1).strip() + reference_path = _hydra_reference_path(container_path, reference) + value = ( + _HYDRA_REFERENCE_MISSING if reference_path is None else _lookup_hydra_path(root_config, reference_path) + ) + if value is _HYDRA_REFERENCE_MISSING or value is None or isinstance(value, dict | list): + unresolved_references.append(reference) + parts.append(match.group(0)) + else: + parts.append(str(value)) + last_end = match.end() + + if unresolved_references: + return current, tuple(unresolved_references) + + parts.append(current[last_end:]) + current = "".join(parts) + if current in seen: + cycle_refs = tuple(match.group(1).strip() for match in _HYDRA_INTERPOLATION_RE.finditer(current)) + return current, cycle_refs or ("",) + seen.add(current) + + unresolved = tuple(match.group(1).strip() for match in _HYDRA_INTERPOLATION_RE.finditer(current)) + return current, unresolved or ("",) + + def _is_runtime_truthy(value: Any) -> bool: """Return whether a Hydra argument would be truthy when passed to Python.""" return bool(value) @@ -2128,8 +2237,12 @@ def _check_hydra_targets( archive_path: str, result: ScanResult, path_prefix: str = "", + root_config: Any | None = None, ) -> None: """Recursively check _target_ values in Hydra config.""" + if root_config is None: + root_config = config + if isinstance(config, list): for index, item in enumerate(config): if isinstance(item, dict | list): @@ -2139,6 +2252,7 @@ def _check_hydra_targets( archive_path, result, f"{path_prefix}[{index}]" if path_prefix else f"[{index}]", + root_config, ) return @@ -2149,9 +2263,18 @@ def _check_hydra_targets( current_path = f"{path_prefix}.{key}" if path_prefix else key if key == "_target_" and isinstance(value, str): - self._evaluate_target(value, current_path, config_name, archive_path, result, config) + self._evaluate_target( + value, + current_path, + config_name, + archive_path, + result, + config, + root_config, + path_prefix, + ) elif isinstance(value, dict | list): - self._check_hydra_targets(value, config_name, archive_path, result, current_path) + self._check_hydra_targets(value, config_name, archive_path, result, current_path, root_config) def _evaluate_target( self, @@ -2161,26 +2284,46 @@ def _evaluate_target( archive_path: str, result: ScanResult, target_config: dict[str, Any] | None = None, + root_config: Any | None = None, + container_path: str = "", ) -> None: """Evaluate a single _target_ value for dangerous patterns.""" + raw_target = target + if root_config is not None and _HYDRA_INTERPOLATION_RE.search(target) is not None: + target, unresolved_references = _resolve_hydra_target_interpolation(target, root_config, container_path) + if unresolved_references: + self._add_unresolved_interpolated_target_check( + raw_target, + unresolved_references, + config_path, + config_name, + archive_path, + result, + ) + return + # Check against known dangerous targets (always flag, even if safe prefix) if target in _DANGEROUS_TARGETS or (target == "numpy.load" and self._numpy_load_allows_pickle(target_config)): + details: dict[str, Any] = { + "target": target, + "config_path": config_path, + "config_file": config_name, + "cve_id": CVE_2025_23304_ID, + "cvss": CVE_2025_23304_CVSS, + "cwe": CVE_2025_23304_CWE, + "description": CVE_2025_23304_DESCRIPTION, + "remediation": CVE_2025_23304_REMEDIATION, + } + if raw_target != target: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name=f"{CVE_2025_23304_ID}: Dangerous Hydra _target_", passed=False, message=(f"{CVE_2025_23304_ID}: Dangerous _target_ '{target}' at {config_path} in {config_name}"), severity=IssueSeverity.CRITICAL, location=f"{archive_path}:{config_name}", - details={ - "target": target, - "config_path": config_path, - "config_file": config_name, - "cve_id": CVE_2025_23304_ID, - "cvss": CVE_2025_23304_CVSS, - "cwe": CVE_2025_23304_CWE, - "description": CVE_2025_23304_DESCRIPTION, - "remediation": CVE_2025_23304_REMEDIATION, - }, + details=details, why=( f"The _target_ field '{target}' in this NeMo " f"config specifies a dangerous Python callable. " @@ -2202,35 +2345,86 @@ def _evaluate_target( config_name, archive_path, result, + raw_target=raw_target if raw_target != target else None, ) return + details = {"target": target, "config_path": config_path} + if raw_target != target: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name="Hydra _target_ Safety Check", passed=True, message=(f"Safe _target_ '{target}' at {config_path} in {config_name}"), location=f"{archive_path}:{config_name}", - details={"target": target, "config_path": config_path}, + details=details, ) return # Check for suspicious patterns in target (only for non-safe targets) pattern = _find_suspicious_target_pattern(target) if pattern is not None: - self._add_suspicious_target_check(target, pattern, config_path, config_name, archive_path, result) + self._add_suspicious_target_check( + target, + pattern, + config_path, + config_name, + archive_path, + result, + raw_target=raw_target if raw_target != target else None, + ) return # Unknown target - flag for review + details = { + "target": target, + "config_path": config_path, + "config_file": config_name, + } + if raw_target != target: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name="Hydra _target_ Review", passed=False, message=(f"Unknown _target_ '{target}' at {config_path} in {config_name} - requires manual review"), severity=IssueSeverity.INFO, location=f"{archive_path}:{config_name}", + details=details, + ) + + def _add_unresolved_interpolated_target_check( + self, + target: str, + unresolved_references: tuple[str, ...], + config_path: str, + config_name: str, + archive_path: str, + result: ScanResult, + ) -> None: + result.add_check( + name=f"{CVE_2025_23304_ID}: Unresolved Hydra _target_ Interpolation", + passed=False, + message=( + f"{CVE_2025_23304_ID}: Unresolved interpolated _target_ '{target}' at {config_path} in {config_name}" + ), + severity=IssueSeverity.CRITICAL, + location=f"{archive_path}:{config_name}", details={ "target": target, + "unresolved_references": list(unresolved_references), "config_path": config_path, "config_file": config_name, + "cve_id": CVE_2025_23304_ID, + "cvss": CVE_2025_23304_CVSS, + "cwe": CVE_2025_23304_CWE, + "description": CVE_2025_23304_DESCRIPTION, + "remediation": CVE_2025_23304_REMEDIATION, }, + why=( + "Hydra and OmegaConf resolve _target_ interpolation before instantiation. " + "ModelAudit could not resolve this dynamic callable selector statically, so it fails closed." + ), ) def _add_suspicious_target_check( @@ -2241,7 +2435,22 @@ def _add_suspicious_target_check( config_name: str, archive_path: str, result: ScanResult, + raw_target: str | None = None, ) -> None: + details: dict[str, Any] = { + "target": target, + "pattern": pattern, + "config_path": config_path, + "config_file": config_name, + "cve_id": CVE_2025_23304_ID, + "cvss": CVE_2025_23304_CVSS, + "cwe": CVE_2025_23304_CWE, + "description": CVE_2025_23304_DESCRIPTION, + "remediation": CVE_2025_23304_REMEDIATION, + } + if raw_target is not None: + details["raw_target"] = raw_target + details["target_resolution"] = "hydra_interpolation" result.add_check( name=f"{CVE_2025_23304_ID}: Suspicious Hydra _target_", passed=False, @@ -2252,17 +2461,7 @@ def _add_suspicious_target_check( ), severity=IssueSeverity.CRITICAL, location=f"{archive_path}:{config_name}", - details={ - "target": target, - "pattern": pattern, - "config_path": config_path, - "config_file": config_name, - "cve_id": CVE_2025_23304_ID, - "cvss": CVE_2025_23304_CVSS, - "cwe": CVE_2025_23304_CWE, - "description": CVE_2025_23304_DESCRIPTION, - "remediation": CVE_2025_23304_REMEDIATION, - }, + details=details, ) @staticmethod diff --git a/tests/scanners/test_nemo_scanner.py b/tests/scanners/test_nemo_scanner.py index 0b0810def..c2809d45c 100644 --- a/tests/scanners/test_nemo_scanner.py +++ b/tests/scanners/test_nemo_scanner.py @@ -3453,6 +3453,156 @@ def test_skops_load_target_is_review_only(self, tmp_path: Path) -> None: assert len(review_checks) == 1 assert review_checks[0].severity == IssueSeverity.INFO + @pytest.mark.parametrize( + ("config", "raw_target"), + [ + ( + { + "callable_module": "os", + "callable_leaf": "system", + "model": {"_target_": "${callable_module}.${callable_leaf}", "command": "id"}, + }, + "${callable_module}.${callable_leaf}", + ), + ( + { + "model": { + "callable_module": "os", + "callable_leaf": "system", + "_target_": "${.callable_module}.${.callable_leaf}", + "command": "id", + }, + }, + "${.callable_module}.${.callable_leaf}", + ), + ], + ) + def test_interpolated_target_resolves_before_dangerous_check( + self, + tmp_path: Path, + config: dict[str, Any], + raw_target: str, + ) -> None: + """Hydra interpolation that resolves to a dangerous callable must not fall through to INFO review.""" + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + cve_checks = [ + check + for check in result.checks + if check.name == "CVE-2025-23304: Dangerous Hydra _target_" and check.details.get("target") == "os.system" + ] + assert len(cve_checks) == 1 + assert cve_checks[0].status == CheckStatus.FAILED + assert cve_checks[0].severity == IssueSeverity.CRITICAL + assert cve_checks[0].details["raw_target"] == raw_target + assert cve_checks[0].details["target_resolution"] == "hydra_interpolation" + assert not any( + check.name == "Hydra _target_ Review" and check.details.get("target") == raw_target + for check in result.checks + ) + + def test_interpolated_dangerous_target_fails_aggregate_scan(self, tmp_path: Path) -> None: + """The MA-DSS-C191 payload should produce aggregate exit 1 instead of a clean scan.""" + config = { + "callable_module": "os", + "callable_leaf": "system", + "model": {"_target_": "${callable_module}.${callable_leaf}", "command": "id"}, + } + path = _create_nemo_file(tmp_path, config) + + result = scan_model_directory_or_file(str(path), config={"cache_scan_results": False}) + + assert determine_exit_code(result) == 1 + assert any( + issue.severity == IssueSeverity.CRITICAL + and issue.details.get("target") == "os.system" + and issue.details.get("raw_target") == "${callable_module}.${callable_leaf}" + for issue in result.issues + ) + + def test_unresolved_interpolated_target_fails_closed(self, tmp_path: Path) -> None: + """Unsupported Hydra resolvers are dynamic callable selectors, not safe unknown targets.""" + config = {"model": {"_target_": "${oc.env:NEMO_TARGET}", "command": "id"}} + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + unresolved_checks = [ + check for check in result.checks if check.name == "CVE-2025-23304: Unresolved Hydra _target_ Interpolation" + ] + assert len(unresolved_checks) == 1 + assert unresolved_checks[0].status == CheckStatus.FAILED + assert unresolved_checks[0].severity == IssueSeverity.CRITICAL + assert unresolved_checks[0].details["target"] == "${oc.env:NEMO_TARGET}" + assert unresolved_checks[0].details["unresolved_references"] == ["oc.env:NEMO_TARGET"] + assert not any(check.name == "Hydra _target_ Review" for check in result.checks) + + @pytest.mark.parametrize( + ("config", "expected_target"), + [ + ( + { + "safe_target": "nemo.collections.nlp.models.TextClassification", + "model": {"_target_": "${safe_target}"}, + }, + "nemo.collections.nlp.models.TextClassification", + ), + ( + { + "loader_target": "torch.utils.data.DataLoader", + "loader": {"_target_": "${loader_target}", "batch_size": 4}, + }, + "torch.utils.data.DataLoader", + ), + ], + ) + def test_interpolated_safe_target_remains_safe( + self, + tmp_path: Path, + config: dict[str, Any], + expected_target: str, + ) -> None: + """Simple interpolation should not create false positives when it resolves to an allowlisted target.""" + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert not any(check.name.startswith("CVE-2025-23304") for check in result.checks) + assert any( + check.name == "Hydra _target_ Safety Check" + and check.status == CheckStatus.PASSED + and check.details.get("target") == expected_target + and check.details.get("target_resolution") == "hydra_interpolation" + for check in result.checks + ) + + def test_torch_cpp_extension_load_inline_fails_aggregate_scan(self, tmp_path: Path) -> None: + """The MA-DSS-C194 target should not be accepted through a broad torch.utils prefix.""" + target = "torch.utils.cpp_extension.load_inline" + config = { + "model": { + "_target_": target, + "name": "audit_inline_ext", + "cpp_sources": ["#include \nint answer() { return 42; }"], + "functions": ["answer"], + }, + } + path = _create_nemo_file(tmp_path, config) + + result = scan_model_directory_or_file(str(path), config={"cache_scan_results": False}) + + assert determine_exit_code(result) == 1 + assert any( + issue.severity == IssueSeverity.CRITICAL and issue.details.get("target") == target + for issue in result.issues + ) + assert not any( + check.name == "Hydra _target_ Safety Check" and check.details.get("target") == target + for check in result.checks + ) + def test_torch_load_target_fails_aggregate_scan(self, tmp_path: Path) -> None: """A NeMo config using torch.load should produce a security failure, not exit 0.""" config = {"model": {"_target_": "torch.load", "f": "payload.pt"}} From 70c7134f8b8b138771bb7cde829950f502400529 Mon Sep 17 00:00:00 2001 From: Michael D'Angelo Date: Sun, 31 May 2026 14:51:48 +0000 Subject: [PATCH 2/2] fix: harden NeMo Hydra interpolation resolution --- modelaudit/scanners/nemo_scanner.py | 210 +++++++++++++++++++--------- tests/scanners/test_nemo_scanner.py | 95 +++++++++++++ 2 files changed, 236 insertions(+), 69 deletions(-) diff --git a/modelaudit/scanners/nemo_scanner.py b/modelaudit/scanners/nemo_scanner.py index b68cb01ec..9f305d685 100644 --- a/modelaudit/scanners/nemo_scanner.py +++ b/modelaudit/scanners/nemo_scanner.py @@ -161,7 +161,8 @@ ) _TARGET_TOKEN_RE = re.compile(r"__import__|[A-Z]+(?=[A-Z][a-z0-9]|[0-9_]|$)|[A-Z]?[a-z0-9]+") _HYDRA_INTERPOLATION_RE = re.compile(r"\$\{([^{}]+)\}") -_HYDRA_PATH_TOKEN_RE = re.compile(r"([^\.\[\]]+)|\[(\d+)\]") +_HYDRA_PATH_TOKEN_RE = re.compile(r"[^\.\[\]]+") +_HYDRA_LIST_INDEX_RE = re.compile(r"-?\d+") _HYDRA_INTERPOLATION_MAX_DEPTH = 8 _HYDRA_REFERENCE_MISSING = object() @@ -217,44 +218,84 @@ def _find_suspicious_safe_prefixed_target_pattern(target: str) -> str | None: return None -def _hydra_reference_path(container_path: str, reference: str) -> str | None: - reference = reference.strip() - if not reference or ":" in reference: - return None - if not reference.startswith("."): - return reference - - base_parts = [part for part in container_path.split(".") if part] - relative = reference - while relative.startswith(".."): - if base_parts: - base_parts.pop() - relative = relative[2:] - if relative.startswith("."): - relative = relative[1:] - if relative.startswith("."): - relative = relative[1:] - if relative: - base_parts.append(relative) - return ".".join(base_parts) - - -def _hydra_path_tokens(path: str) -> list[str | int] | None: - tokens: list[str | int] = [] +def _hydra_interpolation_matches(value: str) -> list[re.Match[str]]: + matches: list[re.Match[str]] = [] + for match in _HYDRA_INTERPOLATION_RE.finditer(value): + preceding_backslashes = 0 + cursor = match.start() - 1 + while cursor >= 0 and value[cursor] == "\\": + preceding_backslashes += 1 + cursor -= 1 + if preceding_backslashes % 2 == 0: + matches.append(match) + return matches + + +def _hydra_path_tokens(path: str) -> list[str] | None: + tokens: list[str] = [] cursor = 0 + expect_token = True while cursor < len(path): if path[cursor] == ".": + if expect_token: + return None cursor += 1 + expect_token = True + continue + if path[cursor] == "[": + close = path.find("]", cursor + 1) + if close < 0: + return None + token = path[cursor + 1 : close] + if not token or any(character in token for character in ".[]"): + return None + tokens.append(token) + cursor = close + 1 + expect_token = False continue + if not expect_token: + return None match = _HYDRA_PATH_TOKEN_RE.match(path, cursor) if match is None: return None - if match.group(2) is not None: - tokens.append(int(match.group(2))) - else: - tokens.append(match.group(1)) + tokens.append(match.group(0)) cursor = match.end() - return tokens + expect_token = False + return None if expect_token and tokens else tokens + + +def _render_hydra_path(tokens: list[str]) -> str: + return ".".join(tokens) + + +def _hydra_reference_path(container_path: str, reference: str) -> str | None: + reference = reference.strip() + if not reference or ":" in reference: + return None + if not reference.startswith("."): + tokens = _hydra_path_tokens(reference) + return None if not tokens else _render_hydra_path(tokens) + + leading_dots = len(reference) - len(reference.lstrip(".")) + suffix = reference[leading_dots:] + if not suffix: + return None + + base_tokens = _hydra_path_tokens(container_path) + suffix_tokens = _hydra_path_tokens(suffix) + parent_hops = leading_dots - 1 + if base_tokens is None or not suffix_tokens or parent_hops > len(base_tokens): + return None + if parent_hops: + del base_tokens[-parent_hops:] + return _render_hydra_path([*base_tokens, *suffix_tokens]) + + +def _hydra_reference_container_path(path: str) -> str | None: + tokens = _hydra_path_tokens(path) + if not tokens: + return None + return _render_hydra_path(tokens[:-1]) def _lookup_hydra_path(root_config: Any, path: str) -> Any: @@ -264,14 +305,22 @@ def _lookup_hydra_path(root_config: Any, path: str) -> Any: value = root_config for token in tokens: - if isinstance(token, int): - if not isinstance(value, list) or token >= len(value): + if isinstance(value, list): + if _HYDRA_LIST_INDEX_RE.fullmatch(token) is None: + return _HYDRA_REFERENCE_MISSING + index = int(token) + if not -len(value) <= index < len(value): return _HYDRA_REFERENCE_MISSING + value = value[index] + continue + if not isinstance(value, dict): + return _HYDRA_REFERENCE_MISSING + if token in value: value = value[token] continue - if not isinstance(value, dict) or token not in value: + if _HYDRA_LIST_INDEX_RE.fullmatch(token) is None or int(token) not in value: return _HYDRA_REFERENCE_MISSING - value = value[token] + value = value[int(token)] return value @@ -281,45 +330,68 @@ def _resolve_hydra_target_interpolation( container_path: str, ) -> tuple[str, tuple[str, ...]]: """Resolve simple OmegaConf-style references inside a Hydra target.""" - if _HYDRA_INTERPOLATION_RE.search(target) is None: - return target, () - - current = target - seen = {current} - for _ in range(_HYDRA_INTERPOLATION_MAX_DEPTH): - matches = list(_HYDRA_INTERPOLATION_RE.finditer(current)) - if not matches: - return current, () - - unresolved_references: list[str] = [] - parts: list[str] = [] - last_end = 0 - for match in matches: - parts.append(current[last_end : match.start()]) - reference = match.group(1).strip() - reference_path = _hydra_reference_path(container_path, reference) - value = ( - _HYDRA_REFERENCE_MISSING if reference_path is None else _lookup_hydra_path(root_config, reference_path) - ) - if value is _HYDRA_REFERENCE_MISSING or value is None or isinstance(value, dict | list): + return _resolve_hydra_interpolation_value(target, root_config, container_path, frozenset(), 0) + + +def _resolve_hydra_interpolation_value( + value: str, + root_config: Any, + container_path: str, + resolving_paths: frozenset[str], + depth: int, +) -> tuple[str, tuple[str, ...]]: + matches = _hydra_interpolation_matches(value) + if not matches: + return value, () + if depth >= _HYDRA_INTERPOLATION_MAX_DEPTH: + return value, ("",) + + unresolved_references: list[str] = [] + parts: list[str] = [] + last_end = 0 + for match in matches: + parts.append(value[last_end : match.start()]) + reference = match.group(1).strip() + reference_path = _hydra_reference_path(container_path, reference) + resolved = ( + _HYDRA_REFERENCE_MISSING if reference_path is None else _lookup_hydra_path(root_config, reference_path) + ) + if ( + resolved is _HYDRA_REFERENCE_MISSING + or resolved is None + or isinstance(resolved, dict | list) + or reference_path in resolving_paths + ): + unresolved_references.append(reference) + parts.append(match.group(0)) + elif isinstance(resolved, str): + assert reference_path is not None + referenced_container = _hydra_reference_container_path(reference_path) + if referenced_container is None: unresolved_references.append(reference) parts.append(match.group(0)) else: - parts.append(str(value)) - last_end = match.end() - - if unresolved_references: - return current, tuple(unresolved_references) + nested_value, nested_unresolved = _resolve_hydra_interpolation_value( + resolved, + root_config, + referenced_container, + resolving_paths | {reference_path}, + depth + 1, + ) + unresolved_references.extend(nested_unresolved) + parts.append(nested_value) + else: + parts.append(str(resolved)) + last_end = match.end() - parts.append(current[last_end:]) - current = "".join(parts) - if current in seen: - cycle_refs = tuple(match.group(1).strip() for match in _HYDRA_INTERPOLATION_RE.finditer(current)) - return current, cycle_refs or ("",) - seen.add(current) + if unresolved_references: + return value, tuple(unresolved_references) - unresolved = tuple(match.group(1).strip() for match in _HYDRA_INTERPOLATION_RE.finditer(current)) - return current, unresolved or ("",) + parts.append(value[last_end:]) + resolved_value = "".join(parts) + if resolved_value == value: + return value, () + return _resolve_hydra_interpolation_value(resolved_value, root_config, container_path, resolving_paths, depth + 1) def _is_runtime_truthy(value: Any) -> bool: @@ -2289,7 +2361,7 @@ def _evaluate_target( ) -> None: """Evaluate a single _target_ value for dangerous patterns.""" raw_target = target - if root_config is not None and _HYDRA_INTERPOLATION_RE.search(target) is not None: + if root_config is not None and _hydra_interpolation_matches(target): target, unresolved_references = _resolve_hydra_target_interpolation(target, root_config, container_path) if unresolved_references: self._add_unresolved_interpolated_target_check( diff --git a/tests/scanners/test_nemo_scanner.py b/tests/scanners/test_nemo_scanner.py index c2809d45c..6555953d6 100644 --- a/tests/scanners/test_nemo_scanner.py +++ b/tests/scanners/test_nemo_scanner.py @@ -3503,6 +3503,101 @@ def test_interpolated_target_resolves_before_dangerous_check( for check in result.checks ) + @pytest.mark.parametrize("raw_target", ["${targets.0}", "${targets[0]}"]) + def test_interpolated_safe_list_target_remains_safe(self, tmp_path: Path, raw_target: str) -> None: + """OmegaConf list indices support both dot and bracket notation.""" + config = { + "targets": ["nemo.collections.nlp.models.TextClassification"], + "model": {"_target_": raw_target}, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert not any(check.name.startswith("CVE-2025-23304") for check in result.checks) + assert any( + check.name == "Hydra _target_ Safety Check" + and check.details.get("target") == "nemo.collections.nlp.models.TextClassification" + for check in result.checks + ) + + def test_interpolated_target_resolves_every_parent_hop(self, tmp_path: Path) -> None: + """Each leading dot after the first must move one level toward the root.""" + config = { + "target": "os.system", + "outer": { + "target": "nemo.Model", + "inner": {"_target_": "${...target}", "command": "id"}, + }, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert any( + check.name == "CVE-2025-23304: Dangerous Hydra _target_" + and check.details.get("target") == "os.system" + and check.details.get("raw_target") == "${...target}" + for check in result.checks + ) + assert not any( + check.name == "Hydra _target_ Safety Check" and check.details.get("target") == "nemo.Model" + for check in result.checks + ) + + def test_nested_interpolated_target_uses_referenced_container(self, tmp_path: Path) -> None: + """Nested relative selectors resolve from the referenced node, not the _target_ node.""" + config = { + "targets": {"danger": "os.system", "chosen": "${.danger}"}, + "model": {"danger": "nemo.Model", "_target_": "${targets.chosen}", "command": "id"}, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert any( + check.name == "CVE-2025-23304: Dangerous Hydra _target_" + and check.details.get("target") == "os.system" + and check.details.get("raw_target") == "${targets.chosen}" + for check in result.checks + ) + assert not any( + check.name == "Hydra _target_ Safety Check" and check.details.get("target") == "nemo.Model" + for check in result.checks + ) + + def test_nested_interpolated_path_selects_dangerous_target(self, tmp_path: Path) -> None: + """Simple nested bracket selectors must retain dangerous-target classification.""" + config = { + "selected_target": "dangerous", + "targets": {"dangerous": "os.system"}, + "model": {"_target_": "${targets[${selected_target}]}", "command": "id"}, + } + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert any( + check.name == "CVE-2025-23304: Dangerous Hydra _target_" + and check.details.get("target") == "os.system" + and check.details.get("raw_target") == "${targets[${selected_target}]}" + for check in result.checks + ) + + def test_escaped_interpolated_target_is_not_resolved(self, tmp_path: Path) -> None: + """An escaped OmegaConf interpolation marker is literal text, not a callable selector.""" + raw_target = r"\${callable}" + config = {"callable": "os.system", "model": {"_target_": raw_target}} + path = _create_nemo_file(tmp_path, config) + + result = NemoScanner().scan(str(path)) + + assert not any(check.name.startswith("CVE-2025-23304") for check in result.checks) + assert any( + check.name == "Hydra _target_ Review" and check.details.get("target") == raw_target + for check in result.checks + ) + def test_interpolated_dangerous_target_fails_aggregate_scan(self, tmp_path: Path) -> None: """The MA-DSS-C191 payload should produce aggregate exit 1 instead of a clean scan.""" config = {