-
Notifications
You must be signed in to change notification settings - Fork 0
API Reference
github-actions[bot] edited this page Apr 3, 2026
·
10 revisions
This document provides a comprehensive API reference for the MATLAB MCP Server, organized by module and namespace. All public functions, classes, and methods are documented with signatures, parameters, return types, and usage examples.
classDiagram
class MatlabMCPServer {
+config: AppConfig
+pool: EnginePoolManager
+executor: JobExecutor
+sessions: SessionManager
+tracker: JobTracker
+validator: SecurityValidator
+collector: MetricsCollector
+create_server(config) MatlabMCPServer
+run() None
}
class EnginePoolManager {
+min_engines: int
+max_engines: int
+acquire(timeout) MatlabEngineWrapper
+release(engine) None
+get_status() dict
+health_check() None
}
class SessionManager {
+max_sessions: int
+create_session(session_id) Session
+get_session(session_id) Session
+get_or_create_default() Session
+destroy_session(session_id) None
}
class JobExecutor {
+execute_code(code, session_id, ctx, hitl_config) dict
+check_code(code) dict
+get_workspace(session_id) dict
}
class SecurityValidator {
+blocked_functions_enabled: bool
+validate_code(code) None
+validate_filename(filename) None
}
class MetricsCollector {
+record_event(event_type, data) None
+get_current_snapshot() dict
+get_history(hours) list
}
MatlabMCPServer --> EnginePoolManager
MatlabMCPServer --> SessionManager
MatlabMCPServer --> JobExecutor
MatlabMCPServer --> SecurityValidator
MatlabMCPServer --> MetricsCollector
Main server class that orchestrates all components and exposes MCP tools.
class MatlabMCPServer:
"""MATLAB MCP Server that bridges MCP-compatible agents to MATLAB execution."""
def __init__(self, config: AppConfig) -> None:
"""Initialize the server with configuration.
Parameters
----------
config : AppConfig
Complete server configuration from YAML or defaults.
Raises
------
ValueError
If configuration is invalid (e.g., pool constraints violated).
"""
def create_server(config: AppConfig) -> FastMCPServer:
"""Create and return a fully-configured FastMCP server instance.
This factory function assembles all subsystems (pool, executor, session
manager, security validator, metrics collector) and registers all 20
built-in MCP tools plus any custom tools from config.
Parameters
----------
config : AppConfig
Server configuration loaded from YAML or defaults.
Returns
-------
FastMCPServer
FastMCP server instance ready for transport-specific run().
Example
-------
>>> from matlab_mcp.config import load_config
>>> from matlab_mcp.server import create_server
>>> config = load_config("config.yaml")
>>> server = create_server(config)
>>> server.run(transport="stdio")
"""
def run(self) -> None:
"""Start the server with the configured transport.
Routes to `server.run(transport=config.server.transport, ...)` based on
transport type:
- `stdio`: Single-user, no authentication, no middleware
- `sse`: Multi-user with bearer token authentication (deprecated)
- `streamablehttp`: Multi-user with bearer token authentication (recommended)
Raises
------
RuntimeError
If transport is unsupported or server startup fails.
"""
def main() -> None:
"""Command-line interface for the MATLAB MCP Server.
Parses arguments and routes to the appropriate transport.
Usage
-----
$ matlab-mcp --help
$ matlab-mcp --transport stdio --config ./config.yaml
$ matlab-mcp --transport streamablehttp --port 9000
$ matlab-mcp --generate-token
Arguments
---------
--config PATH
Path to config.yaml (default: ./config.yaml)
--transport {stdio,sse,streamablehttp}
Override transport from config
--port PORT
Override port from config (HTTP/SSE only)
--generate-token
Generate a new bearer token and print env var snippet, then exit
--inspect
Pretty-print loaded configuration and exit
"""
Top-level configuration model that aggregates all domains.
class AppConfig(BaseModel):
"""Complete application configuration.
Parameters
----------
server : ServerConfig
Server transport and networking settings.
pool : PoolConfig
MATLAB engine pool configuration.
execution : ExecutionConfig
Code execution timeouts and workspace isolation.
security : SecurityConfig
Security controls (blocked functions, upload limits).
sessions : SessionConfig
Session management (max sessions, timeout).
output : OutputConfig
Result formatting (Plotly, thumbnails, truncation).
monitoring : MonitoringConfig
Metrics collection and dashboard.
hitl : HITLConfig
Human-in-the-loop approval gates.
Example
-------
>>> from matlab_mcp.config import load_config
>>> config = load_config("config.yaml")
>>> print(config.pool.min_engines, config.pool.max_engines)
"""
server: ServerConfig = Field(default_factory=ServerConfig)
pool: PoolConfig = Field(default_factory=PoolConfig)
execution: ExecutionConfig = Field(default_factory=ExecutionConfig)
security: SecurityConfig = Field(default_factory=SecurityConfig)
sessions: SessionConfig = Field(default_factory=SessionConfig)
output: OutputConfig = Field(default_factory=OutputConfig)
monitoring: MonitoringConfig = Field(default_factory=MonitoringConfig)
hitl: HITLConfig = Field(default_factory=HITLConfig)
class ServerConfig(BaseModel):
"""Server transport and networking configuration.
Parameters
----------
transport : {'stdio', 'sse', 'streamablehttp'}
Transport mode (default: 'stdio').
host : str
Bind address for HTTP/SSE (default: '127.0.0.1').
port : int
Listen port for HTTP/SSE (default: 8765).
stateless_http : bool
When True, each HTTP request gets its own ephemeral temp directory
(no session affinity required). Default: False.
log_level : {'debug', 'info', 'warning', 'error'}
Logging verbosity (default: 'info').
Example
-------
>>> config = ServerConfig(transport='streamablehttp', port=9000)
>>> print(f"Transport: {config.transport}, Port: {config.port}")
Transport: streamablehttp, Port: 9000
"""
transport: Literal["stdio", "sse", "streamablehttp"] = "stdio"
host: str = "127.0.0.1"
port: int = 8765
stateless_http: bool = False
log_level: Literal["debug", "info", "warning", "error"] = "info"
class PoolConfig(BaseModel):
"""MATLAB engine pool configuration.
Parameters
----------
min_engines : int
Minimum engines to keep warm (default: 2).
max_engines : int
Maximum engines (default: 10).
scale_down_idle_timeout : int
Seconds idle before scaling down (default: 900).
engine_start_timeout : int
Seconds to wait for MATLAB startup (default: 120).
health_check_interval : int
Seconds between health checks (default: 60).
proactive_warmup_threshold : float
Utilization ratio (0-1) to trigger warmup (default: 0.8).
Example
-------
>>> config = PoolConfig(min_engines=4, max_engines=16)
"""
min_engines: int = 2
max_engines: int = 10
scale_down_idle_timeout: int = 900
engine_start_timeout: int = 120
health_check_interval: int = 60
proactive_warmup_threshold: float = 0.8
class SecurityConfig(BaseModel):
"""Security controls for code execution.
Parameters
----------
blocked_functions_enabled : bool
Enable function blocklist (default: True).
blocked_functions : List[str]
List of banned MATLAB functions (default: system, eval, assignin, ...).
max_upload_size_mb : int
Maximum file upload size in MB (default: 100).
require_proxy_auth : bool
Require authentication header for file ops (default: False).
Example
-------
>>> config = SecurityConfig(
... blocked_functions_enabled=True,
... blocked_functions=["system", "eval"]
... )
"""
blocked_functions_enabled: bool = True
blocked_functions: List[str] = Field(
default_factory=lambda: [
"system", "unix", "dos", "!", "eval", "feval",
"evalc", "evalin", "assignin", "perl", "python"
]
)
max_upload_size_mb: int = 100
require_proxy_auth: bool = False
class HITLConfig(BaseModel):
"""Human-in-the-loop approval gate configuration.
When enabled, sensitive operations (execute_code, file uploads) pause
and prompt the human for approval via the MCP elicitation protocol.
Parameters
----------
enabled : bool
Master switch (default: False).
protected_functions : List[str]
MATLAB functions requiring approval before execution (default: []).
protect_file_ops : bool
Require approval for upload/delete (default: False).
all_execute : bool
Require approval for ALL execute_code calls (default: False).
Example
-------
>>> config = HITLConfig(
... enabled=True,
... protected_functions=["delete", "rmdir"],
... protect_file_ops=True
... )
"""
enabled: bool = False
protected_functions: List[str] = Field(default_factory=list)
protect_file_ops: bool = False
all_execute: bool = False
def load_config(path: str = "config.yaml") -> AppConfig:
"""Load and validate configuration from YAML file with env overrides.
Loads YAML configuration, applies environment variable overrides using
the MATLAB_MCP_* prefix, and validates with Pydantic.
Parameters
----------
path : str
Path to config.yaml file (default: 'config.yaml', resolved relative to the current working directory).
Returns
-------
AppConfig
Loaded and validated configuration.
Raises
------
FileNotFoundError
If config file does not exist.
ValueError
If configuration is invalid.
Example
-------
>>> config = load_config("config.yaml")
>>> # Override via environment variable
>>> import os
>>> os.environ["MATLAB_MCP_POOL_MAX_ENGINES"] = "20"
>>> config = load_config("config.yaml")
>>> print(config.pool.max_engines)
20
"""
Manages a pool of MATLAB engines with elastic scaling.
class EnginePoolManager:
"""Elastic pool of MATLAB engine instances.
Maintains min_engines to max_engines engines, acquiring/releasing on demand,
performing health checks, and scaling down idle engines.
Parameters
----------
config : PoolConfig
Pool configuration (min/max engines, timeouts).
matlab_root : str, optional
Explicit MATLAB installation path (auto-detected if None).
Example
-------
>>> from matlab_mcp.pool.manager import EnginePoolManager
>>> from matlab_mcp.config import PoolConfig
>>> config = PoolConfig(min_engines=2, max_engines=8)
>>> pool = EnginePoolManager(config)
>>> engine = await pool.acquire()
>>> await pool.release(engine)
"""
def __init__(
self,
config: PoolConfig,
matlab_root: str | None = None,
) -> None:
"""Initialize the engine pool."""
async def acquire(self, timeout: float = 30.0) -> MatlabEngineWrapper:
"""Acquire an engine from the pool.
Blocks if pool is at max capacity; waits for an engine to be released.
Parameters
----------
timeout : float
Seconds to wait before raising TimeoutError (default: 30).
Returns
-------
MatlabEngineWrapper
An idle engine ready for code execution.
Raises
------
TimeoutError
If no engine available within timeout.
Example
-------
>>> engine = await pool.acquire()
>>> result = await engine.eval("2 + 2")
>>> await pool.release(engine)
"""
async def release(self, engine: MatlabEngineWrapper) -> None:
"""Release an engine back to the pool.
Parameters
----------
engine : MatlabEngineWrapper
Engine to release.
Example
-------
>>> await pool.release(engine)
"""
def get_status(self) -> dict:
"""Get current pool status.
Returns
-------
dict
Dictionary with keys: 'total', 'available', 'busy', 'max_engines'.
Example
-------
>>> status = pool.get_status()
>>> print(f"Available: {status['available']}/{status['total']}")
Available: 2/4
"""
async def health_check(self) -> None:
"""Perform periodic health check.
Pings each engine; replaces dead engines.
Example
-------
>>> await pool.health_check()
"""
async def shutdown(self, timeout: float = 30.0) -> None:
"""Shutdown all engines gracefully.
Parameters
----------
timeout : float
Seconds to wait for engines to quit (default: 30).
Example
-------
>>> await pool.shutdown()
"""
class MatlabEngineWrapper:
"""Wrapper around a single MATLAB engine instance.
Manages lifecycle state (STOPPED, STARTING, IDLE, BUSY), health checks,
and workspace operations.
Parameters
----------
engine_id : str
Unique engine identifier.
matlab_root : str, optional
MATLAB installation path (auto-detected if None).
Example
-------
>>> from matlab_mcp.pool.engine import MatlabEngineWrapper
>>> engine = MatlabEngineWrapper(engine_id="engine-1")
>>> await engine.start()
>>> result = await engine.eval("magic(3)")
>>> await engine.stop()
"""
async def start(self, timeout: float = 120.0) -> None:
"""Start the MATLAB engine.
Parameters
----------
timeout : float
Seconds to wait for startup (default: 120).
Raises
------
TimeoutError
If engine doesn't start within timeout.
"""
async def eval(
self,
code: str,
capture_output: bool = True,
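) -> str:
"""Execute MATLAB code and wait for it to finish.
The coroutine must be awaited; it returns only once the code's output is available (contrast with `eval_async`, which returns a Future).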
Parameters
----------
code : str
MATLAB code to execute.
capture_output : bool
Capture stdout/stderr (default: True).
Returns
-------
str
Code output.
Raises
------
RuntimeError
If code execution fails.
Example
-------
>>> result = await engine.eval("2 + 2")
>>> print(result)
ans = 4
"""
async def eval_async(self, code: str) -> concurrent.futures.Future:
"""Execute MATLAB code asynchronously.
Parameters
----------
code : str
MATLAB code to execute.
Returns
-------
Future
Future that resolves with code output.
Example
-------
>>> future = await engine.eval_async("pause(10); disp('done')")
>>> result = await asyncio.wait_for(asyncio.wrap_future(future), timeout=15)
"""
async def get_workspace(self) -> dict:
"""Get current workspace variables.
Returns
-------
dict
Mapping of variable name to (type, size, value).
Example
-------
>>> ws = await engine.get_workspace()
>>> for name, (type_, size, value) in ws.items():
... print(f"{name}: {type_} {size}")
"""
async def health_check(self) -> bool:
"""Check if engine is healthy.
Returns
-------
bool
True if engine is responsive; False otherwise.
"""
async def stop(self, timeout: float = 30.0) -> None:
"""Stop the MATLAB engine gracefully.
Parameters
----------
timeout : float
Seconds to wait for shutdown (default: 30).
"""
@property
def is_alive(self) -> bool:
"""Whether engine is running."""
@property
def is_idle(self) -> bool:
"""Whether engine is idle (not executing code)."""
class JobExecutor:
"""Orchestrates MATLAB code execution lifecycle.
Handles security validation, job context injection, sync/async promotion,
result formatting, and telemetry recording.
Parameters
----------
pool : EnginePoolManager
Engine pool to acquire engines from.
tracker : JobTracker
Job tracker for metadata recording.
validator : SecurityValidator
Security validator for code scanning.
formatter : ResultFormatter
Formats execution results.
config : ExecutionConfig
Execution configuration (timeouts, workspace isolation).
session_manager : SessionManager
Session manager for workspace isolation.
Example
-------
>>> executor = JobExecutor(pool, tracker, validator, formatter, config, sessions)
>>> result = await executor.execute_code(
... code="magic(3)",
... session_id="session-1",
... ctx=mcp_context
... )
"""
async def execute_code(
self,
code: str,
session_id: str,
ctx: fastmcp.Context,
hitl_config: HITLConfig | None = None,
) -> dict:
"""Execute MATLAB code with security checks and optional HITL gating.
Parameters
----------
code : str
MATLAB code to execute.
session_id : str
Session ID for workspace isolation.
ctx : fastmcp.Context
MCP request context (for elicitation, logging).
hitl_config : HITLConfig, optional
HITL approval gates (default: None = gates disabled).
Returns
-------
dict
Result dictionary with keys: 'text', 'variables', 'files', 'error', 'job_id'.
Raises
------
BlockedFunctionError
If code contains blocked functions.
Example
-------
>>> result = await executor.execute_code("2 + 2", "session-1", ctx)
>>> print(result.get("text"))
"""
async def check_code(self, code: str) -> dict:
"""Lint MATLAB code using checkcode.
Parameters
----------
code : str
MATLAB code to lint.
Returns
-------
dict
Linting results with 'issues', 'error_count', 'warning_count'.
Example
-------
>>> issues = await executor.check_code("x=1")
>>> print(f"{issues['error_count']} errors, {issues['warning_count']} warnings")
"""
async def get_workspace(self, session_id: str) -> dict:
"""Get current workspace variables.
Parameters
----------
session_id : str
Session ID.
Returns
-------
dict
Workspace variables with name, type, size, value.
Example
-------
>>> ws = await executor.get_workspace("session-1")
>>> print(ws)
"""
class JobTracker:
"""Thread-safe registry of MATLAB execution jobs by session.
Tracks job metadata (status, timestamps, results) for polling and status checks.
Parameters
----------
retention_seconds : int
Keep completed jobs for this many seconds (default: 86400 = 24h).
Example
-------
>>> tracker = JobTracker(retention_seconds=86400)
>>> job = tracker.create_job(code="magic(3)", session_id="session-1")
>>> job.mark_running()
>>> tracker.get_job(job.job_id)
"""
def create_job(self, code: str, session_id: str) -> Job:
"""Create a new job.
Parameters
----------
code : str
MATLAB code to execute.
session_id : str
Session ID.
Returns
-------
Job
New job instance.
"""
def get_job(self, job_id: str) -> Job | None:
"""Retrieve a job by ID.
Parameters
----------
job_id : str
Job ID.
Returns
-------
Job or None
Job if found; None otherwise.
"""
def list_jobs(
self,
session_id: str,
status: str | None = None,
) -> list[Job]:
"""List jobs for a session.
Parameters
----------
session_id : str
Session ID.
status : str, optional
Filter by status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED).
Returns
-------
list[Job]
Matching jobs.
Example
-------
>>> jobs = tracker.list_jobs("session-1", status="COMPLETED")
"""
def prune(self) -> int:
"""Remove expired jobs older than retention_seconds.
Returns
-------
int
Number of jobs pruned.
"""
@dataclass
class Job:
"""Single MATLAB code execution job.
Attributes
----------
job_id : str
Unique job identifier.
code : str
MATLAB code executed.
session_id : str
Associated session.
status : JobStatus
Current status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED).
engine_id : str, optional
Engine executing the job.
result : dict, optional
Execution result when status is COMPLETED.
error : str, optional
Error message when status is FAILED.
created_at : datetime
Job creation timestamp.
started_at : datetime, optional
When execution started.
completed_at : datetime, optional
When execution completed.
Example
-------
>>> job.mark_running()
>>> job.mark_completed(result={"text": "ans = 4"})
>>> print(job.elapsed_seconds)
2.5
"""
job_id: str
code: str
session_id: str
status: JobStatus = JobStatus.PENDING
engine_id: str | None = None
result: dict | None = None
error: str | None = None
created_at: datetime = field(default_factory=datetime.now)
started_at: datetime | None = None
completed_at: datetime | None = None
def mark_running(self) -> None:
"""Transition to RUNNING status."""
def mark_completed(self, result: dict) -> None:
"""Transition to COMPLETED status with result."""
def mark_failed(self, error: str) -> None:
"""Transition to FAILED status with error message."""
def mark_cancelled(self) -> None:
"""Transition to CANCELLED status."""
@property
def elapsed_seconds(self) -> float:
"""Time from creation to completion in seconds."""
class SessionManager:
"""Manages per-user sessions with workspace isolation.
Parameters
----------
config : SessionConfig
Session configuration (max sessions, timeout).
base_temp_dir : str
Base directory for session temp directories.
Example
-------
>>> from matlab_mcp.session.manager import SessionManager
>>> from matlab_mcp.config import SessionConfig
>>> config = SessionConfig(max_sessions=50, session_timeout=3600)
>>> sessions = SessionManager(config, base_temp_dir="/tmp")
>>> session = sessions.create_session(session_id="user-1")
>>> print(session.temp_dir)
"""
def create_session(self, session_id: str) -> Session:
"""Create a new session.
Parameters
----------
session_id : str
Unique session identifier.
Returns
-------
Session
New session with isolated temp directory.
Raises
------
RuntimeError
If max_sessions limit reached.
Example
-------
>>> session = sessions.create_session("user-1")
>>> print(session.session_id, session.temp_dir)
"""
def get_session(self, session_id: str) -> Session | None:
"""Retrieve a session by ID.
Parameters
----------
session_id : str
Session ID.
Returns
-------
Session or None
Session if found; None otherwise.
"""
def get_or_create_default(self) -> Session:
"""Get the default session, creating if necessary.
Used for stdio transport which has no session concept.
Returns
-------
Session
Default session.
"""
def destroy_session(self, session_id: str) -> None:
"""Destroy a session and cleanup its temp directory.
Parameters
----------
session_id : str
Session ID.
Example
-------
>>> sessions.destroy_session("user-1")
"""
def list_sessions(self) -> list[Session]:
"""List all active sessions.
Returns
-------
list[Session]
All active sessions.
"""
def cleanup_idle(self, timeout_seconds: int = 3600) -> int:
"""Cleanup sessions idle longer than timeout.
Parameters
----------
timeout_seconds : int
Idle time threshold (default: 3600 = 1 hour).
Returns
-------
int
Number of sessions cleaned up.
"""
@dataclass
class Session:
"""User session with isolated workspace.
Attributes
----------
session_id : str
Unique session identifier.
temp_dir : str
Temporary working directory for this session.
created_at : datetime
Session creation time.
last_activity : datetime
Last activity timestamp.
Example
-------
>>> session = Session(
... session_id="user-1",
... temp_dir="/tmp/matlab-mcp-user-1"
... )
>>> print(session.idle_seconds)
"""
session_id: str
temp_dir: str
created_at: datetime = field(default_factory=datetime.now)
last_activity: datetime = field(default_factory=datetime.now)
@property
def idle_seconds(self) -> float:
"""Seconds since last activity."""
def touch(self) -> None:
"""Update last_activity timestamp to now."""
class SecurityValidator:
"""Enforces security policies on code and filenames.
Uses regex-based scanning with string/comment stripping to detect
dangerous functions and constructs.
Parameters
----------
config : SecurityConfig
Security configuration (blocklist, etc.).
Example
-------
>>> from matlab_mcp.security.validator import SecurityValidator
>>> from matlab_mcp.config import SecurityConfig
>>> config = SecurityConfig(blocked_functions_enabled=True)
>>> validator = SecurityValidator(config)
>>> try:
... validator.validate_code("system('ls')")
... except BlockedFunctionError as e:
... print(f"Blocked: {e}")
"""
def validate_code(self, code: str) -> None:
"""Scan code for blocked functions and constructs.
Parameters
----------
code : str
MATLAB code to validate.
Raises
------
BlockedFunctionError
If code contains blocked functions or constructs.
Example
-------
>>> validator.validate_code("x = 1; y = 2") # OK
>>> validator.validate_code("system('rm *')") # Raises BlockedFunctionError
"""
def validate_filename(self, filename: str) -> None:
"""Validate filename against whitelist of safe characters.
Parameters
----------
filename : str
Filename to validate (path separators not allowed).
Raises
------
ValueError
If filename contains unsafe characters.
Example
-------
>>> validator.validate_filename("data.csv") # OK
>>> validator.validate_filename("../etc/passwd") # Raises ValueError
"""
@property
def blocked_functions(self) -> list[str]:
"""Get current blocklist."""
class BearerAuthMiddleware:
"""Pure ASGI middleware for bearer token authentication.
Validates `Authorization: Bearer <token>` header on all HTTP requests.
Token is sourced from `MATLAB_MCP_AUTH_TOKEN` environment variable.
Bypasses authentication for `/health` and OPTIONS pre-flight requests.
Parameters
----------
app : Callable
ASGI application to wrap.
Example
-------
>>> from matlab_mcp.auth.middleware import BearerAuthMiddleware
>>> from starlette.applications import Starlette
>>> app = Starlette()
>>> middleware = BearerAuthMiddleware(app)
>>> # Use with uvicorn or FastMCP
"""
def __init__(self, app: Callable) -> None:
"""Initialize middleware."""
async def __call__(
self,
scope: dict,
receive: Callable,
send: Callable,
) -> None:
"""ASGI interface.
Parameters
----------
scope : dict
ASGI scope (connection details).
receive : Callable
ASGI receive channel.
send : Callable
ASGI send channel.
Example
-------
>>> await middleware(scope, receive, send)
"""
@classmethod
def generate_token(cls) -> str:
"""Generate a new 64-character hex bearer token.
Returns
-------