-
Notifications
You must be signed in to change notification settings - Fork 549
Expand file tree
/
Copy pathinteractive_cli.py
More file actions
197 lines (166 loc) · 6.02 KB
/
Copy pathinteractive_cli.py
File metadata and controls
197 lines (166 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Example demonstrating end-to-end flow with LocalConnection.
This example shows how to:
1. Define a custom Python tool and register it with a ToolRunner.
2. Connect an MCP server (pirate math tools) via McpBridge.
3. Configure hook-based tool approval policy with CLI interaction.
4. Start a LocalConnectionStrategy that launches the LocalConnection binary.
5. Run an interactive conversation loop using the Conversation API.
To run:
python local_connection_example.py
Tip: Pass --alsologtostderr to see execution steps in detail.
"""
import asyncio
from collections.abc import Sequence
import os
import sys
from absl import app
from absl import flags
from absl import logging
from google.antigravity import Agent
from google.antigravity import LocalAgentConfig
from google.antigravity import types
from google.antigravity.hooks import policy
from google.antigravity.utils import interactive
from google.antigravity.utils.interactive import async_input
_MODEL_NAME = flags.DEFINE_string(
"model_name", "gemini-3.5-flash", "Gemini model name."
)
_SYSTEM_INSTRUCTION = flags.DEFINE_string(
"system_instruction", None, "System instruction text for the agent."
)
_DISABLE_RUN_COMMAND = flags.DEFINE_bool(
"disable_run_command",
False,
"Whether to disable the run_command tool.",
)
_SHOW_USAGE = flags.DEFINE_bool(
"show_usage",
False,
"Whether to display token usage and trajectory after each turn.",
)
def read_file_upside_down(path: str) -> str:
"""Reads the file at the given path and returns its content with lines inverted.
Args:
path: The path to the file to read.
Returns:
The file content with lines in reverse order.
"""
logging.info("Tool read_file_upside_down called with path: %s", path)
with open(path, "r") as f:
lines = f.readlines()
return "".join(reversed(lines))
def _add(cur: int | None, val: int | None) -> int | None:
"""Adds two nullable ints, preserving None when both are absent."""
if val is None:
return cur
return (cur or 0) + val
def _print_telemetry(
turn_usage: types.UsageMetadata | None,
cumul: types.UsageMetadata,
history: list[types.Step],
) -> None:
"""Prints telemetry data for the current turn."""
print("\n--- Turn Token Usage ---")
if turn_usage:
print(f" Prompt tokens: {turn_usage.prompt_token_count}")
print(f" Cached tokens: {turn_usage.cached_content_token_count}")
print(f" Output tokens: {turn_usage.candidates_token_count}")
print(f" Thinking tokens: {turn_usage.thoughts_token_count}")
print(f" Total tokens: {turn_usage.total_token_count}")
else:
print(" Usage data not available for this turn.")
# Cumulative session usage.
print("\n--- Session Cumulative Usage ---")
print(f" Prompt tokens: {cumul.prompt_token_count}")
print(f" Cached tokens: {cumul.cached_content_token_count}")
print(f" Output tokens: {cumul.candidates_token_count}")
print(f" Thinking tokens: {cumul.thoughts_token_count}")
print(f" Total tokens: {cumul.total_token_count}")
# Trajectory summary.
print(f"\n--- Trajectory ({len(history)} steps) ---")
for i, s in enumerate(history):
label = f" [{i}] {s.type.value} ({s.source.value}) - {s.status.value}"
if s.tool_calls:
names = ", ".join(tc.name for tc in s.tool_calls)
label += f" [{names}]"
print(label)
print()
async def run():
"""Runs the interactive CLI loop for the Google Antigravity SDK."""
mcp_server_path = os.path.join(
os.path.dirname(__file__), "..", "resources", "mcp_server.py"
)
mcp_server = types.McpStdioServer(
name="pirate_math",
command="python3",
args=[mcp_server_path, "--transport=stdio"],
)
gemini_config = types.GeminiConfig(
models=types.ModelConfig(
default=types.ModelEntry(name=_MODEL_NAME.value),
)
)
disabled_tools = (
[types.BuiltinTools.RUN_COMMAND] if _DISABLE_RUN_COMMAND.value else None
)
config = LocalAgentConfig(
tools=[read_file_upside_down],
mcp_servers=[mcp_server],
policies=[policy.ask_user("*", handler=interactive.ask_user_handler)],
hooks=[interactive.AskQuestionHook()],
capabilities=types.CapabilitiesConfig(
disabled_tools=disabled_tools,
),
gemini_config=gemini_config,
system_instructions=_SYSTEM_INSTRUCTION.value,
)
async with Agent(config) as agent:
print("\nGoogle Antigravity SDK Demo")
print("Type your message and press Enter • Ctrl+C to exit\n")
while True:
try:
user_input = await async_input("\n→ ")
user_input = user_input.strip()
if not user_input:
continue
if user_input.lower() in ("exit", "quit"):
print("\nGoodbye! 👋")
break
response = await agent.chat(user_input)
# Stream the response to stdout
async for chunk in response:
sys.stdout.write(chunk)
sys.stdout.flush()
print()
if _SHOW_USAGE.value:
_print_telemetry(
response.usage_metadata,
agent.conversation.total_usage,
agent.conversation.history,
)
except (KeyboardInterrupt, asyncio.CancelledError, EOFError):
print("\nGoodbye! 👋")
break
def main(argv: Sequence[str]) -> None:
"""Entry point for the interactive CLI example.
Args:
argv: List of command-line arguments.
"""
del argv
logging.set_verbosity(logging.INFO)
asyncio.run(run())
if __name__ == "__main__":
app.run(main)