-
Notifications
You must be signed in to change notification settings - Fork 0
API Reference
graph TB
subgraph Transport["Transport Layer"]
STDIO["stdio<br/>(single-user)"]
SSE["SSE<br/>(deprecated)"]
HTTP["Streamable HTTP<br/>(/mcp endpoint)"]
end
subgraph Auth["Auth Middleware"]
BearerAuth["BearerAuthMiddleware<br/>(Bearer token validation)"]
end
subgraph MCP["MCP Server Layer"]
FastMCP["FastMCP 3.2.0<br/>(@mcp.tool decorators)"]
end
subgraph State["State Management"]
SessionMgr["SessionManager<br/>(per-user workspaces)"]
JobTracker["JobTracker<br/>(async job tracking)"]
end
subgraph Execution["Execution Layer"]
EnginePool["EnginePoolManager<br/>(MATLAB engines)"]
JobExecutor["JobExecutor<br/>(job orchestration)"]
end
subgraph Security["Security Layer"]
Validator["SecurityValidator<br/>(code safety)"]
AuthConfig["Config<br/>(auth tokens)"]
end
subgraph Tools["Tools"]
CoreTools["Core: execute_code,<br/>check_code, get_workspace"]
JobTools["Jobs: get_job_status,<br/>get_job_result, cancel_job"]
FileTools["Files: upload_data,<br/>delete_file, read_script"]
DiscoveryTools["Discovery: list_toolboxes,<br/>list_functions, get_help"]
AdminTools["Admin: get_pool_status"]
CustomTools["Custom Tools<br/>(YAML config)"]
end
subgraph Output["Output Formatting"]
Formatter["ResultFormatter<br/>(text, vars, figures)"]
PlotlyConvert["Plotly Converter<br/>(MATLAB → interactive plots)"]
end
subgraph Monitoring["Monitoring"]
Collector["MetricsCollector<br/>(counters, timeseries)"]
Store["MetricsStore<br/>(SQLite persistence)"]
Dashboard["Dashboard<br/>(HTML + WebUI)"]
end
STDIO --> FastMCP
SSE --> BearerAuth
HTTP --> BearerAuth
BearerAuth --> FastMCP
FastMCP --> SessionMgr
FastMCP --> JobTracker
FastMCP --> Validator
FastMCP --> AuthConfig
FastMCP --> CoreTools
FastMCP --> JobTools
FastMCP --> FileTools
FastMCP --> DiscoveryTools
FastMCP --> AdminTools
FastMCP --> CustomTools
JobTracker --> JobExecutor
SessionMgr --> JobExecutor
Validator --> JobExecutor
JobExecutor --> EnginePool
EnginePool --> Execution
JobExecutor --> Formatter
Formatter --> PlotlyConvert
JobExecutor --> Collector
Collector --> Store
Store --> Dashboard
style Transport fill:#e1f5ff
style Auth fill:#fff3e0
style MCP fill:#f3e5f5
style State fill:#e8f5e9
style Execution fill:#fce4ec
style Security fill:#fff9c4
style Tools fill:#f1f8e9
style Output fill:#ede7f6
style Monitoring fill:#e0f2f1
File: src/matlab_mcp/auth/middleware.py
Pure ASGI middleware that validates bearer tokens on HTTP requests without relying on Starlette's BaseHTTPMiddleware (which has streaming response bugs).
class BearerAuthMiddleware:
def __init__(self, app: ASGI3App, token: str | None = None) -> None:
"""
Initialize the middleware.
Parameters:
app (ASGI3App): The wrapped ASGI application.
token (str | None): Bearer token from MATLAB_MCP_AUTH_TOKEN env var.
If None, auth is disabled.
Returns:
None
"""
async def __call__(self, scope, receive, send) -> None:
"""
ASGI3 entry point. Validates Authorization header before forwarding.
Parameters:
scope (dict): ASGI connection scope (headers, method, path, etc).
receive (Callable): Async callable for receiving messages.
send (Callable): Async callable for sending messages.
Returns:
None
Behavior:
- Bypasses auth for /health endpoint (used by load balancers)
- Bypasses auth for OPTIONS requests (CORS preflight)
- Extracts Authorization: Bearer <token> header
- Compares token using hmac.compare_digest (constant-time)
- Returns 401 with WWW-Authenticate header if token invalid/missing
- Forwards request to app if token valid or auth disabled
Raises:
None (all errors handled internally)
"""Example Usage:
from matlab_mcp.auth.middleware import BearerAuthMiddleware
import os
token = os.environ.get("MATLAB_MCP_AUTH_TOKEN")
middleware = BearerAuthMiddleware(app, token=token)
# Connect via HTTP with token:
# curl -H "Authorization: Bearer <token>" http://127.0.0.1:8765/mcpFile: src/matlab_mcp/config.py
Pydantic model for server-level configuration (transport, host, port, logging).
class ServerConfig(BaseModel):
transport: Literal["stdio", "sse", "streamablehttp"] = "stdio"
"""
Transport protocol.
- "stdio": single-user, no auth, no HTTP
- "sse": deprecated multi-user HTTP with Server-Sent Events
- "streamablehttp": recommended multi-user HTTP with streamable protocol
"""
host: str = "127.0.0.1"
"""Bind address. Default 127.0.0.1 (loopback) avoids Windows Firewall prompts."""
port: int = 8765
"""HTTP port for SSE and streamablehttp transports."""
stateless_http: bool = False
"""Enable stateless mode for streamablehttp (each request gets ephemeral temp dir)."""
log_level: str = "INFO"
"""Logging level (DEBUG, INFO, WARNING, ERROR)."""
inspect_mode: bool = False
"""Inspection mode: skip MATLAB engine pool, use mocked engines."""Example Usage:
from matlab_mcp.config import ServerConfig, AppConfig
# Load from defaults
config = ServerConfig()
print(config.transport) # "stdio"
# Load from environment override
config = ServerConfig(
transport="streamablehttp",
host="0.0.0.0",
port=9000,
stateless_http=True
)
# Pydantic environment variable override (prefix MATLAB_MCP_)
# export MATLAB_MCP_SERVER_TRANSPORT=streamablehttp
config = ServerConfig(_env_prefix="MATLAB_MCP_")File: src/matlab_mcp/config.py
Top-level configuration container with all subsystems.
class AppConfig(BaseModel):
server: ServerConfig = Field(default_factory=ServerConfig)
pool: EnginePoolConfig = Field(default_factory=EnginePoolConfig)
execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
security: SecurityConfig = Field(default_factory=SecurityConfig)
output: OutputConfig = Field(default_factory=OutputConfig)
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
sessions: SessionConfig = Field(default_factory=SessionConfig)
hitl: HITLConfig = Field(default_factory=HITLConfig)
@classmethod
def from_file(cls, path: str | None = None) -> "AppConfig":
"""
Load configuration from YAML file with environment variable overrides.
Parameters:
path (str | None): Path to config.yaml. If None, uses default location.
Returns:
AppConfig: Loaded and merged configuration.
Raises:
FileNotFoundError: If config file not found.
ValueError: If config is invalid.
Behavior:
1. Load YAML from path
2. Apply MATLAB_MCP_* environment overrides
3. Validate all fields via Pydantic
4. Log warning if token-like keys found in YAML (security)
"""Example Usage:
from matlab_mcp.config import AppConfig
# Load from default config.yaml
config = AppConfig.from_file()
# Load from custom path
config = AppConfig.from_file("/etc/matlab_mcp/config.yaml")
# Access subsystems
print(config.server.transport) # "streamablehttp"
print(config.pool.max_engines) # 4
print(config.execution.sync_timeout) # 30.0File: src/matlab_mcp/hitl/gate.py
Pydantic model for human-in-the-loop approval requests.
class HumanApproval(BaseModel):
action: str
"""Type of action: "execute_code", "upload_file", "delete_file", etc."""
description: str
"""Human-readable description of what is being requested."""
details: dict[str, str] = Field(default_factory=dict)
"""Additional context: code snippet, filename, size, etc."""Example Usage:
from fastmcp import Context
from matlab_mcp.hitl.gate import request_execute_approval, HumanApproval
async def execute_code_with_gate(
code: str, ctx: Context, hitl_config
) -> bool:
"""Check if code execution is approved."""
approval = HumanApproval(
action="execute_code",
description=f"Execute MATLAB code: {code[:50]}...",
details={"full_code": code}
)
return await request_execute_approval(
approval, ctx, hitl_config
)File: src/matlab_mcp/hitl/gate.py
async def request_execute_approval(
approval: HumanApproval,
ctx: Context,
hitl_config: HITLConfig
) -> bool:
"""
Request human approval to execute MATLAB code.
Parameters:
approval (HumanApproval): Approval request with code details.
ctx (Context): FastMCP context for sending elicitation.
hitl_config (HITLConfig): HITL configuration (enabled/disabled, gates).
Returns:
bool: True if approved, False if declined or HITL disabled.
Raises:
None (cancellation is treated as decline).
Behavior:
1. If HITL disabled, return True immediately (pass-through)
2. Check if code contains protected functions via regex
3. If protected or all_execute mode, call ctx.elicit(HumanApproval)
4. Return True if AcceptedElicitation, False otherwise
"""File: src/matlab_mcp/hitl/gate.py
async def request_file_approval(
approval: HumanApproval,
ctx: Context,
hitl_config: HITLConfig
) -> bool:
"""
Request human approval for file upload/delete operations.
Parameters:
approval (HumanApproval): Approval request with file details (name, size).
ctx (Context): FastMCP context for elicitation.
hitl_config (HITLConfig): HITL configuration.
Returns:
bool: True if approved, False if declined or HITL disabled.
Raises:
None
Behavior:
1. If HITL disabled, return True
2. Call ctx.elicit(HumanApproval) to prompt user
3. Return True on acceptance, False on decline/cancel
"""File: src/matlab_mcp/pool/engine.py
Wraps a single MATLAB engine instance with lifecycle management.
class MatlabEngineWrapper:
def __init__(self, engine_id: str) -> None:
"""
Initialize wrapper around lazy MATLAB engine.
Parameters:
engine_id (str): Unique identifier for this engine (e.g., "engine-0").
Returns:
None
"""
@property
def state(self) -> EngineState:
"""Current state of engine (STOPPED, STARTING, IDLE, BUSY)."""
async def start(self) -> bool:
"""
Start MATLAB engine asynchronously.
Returns:
bool: True if started successfully, False if error.
Raises:
None (errors logged internally).
"""
async def stop(self) -> None:
"""Stop engine and release resources."""
async def is_alive(self) -> bool:
"""Check engine health via trivial MATLAB command."""
async def eval(self, code: str, nargout: int = 0) -> list[Any]:
"""
Execute MATLAB code synchronously.
Parameters:
code (str): MATLAB code to execute.
nargout (int): Number of output values expected.
Returns:
list[Any]: Results from eval.
Raises:
RuntimeError: If engine not in IDLE state or code fails.
"""
async def eval_async(
self, code: str, nargout: int = 0
) -> matlab.engine.FutureResult:
"""
Execute MATLAB code asynchronously (non-blocking).
Parameters:
code (str): MATLAB code.
nargout (int): Output count.
Returns:
FutureResult: Handle to background job.
"""File: src/matlab_mcp/pool/manager.py
Manages a pool of MATLAB engines with dynamic scaling.
class EnginePoolManager:
def __init__(self, config: EnginePoolConfig) -> None:
"""
Initialize engine pool.
Parameters:
config (EnginePoolConfig): Pool settings (min, max, timeout, health check interval).
Returns:
None
"""
async def start(self) -> None:
"""
Start the pool: spawn min_engines and begin health checks.
Raises:
RuntimeError: If initial engine startup fails critically.
"""
async def shutdown(self) -> None:
"""Stop all engines and clean up."""
async def acquire(self, timeout: float = 30.0) -> MatlabEngineWrapper:
"""
Get an available engine from pool, blocking if none ready.
Parameters:
timeout (float): Max seconds to wait.
Returns:
MatlabEngineWrapper: Ready engine.
Raises:
asyncio.TimeoutError: If no engine available within timeout.
RuntimeError: If pool capacity exhausted.
"""
def release(self, engine: MatlabEngineWrapper) -> None:
"""
Return engine to available pool.
Parameters:
engine (MatlabEngineWrapper): Engine to release.
"""
def get_status(self) -> dict[str, int]:
"""
Get current pool status.
Returns:
dict with keys: total, available, busy, max
Example:
{"total": 4, "available": 2, "busy": 2, "max": 4}
"""File: src/matlab_mcp/jobs/models.py
Represents a single MATLAB code execution request.
@dataclass
class Job:
job_id: str
"""Unique job identifier."""
session_id: str
"""Session that submitted this job."""
code: str
"""MATLAB code to execute."""
status: JobStatus
"""Current status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED)."""
created_at: datetime
"""Timestamp when job was created."""
started_at: datetime | None
"""Timestamp when execution started."""
completed_at: datetime | None
"""Timestamp when execution finished."""
engine_id: str | None
"""Engine ID assigned to this job (if started)."""
result: dict[str, Any] | None
"""Execution result (once completed)."""
error: str | None
"""Error message (if failed)."""
@property
def elapsed_seconds(self) -> float:
"""Seconds from created_at to completed_at (or now if still running)."""
def mark_running(self, engine_id: str) -> None:
"""Mark job as running on given engine."""
def mark_completed(self, result: dict[str, Any]) -> None:
"""Mark job as completed with result."""
def mark_failed(self, error: str) -> None:
"""Mark job as failed with error message."""
def mark_cancelled(self) -> None:
"""Mark job as cancelled."""File: src/matlab_mcp/jobs/executor.py
Orchestrates MATLAB code execution with hybrid sync/async behavior.
class JobExecutor:
def __init__(
self,
pool: EnginePoolManager,
session_manager: SessionManager,
security_validator: SecurityValidator,
metrics_collector: MetricsCollector,
job_tracker: JobTracker,
config: ExecutionConfig
) -> None:
"""Initialize executor with all dependencies."""
async def execute_code(
self,
code: str,
session_id: str,
sync_timeout: float = 30.0,
nargout: int = 0
) -> Job:
"""
Execute MATLAB code with automatic sync/async promotion.
Parameters:
code (str): MATLAB code to execute.
session_id (str): Session ID for workspace isolation.
sync_timeout (float): Max seconds to wait for inline completion.
nargout (int): Number of outputs to capture.
Returns:
Job: Job object with status and result (if inline).
Raises:
BlockedFunctionError: If code contains blocked functions.
RuntimeError: If pool is exhausted or session invalid.
Behavior:
1. Validate code against SecurityValidator
2. Create Job in PENDING status
3. Acquire engine from pool
4. Inject job context (job_id, temp_dir) into MATLAB workspace
5. Execute eval() with timeout
6. If completes within sync_timeout → return Job with COMPLETED status
7. If timeout → mark as RUNNING and return Job with status=RUNNING
8. Background task continues async execution; result stored in Job.result
Example:
job = await executor.execute_code(
"result = 2 + 2",
session_id="sess-123"
)
print(job.status) # JobStatus.COMPLETED
print(job.result["output"]) # "ans = 4"
"""File: src/matlab_mcp/jobs/tracker.py
Registry for in-flight and historical jobs.
class JobTracker:
def __init__(self, retention_hours: int = 24) -> None:
"""
Initialize tracker.
Parameters:
retention_hours (int): Keep completed jobs for N hours, then prune.
"""
def add_job(self, job: Job) -> None:
"""Register a new job."""
def get_job(self, job_id: str) -> Job | None:
"""Retrieve job by ID."""
def get_session_jobs(self, session_id: str) -> list[Job]:
"""Get all jobs for a session."""
def get_active_jobs(self, session_id: str) -> list[Job]:
"""Get non-terminal jobs for a session (used for cleanup)."""
def update_job(self, job: Job) -> None:
"""Update job status/result in tracker."""
async def prune_old_jobs(self) -> int:
"""
Remove completed jobs older than retention_hours.
Returns:
int: Number of jobs pruned.
"""File: src/matlab_mcp/session/manager.py
Manages user session lifecycle with per-session workspaces.
class SessionManager:
def __init__(self, config: SessionConfig) -> None:
"""Initialize session manager."""
def create_session(self, session_id: str | None = None) -> str:
"""
Create a new session with unique temp directory.
Parameters:
session_id (str | None): Requested session ID. If None, auto-generate.
Returns:
str: Session ID (assigned or provided).
Raises:
RuntimeError: If max_sessions limit exceeded.
"""
def get_session_temp_dir(self, session_id: str) -> Path:
"""
Get temporary directory for a session.
Parameters:
session_id (str): Session ID.
Returns:
Path: Directory path (created if needed).
Raises:
ValueError: If session not found.
"""
def touch_session(self, session_id: str) -> None:
"""Update session's last_active timestamp."""
async def cleanup_expired_sessions(self) -> int:
"""
Remove idle sessions beyond timeout threshold.
Returns:
int: Number of sessions cleaned up.
Behavior:
- Skip sessions with active jobs (via JobTracker)
- Remove temp directory
- Delete from session registry
"""File: src/matlab_mcp/security/validator.py
Validates MATLAB code and filenames for safety.
class SecurityValidator:
def __init__(self, config: SecurityConfig) -> None:
"""Initialize validator with blocklist and rules."""
def validate_code(self, code: str) -> None:
"""
Check code against blocklist of dangerous functions.
Parameters:
code (str): MATLAB code to validate.
Raises:
BlockedFunctionError: If blocked function detected.
Behavior:
1. Strip string literals and comments
2. Check for function names in blocklist (system, eval, unix, dos, etc.)
3. Raise BlockedFunctionError if found
Example:
validator.validate_code("x = 1 + 1") # OK
validator.validate_code("system('ls')") # Raises BlockedFunctionError
"""
def sanitize_filename(self, filename: str) -> str:
"""
Sanitize filename to prevent directory traversal.
Parameters:
filename (str): User-provided filename.
Returns:
str: Cleaned filename.
Raises:
ValueError: If filename contains path separators or traversal patterns.
Behavior:
- Reject filenames with / or \
- Reject . or .. (directory navigation)
- Allow alphanumeric, underscore, hyphen, dot (for extensions)
"""File: src/matlab_mcp/tools/core.py
Execute arbitrary MATLAB code with optional HITL approval gate.
async def execute_code_impl(
code: str,
ctx: Context,
hitl_config: Optional[Any] = None
) -> dict[str, Any]:
"""
Execute MATLAB code synchronously or promote to async job.
Parameters:
code (str): MATLAB code to execute.
ctx (Context): FastMCP context (for session routing, elicitation).
hitl_config (Optional[Any]): HITL config; None disables approval gates.
Returns:
dict with keys:
- job_id: Unique job identifier
- status: "completed" or "running"
- output: stdout text
- variables: dict of workspace variables (for completed jobs)
- figures: list of figure objects (for completed jobs)
- execution_time: elapsed seconds
- error: error message (if failed)
Raises:
(none — all errors returned in dict["error"])
Behavior:
1. If HITL enabled and code contains protected functions, prompt for approval
2. Validate code via SecurityValidator
3. Create Job and acquire engine from pool
4. Inject MCP_JOB_ID into workspace
5. Execute code with sync_timeout (default 30s)
6. Collect output, variables, and figures
7. Return Job with result
8. If sync timeout exceeded, mark RUNNING and continue in background
Example:
result = await execute_code_impl(
"x = [1 2 3]; mean(x)",
ctx
)
print(result["output"]) # "ans = 2"
print(result["status"]) # "completed"
"""File: src/matlab_mcp/tools/core.py
Run MATLAB code quality linter (checkcode).
async def check_code_impl(
code: str,
ctx: Context
) -> dict[str, Any]:
"""
Lint MATLAB code without executing it.
Parameters:
code (str): MATLAB code to check.
ctx (Context): FastMCP context.
Returns:
dict with keys:
- issues: list of linting issues
- error_count: number of errors
- warning_count: number of warnings
Behavior:
1. Write code to temp file
2. Invoke mcp_checkcode.m MATLAB helper
3. Parse JSON output
4. Return issues and counts
Example:
result = await check_code_impl("x = 1", ctx)
print(result["issues"]) # []
print(result["error_count"]) # 0
"""File: src/matlab_mcp/tools/core.py
List variables in current MATLAB workspace.
async def get_workspace_impl(
ctx: Context
) -> dict[str, Any]:
"""
Retrieve workspace variables for current session.
Parameters:
ctx (Context): FastMCP context.
Returns:
dict with key "variables": list of variable dicts
Each variable dict has: name, type, size, value
Example:
result = await get_workspace_impl(ctx)
# result["variables"] = [
# {"name": "x", "type": "double", "size": [1, 1], "value": 42}
# ]
"""File: src/matlab_mcp/tools/files.py
Upload a file to session temp directory with optional HITL approval.
async def upload_data_impl(
filename: str,
data: str,
ctx: Context,
hitl_config: Optional[Any] = None
) -> dict[str, Any]:
"""
Upload base64-encoded file data to session workspace.
Parameters:
filename (str): Target filename (e.g., "data.csv").
data (str): Base64-encoded file contents.
ctx (Context): FastMCP context.
hitl_config (Optional[Any]): HITL config; None disables approval.
Returns:
dict with keys:
- path: Full path to uploaded file
- size_bytes: File size
- status: "ok" or error message
Raises:
(none — errors returned in dict["status"])
Behavior:
1. If HITL enabled, prompt for approval with filename/size
2. Decode base64 data
3. Sanitize filename (no path traversal)
4. Write to session temp directory
5. Return path and metadata
Example:
import base64
data = base64.b64encode(b"1,2,3\\n4,5,6").decode()
result = await upload_data_impl("data.csv", data, ctx)
print(result["path"]) # "/tmp/matlab_mcp/sess-123/data.csv"
"""File: src/matlab_mcp/tools/files.py
Delete a file from session workspace with optional HITL approval.
async def delete_file_impl(
filename: str,
ctx: Context,
hitl_config: Optional[Any] = None
) -> dict[str, Any]:
"""
Delete a file from session temp directory.
Parameters:
filename (str): Filename to delete (relative to session temp dir).
ctx (Context): FastMCP context.
hitl_config (Optional[Any]): HITL config.
Returns:
dict with keys:
- status: "deleted" or error message
Raises:
(none)
Behavior:
1. If HITL enabled, prompt for approval
2. Sanitize filename
3. Check file exists in session temp dir
4. Delete file
5. Return status
"""File: src/matlab_mcp/tools/files.py
List files in session workspace.
async def list_files_impl(
ctx: Context
) -> dict[str, Any]:
"""
List files in current session's temp directory.
Returns:
dict with key "files": list of file objects
Each file object has: name, size_bytes, modified_at
"""File: