Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions examples/grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# gRPC Transport Example for MCP

This example demonstrates how to implement a custom gRPC transport for the Model Context Protocol (MCP) Python SDK.

## Overview

- `mcp.proto`: Defines the gRPC service service mirroring the MCP protocol.
- `server.py`: Implements a gRPC server that bridges requests to the `mcp.server.Server`.
- `client.py`: A gRPC client that calls the server to list resources.

## Prerequisites

You need Python 3.10+ and the `grpcio-tools` package.

## Setup & Running

### Files

- `mcp.proto`: The Protobuf definition for the MCP Service.
- `grpc_server.py`: A reusable gRPC Servicer adapter that wraps any MCP `Server` or `FastMCP` instance.
- `server.py`: An example user application that defines tools/resources using `FastMCP` and runs them over gRPC using the adapter.
- `client.py`: A gRPC client that calls the server's methods (ListResources, ListTools, CallTool).
- `run_server.sh`: Helper script to install dependencies, generate code, and run the server.
- `run_client.sh`: Helper script to run the client.

### How to Run

1. **Start the Server**:
```bash
./examples/grpc/run_server.sh
```
This script handles virtual environment creation, dependency installation, Protobuf code generation, and starting the server on port 50051.

2. **Run the Client** (in a new terminal):
```bash
./examples/grpc/run_client.sh
```
You should see the client list resources, list tools (including the `add` tool), and successfully call the `add` tool to get the result `30`.

### Implementation Details

- **`GrpcMcpService` Adapter**: This class acts as a bridge. It accepts an `mcp.server.Server` instance. It starts the MCP server loop in the background using `anyio` memory streams.
- **Initialization Handshake**: The adapter automatically handles the MCP initialization sequence (`initialize` -> `notifications/initialized`) so the server is ready to accept requests immediately.
- **Message Mapping**: The adapter converts between gRPC messages (defined in `mcp.proto`) and the internal JSON-RPC messages used by the MCP SDK. It handles `CallTool` content type mapping (Text vs Image).
- **`FastMCP` Integration**: The example `server.py` uses `FastMCP` to define tools and resources, demonstrating how the `GrpcMcpService` adapter can wrap any `mcp.server.Server` instance.
- **Asynchronous Handling**: The adapter uses `anyio` to manage asynchronous operations, allowing it to bridge the gRPC stream interface with the MCP SDK's consume/produce stream interface. It wraps `JSONRPCRequest` objects into `SessionMessage` to be consumed by the SDK's `ServerSession`.

> **Note**: This is an experimental example.
56 changes: 56 additions & 0 deletions examples/grpc/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
import asyncio
import grpc
import sys
import os

# Import generated classes
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

import mcp_pb2
import mcp_pb2_grpc

from mcp.client.session import ClientSession
from examples.grpc.grpc_client import grpc_client_channel

async def run():
print("Connecting to gRPC server...")
async with grpc.aio.insecure_channel('localhost:50051') as channel:
# Use our custom gRPC transport adapter
async with grpc_client_channel(channel) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
print("Initializing...")
await session.initialize()

print("\nCalling ListResources...")
resources = await session.list_resources()
print("Resources received:")
for res in resources.resources:
print(f"- Name: {res.name}")
print(f" URI: {res.uri}")
print(f" MimeType: {res.mimeType}")

print("\nCalling ListTools...")
tools = await session.list_tools()
print("Tools received:")
for tool in tools.tools:
print(f"- Name: {tool.name}")
print(f" Description: {tool.description}")

print("\nCalling CallTool 'add' with a=10, b=20...")
result = await session.call_tool("add", arguments={"a": 10, "b": 20})
print("Tool Response:")
for content in result.content:
if content.type == "text":
print(f" Type: text")
print(f" Text: {content.text}")
elif content.type == "image":
print(f" Type: image")
print(f" Data: {len(content.data)} bytes")

if __name__ == '__main__':
logging.basicConfig()
try:
asyncio.run(run())
except KeyboardInterrupt:
pass
141 changes: 141 additions & 0 deletions examples/grpc/grpc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@

import asyncio
import logging
from contextlib import asynccontextmanager
import anyio
import grpc
import mcp.types as types
from mcp.shared.message import SessionMessage
from mcp.client.session import ClientSession

# Import generated gRPC code
try:
import mcp_pb2
import mcp_pb2_grpc
except ImportError:
import examples.grpc.mcp_pb2 as mcp_pb2
import examples.grpc.mcp_pb2_grpc as mcp_pb2_grpc

