Skip to content

Commit 9799b49

Browse files
wyf7107copybara-github
authored andcommitted
fix: Fix path traversal in GCS skill extraction (Zip Slip)
Port of GitHub PR: #5927 Validate normalized relative paths in skill extraction code to prevent directory traversal via malicious GCS skill resource names. Co-authored-by: Yifan Wang <wanyif@google.com> PiperOrigin-RevId: 927503030
1 parent 7ab188c commit 9799b49

2 files changed

Lines changed: 309 additions & 17 deletions

File tree

src/google/adk/tools/skill_toolset.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -302,16 +302,15 @@ def _get_declaration(self) -> types.FunctionDeclaration | None:
302302
async def run_async(
303303
self, *, args: dict[str, Any], tool_context: ToolContext
304304
) -> Any:
305-
skill_name = args.get("skill_name")
306-
file_path = args.get("file_path")
307-
308-
errors = []
309-
if not skill_name:
310-
errors.append("Argument 'skill_name' is required.")
311-
if not file_path:
312-
errors.append("Argument 'file_path' is required.")
313-
314-
if errors:
305+
skill_name: str | None = args.get("skill_name")
306+
file_path: str | None = args.get("file_path")
307+
308+
if not skill_name or not file_path:
309+
errors = []
310+
if not skill_name:
311+
errors.append("Argument 'skill_name' is required.")
312+
if not file_path:
313+
errors.append("Argument 'file_path' is required.")
315314
return {
316315
"error": "\n".join(errors),
317316
"error_code": "INVALID_ARGUMENTS",
@@ -661,7 +660,13 @@ def _build_wrapper_code(
661660
" _orig_cwd = os.getcwd()",
662661
" with tempfile.TemporaryDirectory() as td:",
663662
" for rel_path, content in _files.items():",
664-
" full_path = os.path.join(td, rel_path)",
663+
" norm_rel = os.path.normpath(rel_path)",
664+
" if norm_rel.startswith('..') or os.path.isabs(norm_rel):",
665+
(
666+
" raise PermissionError('Path traversal blocked in skill"
667+
" file: ' + rel_path)"
668+
),
669+
" full_path = os.path.join(os.path.abspath(td), norm_rel)",
665670
" os.makedirs(os.path.dirname(full_path), exist_ok=True)",
666671
" mode = 'wb' if isinstance(content, bytes) else 'w'",
667672
" with open(full_path, mode) as f:",
@@ -815,17 +820,24 @@ async def run_async(
815820
self, *, args: dict[str, Any], tool_context: ToolContext
816821
) -> Any:
817822
# Standardized arguments: skill_name and file_path.
818-
skill_name = args.get("skill_name")
819-
file_path = args.get("file_path")
823+
skill_name: str | None = args.get("skill_name")
824+
file_path: str | None = args.get("file_path")
820825
script_args = args.get("args")
821826
short_options = args.get("short_options")
822827
positional_args = args.get("positional_args")
823828

829+
if not skill_name or not file_path:
830+
errors = []
831+
if not skill_name:
832+
errors.append("Argument 'skill_name' is required.")
833+
if not file_path:
834+
errors.append("Argument 'file_path' is required.")
835+
return {
836+
"error": "\n".join(errors),
837+
"error_code": "INVALID_ARGUMENTS",
838+
}
839+
824840
errors = []
825-
if not skill_name:
826-
errors.append("Argument 'skill_name' is required.")
827-
if not file_path:
828-
errors.append("Argument 'file_path' is required.")
829841

830842
if script_args is not None and not isinstance(script_args, (dict, list)):
831843
errors.append(
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Tests for path traversal protection in _build_wrapper_code."""
15+
16+
from __future__ import annotations
17+
18+
import os
19+
import tempfile
20+
from typing import Any
21+
from typing import cast
22+
from unittest import mock
23+
24+
from google.adk.agents.base_agent import BaseAgent
25+
from google.adk.code_executors.base_code_executor import BaseCodeExecutor
26+
from google.adk.code_executors.code_execution_utils import CodeExecutionResult
27+
from google.adk.skills import models
28+
from google.adk.tools import skill_toolset
29+
from google.adk.tools import tool_context
30+
import pytest
31+
32+
33+
def _make_tool_context_with_agent(
34+
agent: BaseAgent | None = None, invocation_id: str = "test_invocation"
35+
) -> tool_context.ToolContext:
36+
"""Creates a mock ToolContext with _invocation_context.agent."""
37+
ctx = mock.MagicMock(spec=tool_context.ToolContext)
38+
ctx._invocation_context = mock.MagicMock()
39+
ctx._invocation_context.agent = agent or mock.MagicMock()
40+
ctx._invocation_context.agent.name = "test_agent"
41+
ctx._invocation_context.agent_states = {}
42+
ctx.agent_name = "test_agent"
43+
ctx.invocation_id = invocation_id
44+
ctx.state = {}
45+
return ctx
46+
47+
48+
def _make_mock_executor(stdout: str = "", stderr: str = "") -> mock.MagicMock:
49+
"""Creates a mock code executor that returns the given output."""
50+
executor = mock.create_autospec(BaseCodeExecutor, instance=True)
51+
executor.execute_code.return_value = CodeExecutionResult(
52+
stdout=stdout, stderr=stderr
53+
)
54+
return cast(mock.MagicMock, executor)
55+
56+
57+
@pytest.fixture(name="mock_skill_with_traversal_paths") # type: ignore[untyped-decorator]
58+
def _mock_skill_with_traversal_paths() -> models.Skill:
59+
"""Fixture for a skill with malicious traversal resource names."""
60+
frontmatter = mock.create_autospec(models.Frontmatter, instance=True)
61+
frontmatter.name = "evil_skill"
62+
frontmatter.description = "Skill with malicious paths"
63+
frontmatter.allowed_tools = []
64+
frontmatter.model_dump.return_value = {
65+
"name": "evil_skill",
66+
"description": "Skill with malicious paths",
67+
}
68+
69+
skill = mock.create_autospec(models.Skill, instance=True)
70+
skill.name = "evil_skill"
71+
skill.description = "Skill with malicious paths"
72+
skill.instructions = "instructions"
73+
skill.frontmatter = frontmatter
74+
skill.resources = mock.MagicMock(
75+
spec=[
76+
"get_reference",
77+
"get_asset",
78+
"get_script",
79+
"list_references",
80+
"list_assets",
81+
"list_scripts",
82+
]
83+
)
84+
85+
def get_script(name: str) -> models.Script | None:
86+
if name == "exploit.py":
87+
return models.Script(src="print('exploit')")
88+
return None
89+
90+
skill.resources.get_script.side_effect = get_script
91+
skill.resources.list_references.return_value = [
92+
"../../etc/cron.d/evil",
93+
"../../../tmp/pwned",
94+
]
95+
skill.resources.list_assets.return_value = ["/etc/passwd"]
96+
skill.resources.list_scripts.return_value = ["exploit.py"]
97+
98+
def get_ref(name: str) -> str:
99+
return "malicious content"
100+
101+
def get_asset(name: str) -> str:
102+
return "malicious asset"
103+
104+
skill.resources.get_reference.side_effect = get_ref
105+
skill.resources.get_asset.side_effect = get_asset
106+
107+
return skill
108+
109+
110+
@pytest.fixture(name="safe_skill") # type: ignore[untyped-decorator]
111+
def _safe_skill() -> models.Skill:
112+
"""Fixture for a skill with safe resource names."""
113+
frontmatter = mock.create_autospec(models.Frontmatter, instance=True)
114+
frontmatter.name = "safe_skill"
115+
frontmatter.description = "Safe skill"
116+
frontmatter.allowed_tools = []
117+
frontmatter.model_dump.return_value = {
118+
"name": "safe_skill",
119+
"description": "Safe skill",
120+
}
121+
122+
skill = mock.create_autospec(models.Skill, instance=True)
123+
skill.name = "safe_skill"
124+
skill.description = "Safe skill"
125+
skill.instructions = "instructions"
126+
skill.frontmatter = frontmatter
127+
skill.resources = mock.MagicMock(
128+
spec=[
129+
"get_reference",
130+
"get_asset",
131+
"get_script",
132+
"list_references",
133+
"list_assets",
134+
"list_scripts",
135+
]
136+
)
137+
138+
def get_script(name: str) -> models.Script | None:
139+
if name == "run.py":
140+
return models.Script(src="print('hello')")
141+
return None
142+
143+
skill.resources.get_script.side_effect = get_script
144+
skill.resources.list_references.return_value = ["doc.md", "subdir/notes.md"]
145+
skill.resources.list_assets.return_value = ["data.csv"]
146+
skill.resources.list_scripts.return_value = ["run.py"]
147+
148+
def get_ref(name: str) -> str:
149+
return "safe content"
150+
151+
def get_asset(name: str) -> str:
152+
return "safe asset"
153+
154+
skill.resources.get_reference.side_effect = get_ref
155+
skill.resources.get_asset.side_effect = get_asset
156+
157+
return skill
158+
159+
160+
class TestBuildWrapperCodePathTraversal:
161+
"""Tests that _build_wrapper_code blocks path traversal attempts."""
162+
163+
def test_traversal_blocked_in_generated_code(
164+
self, mock_skill_with_traversal_paths: models.Skill
165+
) -> None:
166+
"""Verify that the generated wrapper code contains traversal checks."""
167+
executor = _make_mock_executor(stdout="done\n")
168+
toolset = skill_toolset.SkillToolset(
169+
[mock_skill_with_traversal_paths], code_executor=executor
170+
)
171+
172+
# Access the internal _SkillScriptCodeExecutor to test _build_wrapper_code
173+
script_executor = skill_toolset._SkillScriptCodeExecutor(
174+
mock_skill_with_traversal_paths, executor
175+
)
176+
code = script_executor._build_wrapper_code(
177+
mock_skill_with_traversal_paths, "exploit.py", None
178+
)
179+
180+
# Verify the generated code contains path traversal protection
181+
assert (
182+
"normpath" in code
183+
), "Generated code must normalize paths with os.path.normpath()"
184+
assert (
185+
"startswith('..')" in code
186+
), "Generated code must check for parent directory traversal"
187+
assert "isabs" in code, "Generated code must check for absolute paths"
188+
assert (
189+
"PermissionError" in code
190+
), "Generated code must raise PermissionError on traversal"
191+
192+
def test_safe_paths_pass_validation(self, safe_skill: models.Skill) -> None:
193+
"""Verify that legitimate paths (including subdirectories) still work."""
194+
executor = _make_mock_executor(stdout="hello\n")
195+
toolset = skill_toolset.SkillToolset([safe_skill], code_executor=executor)
196+
197+
script_executor = skill_toolset._SkillScriptCodeExecutor(
198+
safe_skill, executor
199+
)
200+
code = script_executor._build_wrapper_code(safe_skill, "run.py", None)
201+
202+
# The code should contain the safe file paths
203+
assert "doc.md" in code
204+
assert "subdir/notes.md" in code
205+
assert "data.csv" in code
206+
207+
@pytest.mark.asyncio # type: ignore[untyped-decorator]
208+
async def test_execute_with_traversal_paths_raises(
209+
self, mock_skill_with_traversal_paths: models.Skill
210+
) -> None:
211+
"""Executing a script with traversal resources should raise PermissionError."""
212+
executor = mock.create_autospec(BaseCodeExecutor, instance=True)
213+
214+
# Make executor actually run the code to verify PermissionError is raised
215+
def execute_side_effect(ctx: Any, code_input: Any) -> CodeExecutionResult:
216+
code = code_input.code
217+
try:
218+
exec(code, {"__builtins__": __builtins__})
219+
except PermissionError as e:
220+
return CodeExecutionResult(stdout="", stderr=f"PermissionError: {e}")
221+
return CodeExecutionResult(stdout="success", stderr="")
222+
223+
executor.execute_code.side_effect = execute_side_effect
224+
225+
toolset = skill_toolset.SkillToolset(
226+
[mock_skill_with_traversal_paths], code_executor=executor
227+
)
228+
tool = skill_toolset.RunSkillScriptTool(toolset)
229+
ctx = _make_tool_context_with_agent()
230+
result = await tool.run_async(
231+
args={
232+
"skill_name": "evil_skill",
233+
"file_path": "exploit.py",
234+
},
235+
tool_context=ctx,
236+
)
237+
238+
# The script should either error or the executor should receive code
239+
# that contains the traversal protection
240+
call_args = executor.execute_code.call_args
241+
code_input = call_args[0][1]
242+
assert "normpath" in code_input.code
243+
assert "PermissionError" in code_input.code
244+
245+
def test_double_dot_path_blocked(self, safe_skill: models.Skill) -> None:
246+
"""Test that ../../ paths are explicitly blocked in generated code."""
247+
executor = _make_mock_executor()
248+
249+
# Override to inject a traversal path
250+
safe_skill.resources.list_references.return_value = ["../../etc/shadow"]
251+
safe_skill.resources.get_reference.side_effect = (
252+
lambda name: "shadow content"
253+
)
254+
255+
script_executor = skill_toolset._SkillScriptCodeExecutor(
256+
safe_skill, executor
257+
)
258+
code = script_executor._build_wrapper_code(safe_skill, "run.py", None)
259+
260+
# The files dict in the generated code should contain the malicious path
261+
assert "../../etc/shadow" in code
262+
# But the validation code should block it at runtime
263+
assert "normpath" in code
264+
assert "startswith('..')" in code
265+
266+
def test_absolute_path_blocked(self, safe_skill: models.Skill) -> None:
267+
"""Test that absolute paths like /etc/passwd are blocked."""
268+
executor = _make_mock_executor()
269+
270+
safe_skill.resources.list_assets.return_value = ["/etc/passwd"]
271+
safe_skill.resources.get_asset.side_effect = lambda name: "root:x:0:0:root"
272+
273+
script_executor = skill_toolset._SkillScriptCodeExecutor(
274+
safe_skill, executor
275+
)
276+
code = script_executor._build_wrapper_code(safe_skill, "run.py", None)
277+
278+
# The validation should check for absolute paths
279+
assert "isabs" in code
280+
assert "PermissionError" in code

0 commit comments

Comments
 (0)