Skip to content

Commit 2f15c6c

Browse files
Ashutosh0xxuanyang15
authored andcommitted
fix: Fix path traversal in GCS skill extraction (Zip Slip)
Fix for #5603 - Validate normalized relative paths in skill extraction code to prevent directory traversal via malicious GCS skill resource names. ## Problem The _build_wrapper_code method in skill_toolset.py generates Python code that extracts skill files into a temporary directory. The relative paths from GCS blob names are used directly with os.path.join(td, rel_path) without validation. A malicious skill resource name containing ../ (e.g., references/../../etc/cron.d/evil) would resolve outside the temporary directory, enabling arbitrary file writes. Combined with runpy.run_path(), this creates a path-traversal-to-RCE chain (CWE-22 / Zip Slip variant). ## Fix Added path normalization and validation before file extraction in the generated wrapper code: 1. Normalize the relative path with os.path.normpath() 2. Reject paths starting with .. (parent directory traversal) 3. Reject absolute paths via os.path.isabs() 4. Use os.path.abspath(td) for the base directory to prevent bypasses ## Testing ### Unit Tests Added Added tests/unittests/tools/test_skill_path_traversal.py with 5 tests. Fixes #5603 Merge #5927 Change-Id: I7dc2f92e863785ccb1d6ea0ed65b0b01c537fabc
1 parent 51b18eb commit 2f15c6c

2 files changed

Lines changed: 310 additions & 19 deletions

File tree

