forked from lastmile-ai/mcp-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
116 lines (93 loc) · 4.1 KB
/
Copy pathmain.py
File metadata and controls
116 lines (93 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
from pathlib import Path
from mcp_agent.app import MCPApp
from mcp_agent.logging.logger import get_logger
from mcp_agent.mcp.mcp_aggregator import MCPAggregator
from rich import print
app = MCPApp(name="mcp_server_aggregator")
async def example_usage_persistent():
context = app.context
logger = get_logger("mcp_server_aggregator.example_usage_persistent")
logger.info("Hello, world! Let's create an MCP aggregator (server-of-servers)...")
logger.info("Current config:", data=context.config)
# Create an MCP aggregator that connects to the fetch and filesystem servers
aggregator = None
try:
aggregator = await MCPAggregator.create(
server_names=["fetch", "filesystem"],
connection_persistence=True, # By default connections are torn down after each call
)
# Call list_tools on the aggregator, which will search all servers for the tool
logger.info("Aggregator: Calling list_tools...")
result = await aggregator.list_tools()
logger.info("Tools available:", data=result)
# Call read_file on the aggregator, which will search all servers for the tool
result = await aggregator.call_tool(
name="read_file",
arguments={"path": str(Path.cwd() / "README.md")},
)
logger.info("read_file result:", data=result)
# Call fetch.fetch on the aggregator
# (i.e. server-namespacing -- fetch is the servername, which exposes fetch tool)
result = await aggregator.call_tool(
name="fetch_fetch",
arguments={"url": "https://jsonplaceholder.typicode.com/todos/1"},
)
logger.info("fetch result:", data=result)
except Exception as e:
logger.error("Error in example_usage_persistent:", data=e)
finally:
logger.info("Closing all server connections on aggregator...")
await aggregator.close()
async def example_usage():
logger = get_logger("mcp_server_aggregator.example_usage")
context = app.context
logger.info("Hello, world! Let's create an MCP aggregator (server-of-servers)...")
logger.info("Current config:", data=context.config)
# Create an MCP aggregator that connects to the fetch and filesystem servers
aggregator = None
try:
aggregator = await MCPAggregator.create(
server_names=["fetch", "filesystem"],
connection_persistence=False,
)
# Call list_tools on the aggregator, which will search all servers for the tool
logger.info("Aggregator: Calling list_tools...")
result = await aggregator.list_tools()
logger.info("Tools available:", data=result)
# Call read_file on the aggregator, which will search all servers for the tool
result = await aggregator.call_tool(
name="read_file",
arguments={"path": str(Path.cwd() / "README.md")},
)
logger.info("read_file result:", data=result)
# Call fetch.fetch on the aggregator
# (i.e. server-namespacing -- fetch is the servername, which exposes fetch tool)
result = await aggregator.call_tool(
name="fetch_fetch",
arguments={"url": "https://jsonplaceholder.typicode.com/todos/1"},
)
logger.info(f"fetch result: {str(result)}")
except Exception as e:
logger.error("Error in example_usage:", data=e)
finally:
logger.info("Closing all server connections on aggregator...")
await aggregator.close()
if __name__ == "__main__":
import time
async def main():
try:
await app.initialize()
start = time.time()
await example_usage_persistent()
end = time.time()
persistent_time = end - start
start = time.time()
await example_usage()
end = time.time()
non_persistent_time = end - start
print(f"\nPersistent connection time: {persistent_time:.2f}s")
print(f"\nNon-persistent connection time: {non_persistent_time:.2f}s")
finally:
await app.cleanup()
asyncio.run(main())