logger = logging.getLogger("grpc_client_transport")

@asynccontextmanager
async def grpc_client_channel(channel):
"""
A transport context manager for MCP ClientSession that runs over a gRPC channel.
Yields (read_stream, write_stream).
"""
stub = mcp_pb2_grpc.McpStub(channel)

# Streams for ClientSession
read_stream_writer, read_stream = anyio.create_memory_object_stream(100)
write_stream, write_stream_reader = anyio.create_memory_object_stream(100)

async def process_outgoing_messages():
"""Reads JSON-RPC messages from ClientSession and sends them via gRPC."""
async with write_stream_reader:
async for message in write_stream_reader:
# message is SessionMessage -> JSONRPCMessage
try:
json_msg = message.message
if isinstance(json_msg.root, types.JSONRPCRequest):
req = json_msg.root
logger.debug(f"Sending request: {req.method} ID: {req.id}")
await handle_request(req, stub, read_stream_writer)
elif isinstance(json_msg.root, types.JSONRPCNotification):
# Handle notifications (e.g. initialized)
logger.debug(f"Sending notification: {json_msg.root.method}")
# Provide dummy/no-op implementation for notifications if not critical
pass
except Exception as e:
logger.error(f"Error processing outgoing message: {e}", exc_info=True)

async def handle_request(req, stub, result_writer):
"""Dispatches JSON-RPC requests to appropriate gRPC methods."""
response_result = None
error = None

try:
if req.method == "initialize":
# Fake initialize response because gRPC transport doesn't need handshake
# OR forward to server if server supports generic method (it doesn't in our proto)
# But our GridServer expects initialization?
# Actually, our `GrpcMcpService` adapter allows one-way bridge.
# BUT `ClientSession` expects full protocol.
# In this specific gRPC example, we defined Typed methods.
# So we simulate the handshake locally or map to something?
# The MCP SDK Client REQUIRES initialize.
# Let's return a synthetic response.
response_result = types.InitializeResult(
protocolVersion="2024-11-05",
capabilities=types.ServerCapabilities(),
serverInfo=types.Implementation(name="grpc-server", version="1.0")
).model_dump(by_alias=True)

elif req.method == "resources/list":
grpc_resp = await stub.ListResources(mcp_pb2.ListResourcesRequest())
# Map gRPC response to JSON-RPC result
resources = []
for r in grpc_resp.resources:
resources.append(types.Resource(
uri=r.uri, name=r.name, mimeType=r.mime_type
))
response_result = types.ListResourcesResult(resources=resources).model_dump(by_alias=True)

elif req.method == "tools/list":
grpc_resp = await stub.ListTools(mcp_pb2.ListToolsRequest())
tools = []
for t in grpc_resp.tools:
tools.append(types.Tool(
name=t.name, description=t.description, inputSchema={} # Simplified
))
response_result = types.ListToolsResult(tools=tools).model_dump(by_alias=True)

elif req.method == "tools/call":
# Map params to gRPC
params = req.params # dict
name = params.get("name")
args = params.get("arguments", {})

from google.protobuf import struct_pb2
args_struct = struct_pb2.Struct()
args_struct.update(args)

req_proto = mcp_pb2.CallToolRequest.Request(name=name, arguments=args_struct)

# Consume stream
content = []
is_error = False
async for chunk in stub.CallTool(mcp_pb2.CallToolRequest(request=req_proto)):
is_error = chunk.is_error
for c in chunk.content:
if c.HasField("text"):
content.append(types.TextContent(type="text", text=c.text.text))
elif c.HasField("image"):
content.append(types.ImageContent(type="image", data=c.image.data, mimeType=c.image.mime_type))

response_result = types.CallToolResult(content=content, isError=is_error).model_dump(by_alias=True)

else:
raise ValueError(f"Method not supported by gRPC transport: {req.method}")

# Send successful response back to ClientSession
resp = types.JSONRPCResponse(
jsonrpc="2.0",
id=req.id,
result=response_result
)
await result_writer.send(SessionMessage(types.JSONRPCMessage(root=resp)))

except Exception as e:
logger.error(f"RPC failed: {e}")
err_resp = types.JSONRPCError(
jsonrpc="2.0",
id=req.id,
error=types.ErrorData(code=-32603, message=str(e))
)
await result_writer.send(SessionMessage(types.JSONRPCMessage(root=err_resp)))

async with anyio.create_task_group() as tg:
tg.start_soon(process_outgoing_messages)
yield read_stream, write_stream
tg.cancel_scope.cancel()
Loading
Loading