src/google/adk/tools/skill_toolset.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def _build_skill_system_instruction(prefix: str | None = None) -> str:
7575
"- **scripts/** (Optional): Executable scripts that can be run via "
7676
"bash.\n\n"
7777
"This is very important:\n\n"
78-
f"1. If a skill seems relevant to the current user query, you MUST use "
78+
"1. If a skill seems relevant to the current user query, you MUST use "
7979
f'the `{p}load_skill` tool with `skill_name="<SKILL_NAME>"` to read '
8080
"its full instructions before proceeding.\n"
8181
"2. Once you have read the instructions, follow them exactly as "
@@ -302,16 +302,15 @@ def _get_declaration(self) -> types.FunctionDeclaration | None:
302302
async def run_async(
303303
self, *, args: dict[str, Any], tool_context: ToolContext
304304
) -> Any:
305-
skill_name = args.get("skill_name")
306-
file_path = args.get("file_path")
307-
308-
errors = []
309-
if not skill_name:
310-
errors.append("Argument 'skill_name' is required.")
311-
if not file_path:
312-
errors.append("Argument 'file_path' is required.")
313-
314-
if errors:
305+
skill_name: str | None = args.get("skill_name")
306+
file_path: str | None = args.get("file_path")
307+
308+
if not skill_name or not file_path:
309+
errors = []
310+
if not skill_name:
311+
errors.append("Argument 'skill_name' is required.")
312+
if not file_path:
313+
errors.append("Argument 'file_path' is required.")
315314
return {
316315
"error": "\n".join(errors),
317316
"error_code": "INVALID_ARGUMENTS",
@@ -661,7 +660,13 @@ def _build_wrapper_code(
661660
" _orig_cwd = os.getcwd()",
662661
" with tempfile.TemporaryDirectory() as td:",
663662
" for rel_path, content in _files.items():",
664-
" full_path = os.path.join(td, rel_path)",
663+
" norm_rel = os.path.normpath(rel_path)",
664+
" if norm_rel.startswith('..') or os.path.isabs(norm_rel):",
665+
(
666+
" raise PermissionError('Path traversal blocked in skill"
667+
" file: ' + rel_path)"
668+
),
669+
" full_path = os.path.join(os.path.abspath(td), norm_rel)",
665670
" os.makedirs(os.path.dirname(full_path), exist_ok=True)",
666671
" mode = 'wb' if isinstance(content, bytes) else 'w'",
667672
" with open(full_path, mode) as f:",
@@ -815,18 +820,24 @@ async def run_async(
815820
self, *, args: dict[str, Any], tool_context: ToolContext
816821
) -> Any:
817822
# Standardized arguments: skill_name and file_path.
818-
skill_name = args.get("skill_name")
819-
file_path = args.get("file_path")
823+
skill_name: str | None = args.get("skill_name")
824+
file_path: str | None = args.get("file_path")
820825
script_args = args.get("args")
821826
short_options = args.get("short_options")
822827
positional_args = args.get("positional_args")
823828

824-
errors = []
825-
if not skill_name:
826-
errors.append("Argument 'skill_name' is required.")
827-
if not file_path:
828-
errors.append("Argument 'file_path' is required.")
829+
if not skill_name or not file_path:
830+
errors = []
831+
if not skill_name:
832+
errors.append("Argument 'skill_name' is required.")
833+
if not file_path:
834+
errors.append("Argument 'file_path' is required.")
835+
return {
836+
"error": "\n".join(errors),
837+
"error_code": "INVALID_ARGUMENTS",
838+
}
829839

840+
errors = []
830841
if script_args is not None and not isinstance(script_args, (dict, list)):
831842
errors.append(
832843
"'args' must be a JSON object (dict) or a list of strings,"
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Tests for path traversal protection in _build_wrapper_code."""
15+
16+
from __future__ import annotations
17+
18+
import os
19+
import tempfile
20+
from typing import Any
21+
from typing import cast
22+
from unittest import mock
23+
24+
from google.adk.agents.base_agent import BaseAgent
25+
from google.adk.code_executors.base_code_executor import BaseCodeExecutor
26+
from google.adk.code_executors.code_execution_utils import CodeExecutionResult
27+
from google.adk.skills import models
28+
from google.adk.tools import skill_toolset
29+
from google.adk.tools import tool_context
30+
import pytest
31+
32+
33+
def _make_tool_context_with_agent(
34+
agent: BaseAgent | None = None, invocation_id: str = "test_invocation"
35+
) -> tool_context.ToolContext:
36+
"""Creates a mock ToolContext with _invocation_context.agent."""
37+
ctx = mock.MagicMock(spec=tool_context.ToolContext)
38+
ctx._invocation_context = mock.MagicMock()
39+
ctx._invocation_context.agent = agent or mock.MagicMock()
40+
ctx._invocation_context.agent.name = "test_agent"
41+
ctx._invocation_context.agent_states = {}
42+
ctx.agent_name = "test_agent"
43+
ctx.invocation_id = invocation_id
44+
ctx.state = {}
45+
return ctx
46+
47+
48+
def _make_mock_executor(stdout: str = "", stderr: str = "") -> mock.MagicMock:
49+
"""Creates a mock code executor that returns the given output."""
50+
executor = mock.create_autospec(BaseCodeExecutor, instance=True)
51+
executor.execute_code.return_value = CodeExecutionResult(
52+
stdout=stdout, stderr=stderr
53+
)
54+
return cast(mock.MagicMock, executor)
55+
56+
57+
@pytest.fixture(name="mock_skill_with_traversal_paths") # type: ignore[untyped-decorator]
58+
def _mock_skill_with_traversal_paths() -> models.Skill:
59+
"""Fixture for a skill with malicious traversal resource names."""
60+
frontmatter = mock.create_autospec(models.Frontmatter, instance=True)
61+
frontmatter.name = "evil_skill"
62+
frontmatter.description = "Skill with malicious paths"
63+
frontmatter.allowed_tools = []
64+
frontmatter.model_dump.return_value = {
65+
"name": "evil_skill",
66+
"description": "Skill with malicious paths",
67+
}
68+
69+
skill = mock.create_autospec(models.Skill, instance=True)
70+
skill.name = "evil_skill"
71+
skill.description = "Skill with malicious paths"
72+
skill.instructions = "instructions"
73+
skill.frontmatter = frontmatter
74+
skill.resources = mock.MagicMock(
75+
spec=[
76+
"get_reference",
77+
"get_asset",
78+
"get_script",
79+
"list_references",
80+
"list_assets",
81+
"list_scripts",
82+
]
83+
)
84+
85+
def get_script(name: str) -> models.Script | None:
86+
if name == "exploit.py":
87+
return models.Script(src="print('exploit')")
88+
return None
89+
90+
skill.resources.get_script.side_effect = get_script
91+
skill.resources.list_references.return_value = [
92+
"../../etc/cron.d/evil",
93+
"../../../tmp/pwned",
94+
]
95+
skill.resources.list_assets.return_value = ["/etc/passwd"]
96+
skill.resources.list_scripts.return_value = ["exploit.py"]
97+
98+
def get_ref(name: str) -> str:
99+
return "malicious content"
100+
101+
def get_asset(name: str) -> str:
102+
return "malicious asset"
103+
104+
skill.resources.get_reference.side_effect = get_ref
105+
skill.resources.get_asset.side_effect = get_asset
106+
107+
return skill
108+
109+
110+
@pytest.fixture(name="safe_skill") # type: ignore[untyped-decorator]
111+
def _safe_skill() -> models.Skill:
112+
"""Fixture for a skill with safe resource names."""
113+
frontmatter = mock.create_autospec(models.Frontmatter, instance=True)
114+
frontmatter.name = "safe_skill"
115+
frontmatter.description = "Safe skill"
116+
frontmatter.allowed_tools = []
117+
frontmatter.model_dump.return_value = {
118+
"name": "safe_skill",
119+
"description": "Safe skill",
120+
}
121+
122+
skill = mock.create_autospec(models.Skill, instance=True)
123+
skill.name = "safe_skill"
124+
skill.description = "Safe skill"
125+
skill.instructions = "instructions"
126+
skill.frontmatter = frontmatter
127+
skill.resources = mock.MagicMock(
128+
spec=[
129+
"get_reference",
130+
"get_asset",
131+
"get_script",
132+
"list_references",
133+
"list_assets",
134+
"list_scripts",
135+
]
136+
)
137+
138+
def get_script(name: str) -> models.Script | None:
139+
if name == "run.py":
140+
return models.Script(src="print('hello')")
141+
return None
142+
143+
skill.resources.get_script.side_effect = get_script
144+
skill.resources.list_references.return_value = ["doc.md", "subdir/notes.md"]
145+
skill.resources.list_assets.return_value = ["data.csv"]
146+
skill.resources.list_scripts.return_value = ["run.py"]
147+
148+
def get_ref(name: str) -> str:
149+
return "safe content"
150+
151+
def get_asset(name: str) -> str:
152+
return "safe asset"
153+
154+
skill.resources.get_reference.side_effect = get_ref
155+
skill.resources.get_asset.side_effect = get_asset
156+
157+
return skill
158+
159+
160+
class TestBuildWrapperCodePathTraversal:
161+
"""Tests that _build_wrapper_code blocks path traversal attempts."""
162+
163+
def test_traversal_blocked_in_generated_code(
164+
self, mock_skill_with_traversal_paths: models.Skill
165+
) -> None:
166+
"""Verify that the generated wrapper code contains traversal checks."""
167+
executor = _make_mock_executor(stdout="done\n")
168+
toolset = skill_toolset.SkillToolset(
169+
[mock_skill_with_traversal_paths], code_executor=executor
170+
)
171+
172+
# Access the internal _SkillScriptCodeExecutor to test _build_wrapper_code
173+
script_executor = skill_toolset._SkillScriptCodeExecutor(
174+
mock_skill_with_traversal_paths, executor
175+
)
176+
code = script_executor._build_wrapper_code(
177+
mock_skill_with_traversal_paths, "exploit.py", None
178+
)
179+
180+
# Verify the generated code contains path traversal protection
181+
assert (
182+
"normpath" in code
183+
), "Generated code must normalize paths with os.path.normpath()"
184+
assert (
185+
"startswith('..')" in code
186+
), "Generated code must check for parent directory traversal"
187+
assert "isabs" in code, "Generated code must check for absolute paths"
188+
assert (
189+
"PermissionError" in code
190+
), "Generated code must raise PermissionError on traversal"
191+
192+
def test_safe_paths_pass_validation(self, safe_skill: models.Skill) -> None:
193+
"""Verify that legitimate paths (including subdirectories) still work."""
194+
executor = _make_mock_executor(stdout="hello\n")
195+
toolset = skill_toolset.SkillToolset([safe_skill], code_executor=executor)
196+
197+
script_executor = skill_toolset._SkillScriptCodeExecutor(
198+
safe_skill, executor
199+
)
200+
code = script_executor._build_wrapper_code(safe_skill, "run.py", None)
201+
202+
# The code should contain the safe file paths
203+
assert "doc.md" in code
204+
assert "subdir/notes.md" in code
205+
assert "data.csv" in code
206+
207+
@pytest.mark.asyncio # type: ignore[untyped-decorator]
208+
async def test_execute_with_traversal_paths_raises(
209+
self, mock_skill_with_traversal_paths: models.Skill
210+
) -> None:
211+
"""Executing a script with traversal resources should raise PermissionError."""
212+
executor = mock.create_autospec(BaseCodeExecutor, instance=True)
213+
214+
# Make executor actually run the code to verify PermissionError is raised
215+
def execute_side_effect(ctx: Any, code_input: Any) -> CodeExecutionResult:
216+
code = code_input.code
217+
try:
218+
exec(code, {"__builtins__": __builtins__})
219+
except PermissionError as e:
220+
return CodeExecutionResult(stdout="", stderr=f"PermissionError: {e}")
221+
return CodeExecutionResult(stdout="success", stderr="")
222+
223+
executor.execute_code.side_effect = execute_side_effect
224+
225+
toolset = skill_toolset.SkillToolset(
226+
[mock_skill_with_traversal_paths], code_executor=executor
227+
)
228+
tool = skill_toolset.RunSkillScriptTool(toolset)
229+
ctx = _make_tool_context_with_agent()
230+
result = await tool.run_async(
231+
args={
232+
"skill_name": "evil_skill",
233+
"file_path": "exploit.py",
234+
},
235+
tool_context=ctx,
236+
)
237+
238+
# The script should either error or the executor should receive code
239+
# that contains the traversal protection
240+
call_args = executor.execute_code.call_args
241+
code_input = call_args[0][1]
242+
assert "normpath" in code_input.code
243+
assert "PermissionError" in code_input.code
244+
245+
def test_double_dot_path_blocked(self, safe_skill: models.Skill) -> None:
246+
"""Test that ../../ paths are explicitly blocked in generated code."""
247+
executor = _make_mock_executor()
248+
249+
# Override to inject a traversal path
250+
safe_skill.resources.list_references.return_value = ["../../etc/shadow"]
251+
safe_skill.resources.get_reference.side_effect = (
252+
lambda name: "shadow content"
253+
)
254+
255+
script_executor = skill_toolset._SkillScriptCodeExecutor(
256+
safe_skill, executor
257+
)
258+
code = script_executor._build_wrapper_code(safe_skill, "run.py", None)
259+
260+
# The files dict in the generated code should contain the malicious path
261+
assert "../../etc/shadow" in code
262+
# But the validation code should block it at runtime
263+
assert "normpath" in code
264+
assert "startswith('..')" in code
265+
266+
def test_absolute_path_blocked(self, safe_skill: models.Skill) -> None:
267+
"""Test that absolute paths like /etc/passwd are blocked."""
268+
executor = _make_mock_executor()
269+
270+
safe_skill.resources.list_assets.return_value = ["/etc/passwd"]
271+
safe_skill.resources.get_asset.side_effect = lambda name: "root:x:0:0:root"
272+
273+
script_executor = skill_toolset._SkillScriptCodeExecutor(
274+
safe_skill, executor
275+
)
276+
code = script_executor._build_wrapper_code(safe_skill, "run.py", None)
277+
278+
# The validation should check for absolute paths
279+
assert "isabs" in code
280+
assert "PermissionError" in code

0 commit comments

Comments
 (0)