This document provides comprehensive API reference for the Model Context Protocol (MCP) implementation in the Multi-Agent-Intelligent-Warehouse. The MCP system provides standardized interfaces for tool discovery, execution, and communication between AI agents and external systems.
- MCP Server API
- MCP Client API
- Tool Discovery API
- Tool Binding API
- Tool Routing API
- Tool Validation API
- Service Discovery API
- Monitoring API
- Adapter API
- Error Handling
The MCP server provides tool registration, discovery, and execution capabilities.
Initialize the MCP server.
Parameters:
config(Optional[MCPServerConfig]): Server configuration
Example:
from src.api.services.mcp.server import MCPServer, MCPServerConfig
config = MCPServerConfig(
host="localhost",
port=8000,
max_connections=100
)
server = MCPServer(config)Start the MCP server.
Example:
await server.start()Stop the MCP server.
Example:
await server.stop()Register a tool with the server.
Parameters:
tool(MCPTool): Tool to register
Returns:
bool: True if registration successful
Example:
tool = MCPTool(
name="get_inventory",
description="Get inventory levels",
tool_type=MCPToolType.FUNCTION,
parameters={
"item_id": {"type": "string", "required": True}
},
handler=inventory_handler
)
success = await server.register_tool(tool)Unregister a tool from the server.
Parameters:
tool_name(str): Name of tool to unregister
Returns:
bool: True if unregistration successful
Example:
success = await server.unregister_tool("get_inventory")Discover available tools.
Parameters:
category(Optional[str]): Tool category filter
Returns:
List[MCPTool]: List of available tools
Example:
# Get all tools
tools = await server.discover_tools()
# Get tools by category
inventory_tools = await server.discover_tools("inventory")Execute a tool with given arguments.
Parameters:
tool_name(str): Name of tool to executearguments(Dict[str, Any]): Tool arguments
Returns:
MCPToolResult: Tool execution result
Example:
result = await server.execute_tool("get_inventory", {
"item_id": "ITEM001",
"location": "WAREHOUSE_A"
})
if result.success:
print(f"Result: {result.data}")
else:
print(f"Error: {result.error}")Get information about a specific tool.
Parameters:
tool_name(str): Name of tool
Returns:
Optional[MCPTool]: Tool information or None
Example:
tool_info = await server.get_tool_info("get_inventory")
if tool_info:
print(f"Tool: {tool_info.name}")
print(f"Description: {tool_info.description}")Get server status and statistics.
Returns:
Dict[str, Any]: Server status information
Example:
status = await server.get_server_status()
print(f"Server running: {status['running']}")
print(f"Tools registered: {status['tools_count']}")
print(f"Active connections: {status['active_connections']}")Tool definition class.
name(str): Tool namedescription(str): Tool descriptiontool_type(MCPToolType): Tool typeparameters(Dict[str, Any]): Tool parameters schemahandler(Callable): Tool execution handlercategory(Optional[str]): Tool categorytags(List[str]): Tool tagsversion(str): Tool version
from src.api.services.mcp.server import MCPTool, MCPToolType
tool = MCPTool(
name="get_inventory",
description="Get inventory levels for a specific item",
tool_type=MCPToolType.FUNCTION,
parameters={
"item_id": {
"type": "string",
"description": "Item identifier",
"required": True
},
"location": {
"type": "string",
"description": "Warehouse location",
"required": False
}
},
handler=inventory_handler,
category="inventory",
tags=["inventory", "warehouse"],
version="1.0.0"
)Tool execution result.
success(bool): Execution success statusdata(Any): Execution result dataerror(Optional[str]): Error message if failedexecution_time(float): Execution time in secondsmetadata(Dict[str, Any]): Additional metadata
result = MCPToolResult(
success=True,
data={"item_id": "ITEM001", "quantity": 100},
execution_time=0.5,
metadata={"source": "inventory_system"}
)The MCP client enables communication with MCP servers.
Initialize the MCP client.
Parameters:
config(Optional[MCPClientConfig]): Client configuration
Example:
from src.api.services.mcp.client import MCPClient, MCPClientConfig
config = MCPClientConfig(
timeout=30,
retry_attempts=3
)
client = MCPClient(config)Connect to an MCP server.
Parameters:
server_url(str): Server URLconnection_type(MCPConnectionType): Connection type
Returns:
bool: True if connection successful
Example:
from src.api.services.mcp.client import MCPConnectionType
success = await client.connect("http://localhost:8000", MCPConnectionType.HTTP)Disconnect from the server.
Example:
await client.disconnect()Discover available tools on the server.
Parameters:
category(Optional[str]): Tool category filter
Returns:
List[MCPTool]: List of available tools
Example:
tools = await client.discover_tools("inventory")Execute a tool on the server.
Parameters:
tool_name(str): Name of tool to executearguments(Dict[str, Any]): Tool arguments
Returns:
MCPToolResult: Tool execution result
Example:
result = await client.execute_tool("get_inventory", {
"item_id": "ITEM001"
})Get information about a specific tool.
Parameters:
tool_name(str): Name of tool
Returns:
Optional[MCPTool]: Tool information or None
Example:
tool_info = await client.get_tool_info("get_inventory")Get server status.
Returns:
Dict[str, Any]: Server status information
Example:
status = await client.get_server_status()Dynamic tool discovery and registration service.
Initialize the tool discovery service.
Parameters:
config(Optional[ToolDiscoveryConfig]): Discovery configuration
Example:
from src.api.services.mcp.tool_discovery import ToolDiscoveryService, ToolDiscoveryConfig
config = ToolDiscoveryConfig(
discovery_interval=30,
max_tools_per_source=100
)
discovery = ToolDiscoveryService(config)Start the tool discovery process.
Example:
await discovery.start_discovery()Stop the tool discovery process.
Example:
await discovery.stop_discovery()Register a discovery source.
Parameters:
source_id(str): Unique source identifiersource(Any): Source objectsource_type(str): Type of source
Returns:
bool: True if registration successful
Example:
success = await discovery.register_discovery_source(
"erp_adapter",
erp_adapter,
"mcp_adapter"
)Unregister a discovery source.
Parameters:
source_id(str): Source identifier
Returns:
bool: True if unregistration successful
Example:
success = await discovery.unregister_discovery_source("erp_adapter")Search for tools using a query.
Parameters:
query(str): Search querycategory(Optional[str]): Tool category filter
Returns:
List[DiscoveredTool]: List of discovered tools
Example:
tools = await discovery.search_tools("inventory", "warehouse")Get tools by category.
Parameters:
category(ToolCategory): Tool category
Returns:
List[DiscoveredTool]: List of tools in category
Example:
from src.api.services.mcp.tool_discovery import ToolCategory
inventory_tools = await discovery.get_tools_by_category(ToolCategory.INVENTORY)Get tool usage statistics.
Parameters:
tool_name(str): Tool name
Returns:
Dict[str, Any]: Usage statistics
Example:
stats = await discovery.get_tool_usage_stats("get_inventory")
print(f"Usage count: {stats['usage_count']}")
print(f"Average execution time: {stats['avg_execution_time']}")Dynamic tool binding and execution framework.
Initialize the tool binding service.
Parameters:
discovery_service(ToolDiscoveryService): Tool discovery serviceconfig(Optional[ToolBindingConfig]): Binding configuration
Example:
from src.api.services.mcp.tool_binding import ToolBindingService, ToolBindingConfig
config = ToolBindingConfig(
max_tools_per_binding=10,
binding_timeout=30
)
binding = ToolBindingService(discovery, config)async def bind_tools(self, agent_id: str, query: str, intent: str, entities: Dict[str, Any], context: Dict[str, Any], strategy: BindingStrategy = BindingStrategy.SEMANTIC_MATCH, max_tools: int = 5) -> List[ToolBinding]
Bind tools to an agent based on query and context.
Parameters:
agent_id(str): Agent identifierquery(str): User queryintent(str): Query intententities(Dict[str, Any]): Extracted entitiescontext(Dict[str, Any]): Execution contextstrategy(BindingStrategy): Binding strategymax_tools(int): Maximum number of tools to bind
Returns:
List[ToolBinding]: List of tool bindings
Example:
from src.api.services.mcp.tool_binding import BindingStrategy
bindings = await binding.bind_tools(
agent_id="equipment_agent",
query="Get equipment status",
intent="equipment_lookup",
entities={"equipment_id": "EQ001"},
context={},
strategy=BindingStrategy.SEMANTIC_MATCH,
max_tools=5
)async def create_execution_plan(self, context: ExecutionContext, bindings: List[ToolBinding], mode: ExecutionMode = ExecutionMode.SEQUENTIAL) -> ExecutionPlan
Create an execution plan for tool bindings.
Parameters:
context(ExecutionContext): Execution contextbindings(List[ToolBinding]): Tool bindingsmode(ExecutionMode): Execution mode
Returns:
ExecutionPlan: Execution plan
Example:
from src.api.services.mcp.tool_binding import ExecutionMode
plan = await binding.create_execution_plan(
context,
bindings,
ExecutionMode.SEQUENTIAL
)Execute a tool execution plan.
Parameters:
plan(ExecutionPlan): Execution plan
Returns:
List[ExecutionResult]: Execution results
Example:
results = await binding.execute_plan(plan)Get binding history for an agent.
Parameters:
agent_id(str): Agent identifier
Returns:
List[ToolBinding]: Binding history
Example:
history = await binding.get_binding_history("equipment_agent")Intelligent tool routing and selection.
__init__(self, discovery_service: ToolDiscoveryService, binding_service: ToolBindingService, config: Optional[ToolRoutingConfig] = None)
Initialize the tool routing service.
Parameters:
discovery_service(ToolDiscoveryService): Tool discovery servicebinding_service(ToolBindingService): Tool binding serviceconfig(Optional[ToolRoutingConfig]): Routing configuration
Example:
from src.api.services.mcp.tool_routing import ToolRoutingService, ToolRoutingConfig
config = ToolRoutingConfig(
routing_timeout=30,
max_tools_per_route=5
)
routing = ToolRoutingService(discovery, binding, config)async def route_tools(self, context: RoutingContext, strategy: RoutingStrategy = RoutingStrategy.BALANCED, max_tools: int = 5) -> RoutingDecision
Route tools based on context and strategy.
Parameters:
context(RoutingContext): Routing contextstrategy(RoutingStrategy): Routing strategymax_tools(int): Maximum number of tools to route
Returns:
RoutingDecision: Routing decision
Example:
from src.api.services.mcp.tool_routing import RoutingStrategy, RoutingContext
context = RoutingContext(
query="Get equipment status for forklift EQ001",
intent="equipment_lookup",
entities={"equipment_id": "EQ001", "equipment_type": "forklift"},
user_context={"priority": "high"},
session_id="session_123",
agent_id="equipment_agent"
)
decision = await routing.route_tools(
context,
strategy=RoutingStrategy.BALANCED,
max_tools=5
)Get routing statistics.
Returns:
Dict[str, Any]: Routing statistics
Example:
stats = await routing.get_routing_stats()
print(f"Total routes: {stats['total_routes']}")
print(f"Average routing time: {stats['avg_routing_time']}")Comprehensive validation and error handling.
__init__(self, discovery_service: ToolDiscoveryService, config: Optional[ToolValidationConfig] = None)
Initialize the tool validation service.
Parameters:
discovery_service(ToolDiscoveryService): Tool discovery serviceconfig(Optional[ToolValidationConfig]): Validation configuration
Example:
from src.api.services.mcp.tool_validation import ToolValidationService, ToolValidationConfig
config = ToolValidationConfig(
validation_timeout=30,
strict_validation=True
)
validation = ToolValidationService(discovery, config)async def validate_tool_execution(self, tool_id: str, arguments: Dict[str, Any], context: Dict[str, Any], validation_level: ValidationLevel = ValidationLevel.STANDARD) -> ValidationResult
Validate tool execution parameters and context.
Parameters:
tool_id(str): Tool identifierarguments(Dict[str, Any]): Tool argumentscontext(Dict[str, Any]): Execution contextvalidation_level(ValidationLevel): Validation level
Returns:
ValidationResult: Validation result
Example:
from src.api.services.mcp.tool_validation import ValidationLevel
result = await validation.validate_tool_execution(
tool_id="get_equipment_status",
arguments={"equipment_id": "EQ001"},
context=execution_context,
validation_level=ValidationLevel.STANDARD
)
if result.is_valid:
print("Validation passed")
else:
for error in result.errors:
print(f"Validation error: {error}")async def validate_tool_capabilities(self, tool_id: str, required_capabilities: List[str]) -> ValidationResult
Validate tool capabilities.
Parameters:
tool_id(str): Tool identifierrequired_capabilities(List[str]): Required capabilities
Returns:
ValidationResult: Validation result
Example:
result = await validation.validate_tool_capabilities(
tool_id="get_equipment_status",
required_capabilities=["equipment_lookup", "status_check"]
)Service discovery and registry system.
Initialize the service discovery registry.
Parameters:
config(Optional[ServiceDiscoveryConfig]): Discovery configuration
Example:
from src.api.services.mcp.service_discovery import ServiceDiscoveryRegistry, ServiceDiscoveryConfig
config = ServiceDiscoveryConfig(
registry_ttl=300,
health_check_interval=30
)
registry = ServiceDiscoveryRegistry(config)Register a service with the registry.
Parameters:
service(ServiceInfo): Service information
Returns:
bool: True if registration successful
Example:
from src.api.services.mcp.service_discovery import ServiceInfo, ServiceType
service = ServiceInfo(
service_id="erp_adapter_001",
service_name="ERP Adapter",
service_type=ServiceType.ADAPTER,
endpoint="http://localhost:8001",
version="1.0.0",
capabilities=["inventory", "orders", "customers"]
)
success = await registry.register_service(service)Unregister a service from the registry.
Parameters:
service_id(str): Service identifier
Returns:
bool: True if unregistration successful
Example:
success = await registry.unregister_service("erp_adapter_001")async def discover_services(self, service_type: Optional[ServiceType] = None, capabilities: Optional[List[str]] = None) -> List[ServiceInfo]
Discover services by type and capabilities.
Parameters:
service_type(Optional[ServiceType]): Service type filtercapabilities(Optional[List[str]]): Required capabilities
Returns:
List[ServiceInfo]: List of discovered services
Example:
from src.api.services.mcp.service_discovery import ServiceType
# Discover all adapters
adapters = await registry.discover_services(ServiceType.ADAPTER)
# Discover services with specific capabilities
inventory_services = await registry.discover_services(
capabilities=["inventory"]
)Get service health status.
Parameters:
service_id(str): Service identifier
Returns:
ServiceHealth: Service health information
Example:
health = await registry.get_service_health("erp_adapter_001")
print(f"Service healthy: {health.is_healthy}")
print(f"Last check: {health.last_check}")Comprehensive monitoring, logging, and management.
Initialize the monitoring service.
Parameters:
config(Optional[MonitoringConfig]): Monitoring configuration
Example:
from src.api.services.mcp.monitoring import MCPMonitoringService, MonitoringConfig
config = MonitoringConfig(
metrics_retention_days=30,
alert_thresholds={
"error_rate": 0.05,
"response_time": 5.0
}
)
monitoring = MCPMonitoringService(config)Start the monitoring service.
Example:
await monitoring.start_monitoring()Stop the monitoring service.
Example:
await monitoring.stop_monitoring()async def record_metric(self, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None) -> None
Record a metric.
Parameters:
metric_name(str): Metric namevalue(float): Metric valuetags(Optional[Dict[str, str]]): Metric tags
Example:
await monitoring.record_metric(
"tool_execution_time",
1.5,
{"tool_name": "get_inventory", "agent_id": "equipment_agent"}
)async def get_metrics(self, metric_name: Optional[str] = None, time_range: Optional[Tuple[datetime, datetime]] = None) -> List[MetricData]
Get metrics data.
Parameters:
metric_name(Optional[str]): Metric name filtertime_range(Optional[Tuple[datetime, datetime]]): Time range filter
Returns:
List[MetricData]: List of metric data
Example:
from datetime import datetime, timedelta
# Get metrics for last hour
end_time = datetime.now()
start_time = end_time - timedelta(hours=1)
metrics = await monitoring.get_metrics(
metric_name="tool_execution_time",
time_range=(start_time, end_time)
)Get monitoring dashboard data.
Returns:
Dict[str, Any]: Dashboard data
Example:
dashboard = await monitoring.get_monitoring_dashboard()
print(f"System health: {dashboard['system_health']}")
print(f"Active services: {dashboard['active_services']}")Base class for all MCP adapters.
Initialize the adapter.
Parameters:
config(AdapterConfig): Adapter configuration
Example:
from src.api.services.mcp.base import MCPAdapter, AdapterConfig, AdapterType
config = AdapterConfig(
adapter_id="erp_adapter_001",
adapter_name="ERP Adapter",
adapter_type=AdapterType.ERP,
connection_string="postgresql://user:pass@localhost:5432/erp",
capabilities=["inventory", "orders", "customers"]
)
adapter = MCPAdapter(config)Connect to the external system.
Returns:
bool: True if connection successful
Example:
success = await adapter.connect()Disconnect from the external system.
Example:
await adapter.disconnect()Check adapter health.
Returns:
bool: True if healthy
Example:
healthy = await adapter.health_check()Get adapter capabilities.
Returns:
List[str]: List of capabilities
Example:
capabilities = await adapter.get_capabilities()Base exception for MCP errors.
error_code(str): Error codeerror_message(str): Error messageerror_details(Optional[Dict[str, Any]]): Additional error details
from src.api.services.mcp.base import MCPError
try:
result = await client.execute_tool("invalid_tool", {})
except MCPError as e:
print(f"Error code: {e.error_code}")
print(f"Error message: {e.error_message}")
print(f"Error details: {e.error_details}")CONNECTION_ERROR: Connection-related errorsAUTHENTICATION_ERROR: Authentication errorsVALIDATION_ERROR: Validation errorsEXECUTION_ERROR: Tool execution errorsDISCOVERY_ERROR: Tool discovery errorsROUTING_ERROR: Tool routing errors
- Always handle MCPError exceptions
- Check error codes for specific error types
- Implement retry logic for transient errors
- Log errors with appropriate context
- Provide meaningful error messages to users
This API reference provides comprehensive documentation for all MCP components. For additional examples and usage patterns, refer to the MCP integration documentation and test files.