-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile_operations.py
More file actions
78 lines (62 loc) · 2.41 KB
/
file_operations.py
File metadata and controls
78 lines (62 loc) · 2.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Example: Write files, run Python, and read outputs in the sandbox.
Uses Leap0 filesystem tools and the Python code interpreter (no shell tool).
Requirements:
- LEAP0_API_KEY
- OPENROUTER_API_KEY (Gemini via OpenRouter; see examples/README.md)
- pip install 'leap0-adk[openrouter]'
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner
from leap0_adk import Leap0Plugin
# Put this file's directory at the front of the import path (unless it is
# already listed) so modules that live beside this script can be imported.
_examples_root = Path(__file__).resolve().parent
_examples_root_entry = str(_examples_root)
if _examples_root_entry not in sys.path:
    sys.path.insert(0, _examples_root_entry)
from common import openrouter_gemini_llm # noqa: E402
async def main() -> None:
    """Run two file-oriented agent conversations and print the responses.

    Builds an ``Agent`` whose tools come from ``Leap0Plugin.get_tools()``,
    wraps it in an ``InMemoryRunner`` (with the plugin registered), and sends
    two prompts through ``runner.run_debug``:

    1. A prompt asking the agent to write ``/tmp/analysis.py``, execute it,
       and read back ``/tmp/analysis_output.txt``.
    2. A prompt asking the agent to write ``/tmp/report.txt``, print it from
       Python, then read and summarize it.

    Each response is printed to stdout. The plugin is closed in a ``finally``
    clause, so it is released even when agent construction or a run raises.
    """
    # The prompt asks the agent to store the script verbatim, so the script
    # has to be valid Python as written here: the ``with`` body is indented.
    # ``\\n`` leaves a literal backslash-n in the prompt text, which becomes a
    # newline escape inside the generated script rather than in the prompt.
    analysis_prompt = """Use leap0_fs_write to create /tmp/analysis.py with this exact Python source code:
import sys
import os
with open("/tmp/analysis_output.txt", "w") as f:
    f.write(f"Python Version: {sys.version}\\n")
    f.write(f"Platform: {sys.platform}\\n")
    f.write("Analysis complete!\\n")
print("Analysis script defined")
Then use execute_code_in_leap0 to run: exec(open("/tmp/analysis.py").read())
Then use leap0_fs_read to show the contents of /tmp/analysis_output.txt"""

    report_prompt = """Write UTF-8 text to /tmp/report.txt with a short "system report" (hostname line, date line).
Then run Python that reads /tmp/report.txt and prints it to stdout.
Then read /tmp/report.txt with leap0_fs_read and summarize."""

    plugin = Leap0Plugin()
    try:
        agent = Agent(
            model=openrouter_gemini_llm(),
            name="file_operations_agent",
            tools=plugin.get_tools(),
        )

        async with InMemoryRunner(
            app_name="leap0_file_operations_example",
            agent=agent,
            plugins=[plugin],
        ) as runner:
            # Scenario 1: script file written, executed, and its output read.
            print("\n" + "=" * 60)
            print("Python file + interpreter")
            print("=" * 60)
            response = await runner.run_debug(analysis_prompt)
            print(f"Response: {response}\n")

            # Scenario 2: plain-text report written, echoed, and summarized.
            print("\n" + "=" * 60)
            print("Report via write + execute + read")
            print("=" * 60)
            response = await runner.run_debug(report_prompt)
            print(f"Response: {response}\n")

        print("\nFile operations example finished.")
    finally:
        # Always release the plugin, whether or not the runs succeeded.
        await plugin.close()
# Script entry point: only when executed directly (not on import), run the
# ``main`` coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())