diff --git a/src/google/adk/skills/_utils.py b/src/google/adk/skills/_utils.py index cab70a8d4b..e3185135e7 100644 --- a/src/google/adk/skills/_utils.py +++ b/src/google/adk/skills/_utils.py @@ -17,6 +17,7 @@ from __future__ import annotations import logging +import os import pathlib from typing import Union @@ -401,7 +402,7 @@ def _load_skill_from_gcs_dir( f" name '{skill_name_expected}'." ) - def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]: + def _load_files_in_dir(subdir: str) -> dict[str, str | bytes]: prefix = f"{skill_dir_prefix}{subdir}/" blobs = bucket.list_blobs(prefix=prefix) result = {} @@ -411,10 +412,26 @@ def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]: if not relative_path: continue + # Use PurePosixPath for platform-independent GCS path validation + p = pathlib.PurePosixPath(relative_path) + + # Reject absolute paths and traversal sequences + if p.is_absolute() or ".." in p.parts: + raise ValueError(f"Unsafe path in skill resource: {relative_path!r}") + + normalized = p.as_posix() + + # Prevent silent file overwrites via path aliasing + if normalized in result: + raise ValueError(f"Duplicate normalized path detected: {normalized!r}") + + # NOTE: Final path safety enforced during materialization + # via realpath + commonpath checks in skill_toolset.py try: - result[relative_path] = blob.download_as_text() + result[normalized] = blob.download_as_text() except UnicodeDecodeError: - result[relative_path] = blob.download_as_bytes() + result[normalized] = blob.download_as_bytes() + return result references = _load_files_in_dir("references") diff --git a/src/google/adk/tools/skill_toolset.py b/src/google/adk/tools/skill_toolset.py index 32ab45e7d2..a9d870dbfb 100644 --- a/src/google/adk/tools/skill_toolset.py +++ b/src/google/adk/tools/skill_toolset.py @@ -535,6 +535,7 @@ def _build_wrapper_code( ) # Build the boilerplate extract string + code_lines = [ "import os", "import tempfile", @@ -546,8 +547,14 @@ def _build_wrapper_code( "def _materialize_and_run():", " _orig_cwd = os.getcwd()", " with tempfile.TemporaryDirectory() as td:", + " _real_base = os.path.realpath(td)", " for rel_path, content in _files.items():", - " full_path = os.path.join(td, rel_path)", + " if os.path.isabs(rel_path):", + " raise ValueError(f'Absolute path rejected: {rel_path!r}')", + " _safe = os.path.realpath(os.path.join(td, rel_path))", + " if os.path.commonpath([_real_base, _safe]) != _real_base:", + " raise ValueError(f'Path traversal detected: {rel_path!r}')", + " full_path = _safe", " os.makedirs(os.path.dirname(full_path), exist_ok=True)", " mode = 'wb' if isinstance(content, bytes) else 'w'", " with open(full_path, mode) as f:",