Skip to content

Commit 1647423

Browse files
FEAT: Enable MCP client tool for Agent (#1678)
Signed-off-by: lvliang-intel <liang1.lv@intel.com>
1 parent 2583fd0 commit 1647423

7 files changed

Lines changed: 698 additions & 0 deletions

File tree

comps/cores/mcp/README.md

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# OPEA MCP Tool
2+
3+
The **OPEA MCP Tool** is a client tool designed to facilitate seamless integration between agents and MCP (Model Context Protocol) servers. It provides a unified interface for managing MCP clients, enabling agents to access and interact with various tools and data sources exposed by MCP servers.
4+
5+
---
6+
7+
## **OPEA MCP Tool Overview**
8+
9+
The **OPEA MCP Tool** provides a unified interface for managing MCP clients and interacting with tools exposed by MCP servers. It supports both **SSE (Server-Sent Events)** and **Stdio** server configurations, making it flexible for various use cases.
10+
11+
### **Features**
12+
13+
- **Dynamic Tool Registration**: Automatically registers tools exposed by MCP servers for natural invocation.
14+
- **Asynchronous Operations**: Fully asynchronous API for efficient integration with modern Python applications.
15+
- **Context Management**: Supports Python's `async with` syntax for automatic resource management.
16+
- **Error Handling**: Robust error handling for client initialization, tool execution, and disconnection.
17+
18+
---
19+
20+
## **API Usage**
21+
22+
### Initialization
23+
24+
To initialize the OpeaMCPToolsManager, provide an OpeaMCPConfig object containing the server configurations:
25+
26+
```python
27+
from comps.cores.mcp.config import OpeaMCPConfig, OpeaMCPSSEServerConfig, OpeaMCPStdioServerConfig
28+
from comps.cores.mcp.manager import OpeaMCPToolsManager
29+
30+
config = OpeaMCPConfig(
31+
sse_servers=[
32+
OpeaMCPSSEServerConfig(url="http://sse-server-1.com", api_key="your_api_key"),
33+
],
34+
stdio_servers=[
35+
OpeaMCPStdioServerConfig(name="stdio-server-1", command="python", args=["tool.py"]),
36+
],
37+
)
38+
39+
manager = await OpeaMCPToolsManager.create(config)
40+
```
41+
42+
### Tool Execution
43+
44+
Once initialized, you can execute tools exposed by MCP servers using the execute_tool method:
45+
46+
```python
47+
result = await manager.execute_tool("tool_name", {"param1": "value1", "param2": "value2"})
48+
print(result)
49+
```
50+
51+
### Context Management
52+
53+
The OpeaMCPToolsManager supports Python's async with syntax for automatic resource management:
54+
55+
```python
56+
async with await OpeaMCPToolsManager.create(config) as manager:
57+
result = await manager.execute_tool("tool_name", {"param1": "value1"})
58+
print(result)
59+
```
60+
61+
### Dynamic Tool Invocation
62+
63+
Tools are dynamically registered as methods of the manager, allowing for natural invocation:
64+
65+
```python
66+
async with OpeaMCPToolsManager.create(config) as manager:
67+
result = await manager.tool_name(param1="value1", param2="value2")
68+
print(result)
69+
```
70+
71+
## **Examples**
72+
73+
### **Launch a SSE MCP Server**
74+
75+
To launch an SSE MCP server using Playwright, run the following command:
76+
77+
```bash
78+
npx @playwright/mcp@latest --port 8931
79+
```
80+
81+
### **Launch a Stdio MCP Server**
82+
83+
To launch a simple Stdio MCP server, follow these steps:
84+
85+
```bash
86+
git clone https://github.com/modelcontextprotocol/python-sdk.git
87+
cd python-sdk/examples/servers/simple-tool/mcp_simple_tool
88+
uv run mcp-simple-tool
89+
```
90+
91+
### **Run the MCP Client**
92+
93+
The following example demonstrates how to connect to both SSE and Stdio MCP servers and execute tools:
94+
95+
```python
96+
import asyncio
97+
from comps.cores.mcp.config import OpeaMCPConfig, OpeaMCPSSEServerConfig, OpeaMCPStdioServerConfig
98+
from comps.cores.mcp.manager import OpeaMCPToolsManager
99+
100+
101+
async def main():
102+
config = OpeaMCPConfig(
103+
sse_servers=[
104+
OpeaMCPSSEServerConfig(url="http://localhost:8931/sse"),
105+
],
106+
stdio_servers=[
107+
OpeaMCPStdioServerConfig(name="mcp-simple-tool", command="uv", args=["run", "mcp-simple-tool"]),
108+
],
109+
)
110+
111+
async with await OpeaMCPToolsManager.create(config) as manager:
112+
# Execute tools exposed by the servers
113+
result = await manager.execute_tool("browser_snapshot", {})
114+
print(result)
115+
116+
result = await manager.execute_tool("fetch", {"url": "https://opea.dev/"})
117+
print(result)
118+
119+
120+
# Run the async function
121+
asyncio.run(main())
122+
```

comps/cores/mcp/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Copyright (C) 2025 Intel Corporation
2+
# SPDX-License-Identifier: Apache-2.0

comps/cores/mcp/client.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright (C) 2025 Intel Corporation
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import asyncio
5+
import os
6+
from contextlib import AsyncExitStack
7+
from typing import List, Optional
8+
9+
from mcp import ClientSession, StdioServerParameters
10+
from mcp.client.sse import sse_client
11+
from mcp.client.stdio import stdio_client
12+
from pydantic import BaseModel, Field
13+
14+
from comps import CustomLogger
15+
from comps.cores.mcp.tool import OpeaMCPClientTool
16+
17+
logger = CustomLogger("comps-mcp-client")
18+
log_flag = os.getenv("LOGFLAG", False)
19+
20+
21+
class OpeaMCPClient(BaseModel):
22+
"""A client for interacting with MCP servers, managing tools, and handling server communication."""
23+
24+
description: str = "MCP client for server interaction and tool management"
25+
session: Optional[ClientSession] = None
26+
exit_stack: AsyncExitStack = AsyncExitStack()
27+
28+
tools: List[OpeaMCPClientTool] = Field(default_factory=list)
29+
tool_registry: dict[str, OpeaMCPClientTool] = Field(default_factory=dict)
30+
31+
class Config:
32+
arbitrary_types_allowed = True
33+
34+
async def connect_via_sse(self, server_url: str, api_key: Optional[str] = None, timeout: float = 30.0) -> None:
35+
"""Establish a connection to an MCP server using SSE (Server-Sent Events) transport.
36+
37+
Args:
38+
server_url: The URL of the SSE server to connect to.
39+
api_key: Optional API key for authentication.
40+
timeout: Connection timeout in seconds. Default is 30 seconds.
41+
42+
Raises:
43+
ValueError: If the server URL is not provided.
44+
asyncio.TimeoutError: If the connection times out.
45+
Exception: For other connection errors.
46+
"""
47+
if not server_url:
48+
raise ValueError("Server URL is required.")
49+
if self.session:
50+
await self.disconnect()
51+
52+
try:
53+
54+
async def connect_with_timeout():
55+
streams_context = sse_client(
56+
url=server_url,
57+
headers={"Authorization": f"Bearer {api_key}"} if api_key else None,
58+
timeout=timeout,
59+
)
60+
streams = await self.exit_stack.enter_async_context(streams_context)
61+
self.session = await self.exit_stack.enter_async_context(ClientSession(*streams))
62+
await self._initialize_tools()
63+
64+
await asyncio.wait_for(connect_with_timeout(), timeout=timeout)
65+
except asyncio.TimeoutError:
66+
logger.error(f"Connection to {server_url} timed out after {timeout} seconds")
67+
await self.disconnect()
68+
raise
69+
except Exception as e:
70+
logger.error(f"Error connecting to {server_url}: {str(e)}")
71+
await self.disconnect()
72+
raise
73+
74+
async def connect_via_stdio(self, command: str, args: List[str]) -> None:
75+
"""Establish a connection to an MCP server using stdio (standard input/output) transport.
76+
77+
Args:
78+
command: The command to start the server.
79+
args: A list of arguments for the command.
80+
81+
Raises:
82+
ValueError: If the command is not provided.
83+
Exception: For other connection errors.
84+
"""
85+
if not command:
86+
raise ValueError("Server command is required.")
87+
if self.session:
88+
await self.disconnect()
89+
90+
try:
91+
server_params = StdioServerParameters(command=command, args=args)
92+
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
93+
read, write = stdio_transport
94+
self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
95+
96+
await self._initialize_tools()
97+
except Exception as e:
98+
logger.error(f"Error connecting to {command}: {str(e)}")
99+
await self.disconnect()
100+
raise
101+
102+
async def _initialize_tools(self) -> None:
103+
"""Initialize the client session and populate the tool registry with available tools.
104+
105+
Raises:
106+
RuntimeError: If the session is not initialized.
107+
"""
108+
if not self.session:
109+
raise RuntimeError("Session not initialized.")
110+
111+
await self.session.initialize()
112+
response = await self.session.list_tools()
113+
114+
# Clear existing tools
115+
self.tools = []
116+
self.tool_registry = {}
117+
118+
# Populate tools and registry
119+
for tool in response.tools:
120+
client_tool = OpeaMCPClientTool(
121+
name=tool.name,
122+
description=tool.description,
123+
inputSchema=tool.inputSchema,
124+
session=self.session,
125+
)
126+
self.tool_registry[tool.name] = client_tool
127+
self.tools.append(client_tool)
128+
129+
logger.info(f"Connected to server with tools: {[tool.name for tool in response.tools]}")
130+
131+
async def invoke_tool(self, tool_name: str, parameters: dict):
132+
"""Invoke a tool on the MCP server.
133+
134+
Args:
135+
tool_name: The name of the tool to invoke.
136+
parameters: The parameters to pass to the tool.
137+
138+
Returns:
139+
The result of the tool invocation.
140+
141+
Raises:
142+
ValueError: If the tool is not found in the registry.
143+
RuntimeError: If the client session is not available.
144+
"""
145+
if tool_name not in self.tool_registry:
146+
raise ValueError(f"Tool '{tool_name}' not found in the registry.")
147+
if not self.session:
148+
raise RuntimeError("Client session is not available.")
149+
150+
return await self.session.call_tool(name=tool_name, arguments=parameters)
151+
152+
async def disconnect(self) -> None:
153+
"""Disconnect from the MCP server and clean up resources."""
154+
if self.session:
155+
try:
156+
if hasattr(self.session, "close"):
157+
await self.session.close()
158+
await self.exit_stack.aclose()
159+
except Exception as e:
160+
logger.error(f"Error during disconnect: {str(e)}")
161+
finally:
162+
self.session = None
163+
self.tools = []
164+
self.tool_registry = {}
165+
logger.info("Disconnected from MCP server")

comps/cores/mcp/config.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright (C) 2025 Intel Corporation
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
from urllib.parse import urlparse
5+
6+
from pydantic import BaseModel, Field
7+
8+
9+
class OpeaMCPSSEServerConfig(BaseModel):
10+
"""Configuration for a single MCP server.
11+
12+
Attributes:
13+
url: The server URL
14+
api_key: Optional API key for authentication
15+
"""
16+
17+
url: str
18+
api_key: str | None = None
19+
20+
21+
class OpeaMCPStdioServerConfig(BaseModel):
22+
"""Configuration for a MCP (Model Context Protocol) server that uses stdio.
23+
24+
Attributes:
25+
name: The name of the server
26+
command: The command to run the server
27+
args: The arguments to pass to the server
28+
env: The environment variables to set for the server
29+
"""
30+
31+
name: str
32+
command: str
33+
args: list[str] = Field(default_factory=list)
34+
env: dict[str, str] = Field(default_factory=dict)
35+
36+
37+
class OpeaMCPConfig(BaseModel):
38+
"""Configuration for MCP (Model Context Protocol) settings.
39+
40+
Attributes:
41+
sse_servers: List of MCP SSE server configs
42+
stdio_servers: List of MCP stdio server configs. These servers will be added to the MCP Router running inside runtime container.
43+
"""
44+
45+
sse_servers: list[OpeaMCPSSEServerConfig] = Field(default_factory=list)
46+
stdio_servers: list[OpeaMCPStdioServerConfig] = Field(default_factory=list)
47+
48+
def validate_servers(self) -> None:
49+
"""Validate that server URLs are valid and unique."""
50+
urls = [server.url for server in self.sse_servers]
51+
52+
# Check for duplicate server URLs
53+
if len(set(urls)) != len(urls):
54+
raise ValueError("Duplicate MCP server URLs are not allowed")
55+
56+
# Validate URLs
57+
for url in urls:
58+
try:
59+
result = urlparse(url)
60+
if not all([result.scheme, result.netloc]):
61+
raise ValueError(f"Invalid URL format: {url}")
62+
except Exception as e:
63+
raise ValueError(f"Invalid URL {url}: {str(e)}")

0 commit comments

Comments
 (0)