forked from elastic/observability-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
90 lines (77 loc) · 2.73 KB
/
agent.py
File metadata and controls
90 lines (77 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# run like this: uv run --exact -q --env-file .env agent.py
# /// script
# dependencies = [
# "openai-agents",
# "httpx",
# "mcp",
# "elastic-opentelemetry",
# "openinference-instrumentation-openai-agents",
# "opentelemetry-instrumentation-httpx",
# "openinference-instrumentation-mcp",
# ]
# ///
# ruff: noqa: E402
from opentelemetry.instrumentation import auto_instrumentation
# This must precede any other imports you want to instrument!
# Installs OpenTelemetry auto-instrumentation hooks (openai-agents, httpx,
# MCP — per the script dependencies above) before those modules are imported.
# Configuration is read from environment variables, e.g. the .env file
# loaded by `uv run --env-file .env`.
auto_instrumentation.initialize()
import argparse
import asyncio
import os
from datetime import datetime, timedelta
from agents import (
Agent,
HostedMCPTool,
OpenAIProvider,
RunConfig,
Runner,
Tool,
)
from agents.mcp import MCPServerStreamableHttp, MCPUtil
from openai.types.responses.tool_param import Mcp
async def run_agent(tools: list[Tool], model_name: str, use_responses: bool):
    """Run a one-shot flight-search agent over the supplied tools.

    Builds an Agent backed by the requested model, asks it for a flight
    one week from today, and prints the run's final output.

    Args:
        tools: Tools (hosted or function) the agent may call.
        model_name: Model identifier passed to the OpenAI provider.
        use_responses: Whether the provider should use the Responses API.
    """
    provider = OpenAIProvider(use_responses=use_responses)
    flight_agent = Agent(
        name="flight-search-agent",
        model=provider.get_model(model_name),
        tools=tools,
    )
    # Ask about a date one week out so the request is always in the future.
    next_week = (datetime.now() + timedelta(weeks=1)).strftime("%Y-%m-%d")
    run_result = await Runner.run(
        starting_agent=flight_agent,
        input=f"Give me the best flight from New York to Kota Kinabalu on {next_week}",
        run_config=RunConfig(workflow_name="flight search"),
    )
    print(run_result.final_output)
async def main():
    """Entry point: wire up MCP flight-search tools and run the agent.

    Configuration comes from the environment:
      AGENT_MODEL  - model name (default "gpt-5-nano")
      MCP_URL      - MCP server URL (default "https://mcp.kiwi.com")
      MCP_HEADERS  - comma-separated key=value pairs sent to the MCP server

    With --use-responses-api the MCP server is invoked server-side via the
    Responses API; otherwise tools are listed and called client-side over a
    streamable-HTTP MCP connection.
    """
    parser = argparse.ArgumentParser(description="MCP-enabled flight search agent")
    parser.add_argument(
        "--use-responses-api",
        action="store_true",
        help="Use Responses API instead of Agents",
    )
    cli_args = parser.parse_args()

    model_name = os.getenv("AGENT_MODEL", "gpt-5-nano")
    mcp_url = os.getenv("MCP_URL", "https://mcp.kiwi.com")
    # Parse "k1=v1,k2=v2" into a header dict; unset/empty yields {}.
    header_pairs = (h for h in os.getenv("MCP_HEADERS", "").split(",") if h)
    mcp_headers = dict(pair.split("=", 1) for pair in header_pairs)

    if cli_args.use_responses_api:
        # Server-side MCP via Responses API
        hosted_tool = HostedMCPTool(
            tool_config=Mcp(
                type="mcp",
                server_url=mcp_url,
                server_label="kiwi-flights",
                headers=mcp_headers,
                require_approval="never",
            )
        )
        await run_agent([hosted_tool], model_name, use_responses=True)
    else:
        # Client-side MCP orchestration
        async with MCPServerStreamableHttp(
            {"url": mcp_url, "headers": mcp_headers, "timeout": 30.0},
            client_session_timeout_seconds=60.0,
        ) as server:
            listed = await server.list_tools()
            util = MCPUtil()
            # Wrap each MCP tool as a function tool (non-strict schemas).
            function_tools = [util.to_function_tool(t, server, False) for t in listed]
            await run_agent(function_tools, model_name, use_responses=False)
if __name__ == "__main__":
    # Run the async entry point only when executed as a script.
    asyncio.run(main())