diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index f9a3e7f594..bf70cca11a 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -32,11 +32,16 @@ @experimental(FeatureName.AGENT_CONFIG) -def from_config(config_path: str) -> BaseAgent: +def from_config(config_path: str, *, trusted: bool = False) -> BaseAgent: """Build agent from a configfile path. Args: config_path: the path to a YAML config file. + trusted: When True, allow `args` keys in the YAML config. Set this only + when the YAML source is under operator control (local trusted config + authored by the project owner). Callers loading YAML supplied by an + end user, fetched from the network, written by a built-in agent + assistant, or otherwise outside operator control must leave this False. Returns: The created agent instance. @@ -44,10 +49,11 @@ def from_config(config_path: str) -> BaseAgent: Raises: FileNotFoundError: If config file doesn't exist. ValidationError: If config file's content is invalid YAML. - ValueError: If agent type is unsupported. + ValueError: If agent type is unsupported, or if the config contains a + blocked key while trusted=False. """ abs_path = os.path.abspath(config_path) - config = _load_config_from_path(abs_path) + config = _load_config_from_path(abs_path, trusted=trusted) agent_config = config.root # pylint: disable=unidiomatic-typecheck Needs exact class matching. @@ -105,12 +111,18 @@ def _check_config_for_blocked_keys(node: Any, filename: str) -> None: _check_config_for_blocked_keys(item, filename) -def _load_config_from_path(config_path: str) -> AgentConfig: +def _load_config_from_path( + config_path: str, *, trusted: bool = False +) -> AgentConfig: """Load an agent's configuration from a YAML file. Args: config_path: Path to the YAML config file. Both relative and absolute paths are accepted. + trusted: When True, skip the denylist that rejects `args` keys. Default + False blocks `args` regardless of the legacy `_set_enforce_denylist` + flag. The legacy flag remains as a force-on override for callers that + previously set it to True. Returns: The loaded and validated AgentConfig object. @@ -118,6 +130,7 @@ def _load_config_from_path(config_path: str) -> AgentConfig: Raises: FileNotFoundError: If config file doesn't exist. ValidationError: If config file's content is invalid YAML. + ValueError: If the config contains a blocked key while trusted=False. """ if not os.path.exists(config_path): raise FileNotFoundError(f"Config file not found: {config_path}") @@ -125,7 +138,7 @@ def _load_config_from_path(config_path: str) -> AgentConfig: with open(config_path, "r", encoding="utf-8") as f: config_data = yaml.safe_load(f) - if _ENFORCE_DENYLIST: + if not trusted or _ENFORCE_DENYLIST: _check_config_for_blocked_keys(config_data, config_path) return AgentConfig.model_validate(config_data) diff --git a/tests/unittests/agents/test_agent_config.py b/tests/unittests/agents/test_agent_config.py index 380078ec50..14fb4355fd 100644 --- a/tests/unittests/agents/test_agent_config.py +++ b/tests/unittests/agents/test_agent_config.py @@ -276,7 +276,7 @@ def test_agent_config_litellm_model_with_custom_args(tmp_path: Path): config_file = tmp_path / "litellm_agent.yaml" config_file.write_text(yaml_content) - agent = config_agent_utils.from_config(str(config_file)) + agent = config_agent_utils.from_config(str(config_file), trusted=True) assert isinstance(agent, LlmAgent) assert isinstance(agent.model, LiteLlm) @@ -300,7 +300,7 @@ def test_agent_config_legacy_model_mapping_still_supported(tmp_path: Path): config_file = tmp_path / "legacy_litellm_agent.yaml" config_file.write_text(yaml_content) - agent = config_agent_utils.from_config(str(config_file)) + agent = config_agent_utils.from_config(str(config_file), trusted=True) assert isinstance(agent, LlmAgent) assert isinstance(agent.model, LiteLlm) @@ -441,3 +441,69 @@ def test_load_config_from_path_blocks_args_when_enforced(tmp_path): assert "Blocked key 'args' found" in str(exc_info.value) finally: config_agent_utils._set_enforce_denylist(False) + + +def test_from_config_blocks_args_by_default(tmp_path: Path): + """Default from_config() rejects YAML containing an 'args' key.""" + config_file = tmp_path / "with_args.yaml" + config_file.write_text( + "agent_class: LlmAgent\n" + "name: agent_with_args\n" + 'instruction: "."\n' + "model_code:\n" + " name: google.adk.models.lite_llm.LiteLlm\n" + " args:\n" + " - name: model\n" + " value: kimi/k2\n" + ) + + with pytest.raises(ValueError) as exc_info: + config_agent_utils.from_config(str(config_file)) + assert "Blocked key 'args' found" in str(exc_info.value) + + +def test_from_config_allows_args_when_trusted(tmp_path: Path): + """from_config(..., trusted=True) accepts YAML containing an 'args' key.""" + config_file = tmp_path / "with_args.yaml" + config_file.write_text( + "agent_class: LlmAgent\n" + "name: agent_with_args\n" + 'instruction: "."\n' + "model_code:\n" + " name: google.adk.models.lite_llm.LiteLlm\n" + " args:\n" + " - name: model\n" + " value: kimi/k2\n" + ) + + agent = config_agent_utils.from_config(str(config_file), trusted=True) + + assert isinstance(agent, LlmAgent) + assert isinstance(agent.model, LiteLlm) + assert agent.model.model == "kimi/k2" + + +def test_from_config_default_blocks_os_system_in_output_schema(tmp_path: Path): + """Default from_config() blocks the output_schema CodeConfig.args RCE sink. + + Without the denylist, output_schema.name=os.system with a single args entry + would invoke os.system at agent load. The default trusted=False path + rejects the YAML before resolve_code_reference is reached. + """ + marker = tmp_path / "rce_marker" + config_file = tmp_path / "exploit.yaml" + config_file.write_text( + "agent_class: LlmAgent\n" + "name: exploit_agent\n" + 'instruction: "."\n' + 'model: "gemini-2.5-flash"\n' + "output_schema:\n" + " name: os.system\n" + " args:\n" + f" - value: 'touch {marker.as_posix()}'\n" + ) + + with pytest.raises(ValueError) as exc_info: + config_agent_utils.from_config(str(config_file)) + assert "Blocked key 'args' found" in str(exc_info.value) + assert not marker.exists()