-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathclient.py
More file actions
100 lines (81 loc) · 3.35 KB
/
client.py
File metadata and controls
100 lines (81 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# pip install google-adk google-generativeai mcp python-dotenv
import asyncio
import os
import json
# Keep the logging import: it is used to configure log output below
import logging
from dotenv import load_dotenv
from google.genai import types
# Remove genai import if only used for configuration and that's been removed
# import google.generativeai as genai
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
# Read settings from a local .env file into the process environment.
load_dotenv()

# Configure root logging at INFO level with a timestamped line format.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# --- Step 1: Define ADK Agent Creation ---
async def get_agent_async():
    """Create the flight-search LlmAgent together with its MCP toolset.

    The toolset is configured with the ``mcp-flight-search`` command, the
    ``--connection_type stdio`` arguments, and ``SERP_API_KEY`` taken from the
    current environment. The agent's model comes from ``GEMINI_MODEL``,
    defaulting to ``gemini-2.0-flash``.

    Returns:
        A ``(root_agent, toolset)`` tuple. The toolset is returned alongside
        the agent so the caller can close it when it is done.

    Raises:
        ValueError: If ``SERP_API_KEY`` is unset or empty.
    """
    print("Attempting to connect to MCP Flight Search server...")

    # Validate here instead of relying on the script entry point: without this
    # check a missing key would put None into the server's env mapping.
    serp_api_key = os.getenv("SERP_API_KEY")
    if not serp_api_key:
        raise ValueError("SERP_API_KEY environment variable not set.")

    toolset = MCPToolset(
        connection_params=StdioServerParameters(
            command="mcp-flight-search",
            args=["--connection_type", "stdio"],
            env={"SERP_API_KEY": serp_api_key},
        )
    )

    # The toolset object itself is handed to the agent, not a list of
    # individual tools.
    root_agent = LlmAgent(
        model=os.getenv("GEMINI_MODEL", "gemini-2.0-flash"),
        name='flight_search_assistant',
        instruction='Help user to search for flights using available tools based on prompt. If return date not specified, use an empty string for one-way trips.',
        tools=[toolset],
    )
    print("MCP Toolset created successfully.")
    return root_agent, toolset
# --- Step 2: Main Execution Logic ---
async def async_main():
    """Run one hard-coded flight-search query and print every event produced.

    Creates an in-memory session, builds the agent and its MCP toolset via
    ``get_agent_async``, streams the runner's events to stdout, and closes the
    toolset afterwards. The toolset is closed even when the run raises.
    """
    session_service = InMemorySessionService()
    session = await session_service.create_session(
        state={}, app_name='flight_search_app', user_id='user_flights'
    )

    # Wrap the fixed user prompt in the Content structure passed to the runner.
    query = "Find flights from Atlanta to Las Vegas 2025-07-05"
    print(f"User Query: '{query}'")
    content = types.Content(role='user', parts=[types.Part(text=query)])

    root_agent, toolset = await get_agent_async()
    try:
        runner = Runner(
            app_name='flight_search_app',
            agent=root_agent,
            session_service=session_service,
        )

        print("Running agent...")
        events_async = runner.run_async(
            session_id=session.id,
            user_id=session.user_id,
            new_message=content,
        )
        async for event in events_async:
            print(f"Event received: {event}")
    finally:
        # Cleanup lives in `finally` so a failure while building the runner or
        # streaming events does not skip closing the toolset.
        print("Closing MCP server connection...")
        await toolset.close()
        print("Cleanup complete.")
# --- Step 3: Run the Main Function ---
if __name__ == "__main__":
    # Refuse to start unless both API keys are present and non-empty;
    # GOOGLE_API_KEY is checked first, then SERP_API_KEY.
    for required_var in ("GOOGLE_API_KEY", "SERP_API_KEY"):
        if not os.getenv(required_var):
            raise ValueError(f"{required_var} environment variable not set.")

    # Run the async entry point to completion.
    asyncio.run(async_main())