-
Notifications
You must be signed in to change notification settings - Fork 0
API Reference
classDiagram
class FastMCP {
+register_tool()
+run()
}
class MatlabMCPServer {
+config: AppConfig
+pool: EnginePoolManager
+tracker: JobTracker
+executor: JobExecutor
+sessions: SessionManager
+security: SecurityValidator
+collector: MetricsCollector
-_get_session_id()
-_get_temp_dir()
}
class EnginePoolManager {
-engines: List~MatlabEngineWrapper~
+acquire()
+release()
+get_status()
-_start_engines()
-_scale_up()
-_scale_down()
-_health_check()
}
class MatlabEngineWrapper {
-engine: matlab.engine.MatlabEngine
-state: EngineState
+execute()
+health_check()
+reset_workspace()
+mark_idle()
+mark_busy()
}
class JobExecutor {
-pool: EnginePoolManager
-tracker: JobTracker
-config: AppConfig
+execute()
-_inject_job_context()
-_safe_serialize()
-_wait_for_completion()
}
class JobTracker {
-jobs: Dict~str, Job~
+create()
+get()
+list()
+prune()
}
class Job {
+job_id: str
+session_id: str
+code: str
+status: JobStatus
+mark_running()
+mark_completed()
+mark_failed()
}
class SessionManager {
-sessions: Dict~str, Session~
+create_session()
+get_session()
+destroy_session()
}
class SecurityValidator {
-blocked_functions: List~str~
+check_code()
-_strip_literals()
}
class MetricsCollector {
+record_event()
+sample_once()
+get_current_snapshot()
+get_counters()
}
class AppConfig {
+server: ServerConfig
+pool: PoolConfig
+execution: ExecutionConfig
+output: OutputConfig
+security: SecurityConfig
+resolve_paths()
}
FastMCP --> MatlabMCPServer
MatlabMCPServer --> EnginePoolManager
MatlabMCPServer --> JobTracker
MatlabMCPServer --> JobExecutor
MatlabMCPServer --> SessionManager
MatlabMCPServer --> SecurityValidator
MatlabMCPServer --> MetricsCollector
MatlabMCPServer --> AppConfig
EnginePoolManager --> MatlabEngineWrapper
JobExecutor --> JobTracker
JobExecutor --> EnginePoolManager
JobTracker --> Job
File: src/matlab_mcp/config.py
Main configuration aggregator combining all sub-configs with YAML loading and environment variable overrides.
AppConfig(
server: ServerConfig
pool: PoolConfig
execution: ExecutionConfig
workspace: WorkspaceConfig
toolboxes: ToolboxesConfig
custom_tools: CustomToolsConfig
security: SecurityConfig
code_checker: CodeCheckerConfig
output: OutputConfig
sessions: SessionsConfig
monitoring: MonitoringConfig
) -> AppConfigMethods:
-
resolve_paths(base_dir: Path) -> None— Resolve relative paths to absolute -
validate_pool() -> AppConfig— Validate pool constraints (raisesValueErrorif min > max)
Environment Overrides: All config values can be overridden via MATLAB_MCP_* environment variables (e.g., MATLAB_MCP_POOL_MAX_ENGINES=15)
Example:
from matlab_mcp.config import load_config
config = load_config("config.yaml")
# or use defaults
config = load_config(None)Load configuration from YAML file with environment variable overrides.
Parameters:
-
path— Path to YAML config file, orNoneto use defaults
Returns: AppConfig instance
Raises: ValueError if pool constraints are violated
File: src/matlab_mcp/server.py
Container for all server-level state and component lifecycle.
class MatlabMCPServer:
config: AppConfig
pool: EnginePoolManager
tracker: JobTracker
executor: JobExecutor
sessions: SessionManager
security: SecurityValidator
collector: Optional[MetricsCollector]
store: Optional[MetricsStore]Methods:
-
_get_session_id(ctx: Context) -> str— Get session ID from context (stdio uses "default", SSE uses ctx.session_id) -
_get_temp_dir(session_id: str) -> str— Get temp directory for session
Create and configure the FastMCP server instance with all tools and lifespan handlers.
Parameters:
-
config—AppConfiginstance
Returns: Configured FastMCP server ready to run
Side Effects: Registers all MCP tools, sets up lifespan context managers for startup/shutdown
Example:
from matlab_mcp.config import load_config
from matlab_mcp.server import create_server
config = load_config("config.yaml")
mcp = create_server(config)
await mcp.run(port=8765)Entry point for the matlab-mcp CLI script. Loads config from file/environment and runs the server.
File: src/matlab_mcp/pool/manager.py
Manages a pool of MATLAB engine instances with elastic scaling and health checks.
class EnginePoolManager:
async def acquire() -> MatlabEngineWrapper
async def release(engine: MatlabEngineWrapper) -> None
def get_status() -> Dict[str, int]
async def shutdown() -> NoneBehavior:
- Maintains
min_enginestomax_enginesinstances - Auto-scales up on demand when all engines are busy
- Scales down idle engines after
scale_down_idle_timeoutseconds - Performs periodic health checks, replacing unresponsive engines
- Supports optional affinity tracking (pin jobs to engines)
Example:
from matlab_mcp.pool.manager import EnginePoolManager
from matlab_mcp.config import PoolConfig, load_config
config = load_config("config.yaml")
pool = EnginePoolManager(config)
await pool.initialize()
engine = await pool.acquire()
await engine.execute("x = 42;")
await pool.release(engine)
await pool.shutdown()File: src/matlab_mcp/pool/engine.py
Wraps a single MATLAB engine instance with state management and lazy initialization.
class MatlabEngineWrapper:
engine_id: str
state: EngineState # STOPPED, STARTING, IDLE, BUSY
idle_time: float
async def start() -> None
async def stop() -> None
async def execute(code: str, nargout: int = 0) -> Any
async def execute_async(code: str) -> MatlabFuture
async def health_check() -> bool
async def reset_workspace() -> None
def mark_idle() -> None
def mark_busy() -> NoneProperties:
-
is_alive: bool— Engine is running and responsive -
idle_seconds: float— Time since last use
Raises:
-
MatlabExecutionError— On MATLAB code execution errors -
RuntimeError— If engine fails to start within timeout
File: src/matlab_mcp/jobs/models.py
Represents a single MATLAB code execution request.
@dataclass
class Job:
session_id: str
code: str
job_id: str = field(default_factory=...) # auto-generated
status: JobStatus = JobStatus.PENDING
engine_id: Optional[str] = None
result: Optional[Dict] = None
error: Optional[Dict] = None
started_at: Optional[float] = None
completed_at: Optional[float] = None
created_at: float = field(default_factory=time.time)
future: Optional[asyncio.Future] = NoneMethods:
-
mark_running(engine_id: str) -> None— Transition to RUNNING -
mark_completed(result: Dict) -> None— Transition to COMPLETED -
mark_failed(error_type: str, message: str, matlab_id: Optional[str] = None, stack_trace: Optional[str] = None) -> None— Transition to FAILED -
mark_cancelled() -> None— Transition to CANCELLED
Properties:
-
elapsed_seconds: Optional[float]— Wall-clock time from start to completion (or now if running)
File: src/matlab_mcp/jobs/models.py
Job lifecycle states: PENDING, RUNNING, COMPLETED, FAILED, CANCELLED
File: src/matlab_mcp/jobs/tracker.py
Thread-safe registry of all MATLAB execution jobs.
class JobTracker:
def create(session_id: str, code: str) -> Job
def get(job_id: str) -> Optional[Job]
def list(session_id: Optional[str] = None) -> List[Job]
async def prune() -> int
def has_active_jobs(session_id: str) -> boolBehavior:
- Auto-prunes expired terminal jobs (COMPLETED/FAILED/CANCELLED) after
retention_seconds - Thread-safe with internal lock
Example:
from matlab_mcp.jobs.tracker import JobTracker
tracker = JobTracker(retention_seconds=86400)
job = tracker.create("session-1", "x = 42;")
job.mark_running("engine-0")
job.mark_completed({"text": "ans = 42"})
retrieved = tracker.get(job.job_id)
all_jobs = tracker.list("session-1")File: src/matlab_mcp/jobs/executor.py
Orchestrates full MATLAB code execution lifecycle: security validation, engine acquisition, code injection, execution, result building.
class JobExecutor:
async def execute(
session_id: str,
code: str,
temp_dir: Optional[str] = None,
timeout: Optional[float] = None
) -> Dict[str, Any]Returns: Result dictionary with keys:
-
status: "completed" | "running" | "failed" -
job_id: Job ID (all responses) -
text: Output text (completed/failed) -
variables: Dict of workspace variables (completed) -
error: Error details (failed) -
execution_time: Wall-clock seconds (completed)
Behavior:
- Runs synchronously if completes within
sync_timeout - Auto-promotes to async (background task) if exceeds timeout
- Injects
__mcp_job_id__and__mcp_temp_dir__into workspace - Captures stdout/stderr, handles MATLAB errors gracefully
- Records metrics (success/failure, execution time)
Raises:
-
SecurityValidator.BlockedFunctionError— If code contains blocked functions
Example:
from matlab_mcp.jobs.executor import JobExecutor
from matlab_mcp.pool.manager import EnginePoolManager
from matlab_mcp.jobs.tracker import JobTracker
from matlab_mcp.config import load_config
config = load_config("config.yaml")
pool = EnginePoolManager(config)
tracker = JobTracker()
executor = JobExecutor(pool=pool, tracker=tracker, config=config)
result = await executor.execute("s1", "x = eig(randn(5));")
# result["status"] == "completed"
# result["variables"]["x"] contains eigenvaluesFile: src/matlab_mcp/security/validator.py
Validates MATLAB code against a security blocklist.
class SecurityValidator:
def check_code(code: str) -> None # raises BlockedFunctionError if blocked
def sanitize_filename(filename: str) -> strBlocked Functions (Default):
- Dangerous execution:
system,unix,dos,!,perl,python - Code evaluation:
eval,feval,evalc,evalin,assignin
Behavior:
- Strips MATLAB string literals (
'...',"...") to avoid false positives - Detects function calls via regex pattern matching
- Customizable via
security.blocked_functionsin config
Raises:
-
BlockedFunctionError— If blocked function detected in code
Example:
from matlab_mcp.security.validator import SecurityValidator
from matlab_mcp.config import SecurityConfig
validator = SecurityValidator(SecurityConfig())
try:
validator.check_code("x = system('ls');")
except SecurityValidator.BlockedFunctionError as e:
print(f"Blocked: {e}") # Blocked: systemRaised when code contains a blocked function.
File: src/matlab_mcp/session/manager.py
Represents a user session with isolated temp directory.
@dataclass
class Session:
session_id: str
temp_dir: str
created_at: float = field(default_factory=time.time)
last_activity: float = field(default_factory=time.time)Properties:
-
idle_seconds: float— Time since last activity
Methods:
-
touch() -> None— Updatelast_activitytimestamp
File: src/matlab_mcp/session/manager.py
Manages user session lifecycle with isolation and cleanup.
class SessionManager:
def create_session(session_id: Optional[str] = None) -> Session
def get_session(session_id: str) -> Optional[Session]
def get_or_create_default() -> Session
def destroy_session(session_id: str) -> bool
async def cleanup_idle_sessions() -> None
@property
def session_count() -> intBehavior:
- Enforces
max_sessionslimit (raisesRuntimeErrorif exceeded) - Creates isolated temp directories for each session
- Cleans up idle sessions after
session_timeoutseconds - In stdio mode, uses a single "default" session for all requests
- In SSE mode, creates per-client sessions from context
Example:
from matlab_mcp.session.manager import SessionManager
from matlab_mcp.config import load_config
config = load_config("config.yaml")
manager = SessionManager(config)
session = manager.create_session("client-abc")
print(session.temp_dir) # /tmp/matlab-mcp/client-abc_<uuid>
await manager.cleanup_idle_sessions()
manager.destroy_session("client-abc")File: src/matlab_mcp/output/formatter.py
Converts raw MATLAB execution results into structured MCP response dictionaries.
class ResultFormatter:
def format_result(
text: str,
variables: Dict[str, Any],
execution_time: float
) -> Dict[str, Any]
def format_error(
error_type: str,
message: str,
matlab_id: Optional[str] = None,
stack_trace: Optional[str] = None
) -> Dict[str, Any]Behavior:
- Truncates text to
max_inline_text_length(default 50KB) - Saves overflow text to session directory as
.txtfile - Summarizes large variables (arrays >
large_result_threshold) - Elides values > 10000 chars
- Preserves numeric precision (floats displayed to ~15 digits)
Example:
from matlab_mcp.output.formatter import ResultFormatter
from matlab_mcp.config import OutputConfig
formatter = ResultFormatter(OutputConfig())
result = formatter.format_result(
text="x = [1 2 3; 4 5 6]",
variables={"x": [[1, 2, 3], [4, 5, 6]]},
execution_time=0.15
)
# {
# "status": "completed",
# "text": "x = [1 2 3; 4 5 6]",
# "variables": {"x": "2×3 double"},
# "execution_time": 0.15
# }File: src/matlab_mcp/output/plotly_style_mapper.py
Translates MATLAB figure properties to Plotly JSON format.
def matlab_to_plotly(
matlab_fig_json: Dict[str, Any],
format: str = "json"
) -> Dict[str, Any]Supported Chart Types:
- Line plots, scatter, bar, histogram, box plots
- 3D surfaces, mesh plots
- Heatmaps, contour plots
- Subplots with proper domain layout
Example:
from matlab_mcp.output.plotly_style_mapper import matlab_to_plotly
plotly_fig = matlab_to_plotly(matlab_fig_dict)
# Returns Plotly figure dict suitable for plotly.show()File: src/matlab_mcp/monitoring/collector.py
Collects server metrics and events with in-memory buffers and async store writes.
class MetricsCollector:
def record_event(event_type: str, data: Dict[str, Any]) -> None
async def sample_once() -> None
def get_current_snapshot() -> Dict[str, Any]
def get_counters() -> Dict[str, int]
def get_execution_stats() -> Dict[str, float]Event Types:
-
job_completed— Job finished successfully -
job_failed— Job execution failed -
job_cancelled— Job was cancelled -
session_created— New session created -
blocked_function— Security blocklist hit -
health_check_fail— Engine health check failed
Counters Tracked:
-
completed_total,failed_total,cancelled_total -
total_created_sessions,active_sessions -
error_total,blocked_attempts,health_check_failures
Execution Time Statistics:
- Ring buffer of 100 most recent execution times
- Computes
avg_execution_ms,p95_execution_ms
Example:
from matlab_mcp.monitoring.collector import MetricsCollector
from matlab_mcp.config import load_config
config = load_config("config.yaml")
collector = MetricsCollector(config)
collector.record_event("job_completed", {
"job_id": "j-1",
"execution_ms": 523
})
snapshot = collector.get_current_snapshot()
# {
# "timestamp": 1699564800.5,
# "pool": {"total": 2, "available": 1, ...},
# "jobs": {"active": 0, "completed_total": 1, ...},
# "counters": {...}
# }File: src/matlab_mcp/monitoring/store.py
Async SQLite-based time-series database for persistent metrics storage.
class MetricsStore:
async def initialize() -> None
async def insert_metrics(metrics: Dict[str, Any]) -> None
async def insert_event(event: Dict[str, Any]) -> None
async def get_latest() -> Dict[str, Any]
async def close() -> NoneSchema:
-
metricstable: (timestamp, key, value, indexed by key and timestamp) -
eventstable: (timestamp, event_type, data_json)
Cleanup: Auto-prunes data older than retention_days (default 7)
Example:
from matlab_mcp.monitoring.store import MetricsStore
store = MetricsStore("./monitoring/metrics.db")
await store.initialize()
await store.insert_metrics({
"timestamp": time.time(),
"pool.utilization_pct": 50.0,
"jobs.active": 1
})
latest = await store.get_latest()
await store.close()File: src/matlab_mcp/monitoring/health.py
def evaluate_health(collector: MetricsCollector) -> Dict[str, Any]Returns health status object with:
-
status: "healthy" | "degraded" | "unhealthy" -
issues: List of detected problems - Engine counts, job counts, error rates, uptime
File: src/matlab_mcp/tools/core.py
async def execute_code_impl(
code: str,
executor: JobExecutor,
session_id: str,
temp_dir: Optional[str] = None
) -> Dict[str, Any]Executes MATLAB code with security validation and async promotion.
Returns: Same format as JobExecutor.execute()
async def check_code_impl(
code: str,
executor: JobExecutor,
session_id: str,
temp_dir: str
) -> Dict[str, Any]Lints MATLAB code and returns issues list.
Returns:
{
"status": "completed",
"issues": [
{"line": N, "column": C, "message": "...", "severity": "error|warning"}
],
"summary": "X error(s), Y warning(s)"
}async def get_workspace_impl(
executor: JobExecutor,
session_id: str,
temp_dir: Optional[str] = None
) -> Dict[str, Any]Returns output of MATLAB whos command (variable names, sizes, types).
File: src/matlab_mcp/tools/files.py
async def upload_data_impl(
filename: str,
content_base64: str,
session_id: str,
temp_dir: str,
validator: SecurityValidator,
config: AppConfig
) -> Dict[str, Any]Upload base64-encoded file to session temp directory.
Validation:
- Filename sanitized against path traversal
- File size checked against
max_upload_size_mb
Raises: ValueError on size violation or invalid filename
async def delete_file_impl(
filename: str,
temp_dir: str,
validator: SecurityValidator
) -> Dict[str, Any]Delete file from session directory.
async def list_files_impl(temp_dir: str) -> Dict[str, Any]List all files in session directory with sizes and modification times.
async def read_script_impl(
filename: str,
temp_dir: str,
validator: SecurityValidator,
config: AppConfig
) -> Dict[str, Any]Read MATLAB .m script file, with truncation support.
async def read_image_impl(
filename: str,
temp_dir: str,
validator: SecurityValidator,
config: AppConfig
) -> Dict[str, Any]Read image file and return as inline FastMCP Image object.
async def read_data_impl(
filename: str,
format: str = "summary",
temp_dir: str = None,
validator: SecurityValidator = None,
config: AppConfig = None
) -> Dict[str, Any]Read data files (.mat, .csv, .json, .xlsx, .txt).
Formats:
-
.matwithformat="summary": Variable names/sizes via MATLABwhos -
.matwithformat="raw": Base64-encoded content -
.csv,.txt,.json: Text content -
.xlsx: Base64-encoded content
File: src/matlab_mcp/tools/jobs.py
async def get_job_status_impl(
job_id: str,
tracker: JobTracker,
temp_dir: str
) -> Dict[str, Any]Get status and progress of running job, reading .progress file if available.
async def get_job_result_impl(
job_id: str,
tracker: JobTracker
) -> Dict[str, Any]Get full result of completed/failed job.
async def cancel_job_impl(
job_id: str,
tracker: JobTracker
) -> Dict[str, Any]Cancel pending or running job.
async def list_jobs_impl(
session_id: str,
tracker: JobTracker
) -> Dict[str, Any]List all jobs in session with summary info.
File: src/matlab_mcp/tools/discovery.py
async def list_toolboxes_impl(
executor: JobExecutor,
session_id: str,
temp_dir: Optional[str],
config: AppConfig
) -> Dict[str, Any]List available MATLAB toolboxes, filtered by config mode (whitelist/blacklist/all).
async def list_functions_impl(
toolbox_name: str,
executor: JobExecutor,
session_id: str,
temp_dir: Optional[str],
config: AppConfig
) -> Dict[str, Any]List functions in a toolbox.
Validation: Toolbox name validated with regex to prevent MATLAB injection
async def get_help_impl(
function_name: str,
executor: JobExecutor,
session_id: str,
temp_dir: Optional[str]
) -> Dict[str, Any]Get MATLAB help text for function.
Validation: Function name validated with regex to prevent injection
File: src/matlab_mcp/tools/admin.py
async def get_pool_status_impl(pool: EnginePoolManager) -> Dict[str, Any]Get engine pool status: total, available, busy, max engines.
File: src/matlab_mcp/tools/monitoring.py
async def get_server_metrics_impl(
collector: Optional[MetricsCollector]
) -> Dict[str, Any]Get comprehensive server metrics snapshot.
async def get_server_health_impl(
collector: Optional[MetricsCollector]
) -> Dict[str, Any]Get health status with issue detection.
async def get_error_log_impl(
collector: Optional[MetricsCollector],
hours: int = 24
) -> Dict[str, Any]Get recent error events aggregated by type.
File: src/matlab_mcp/tools/custom.py
def load_custom_tools(config_file: str) -> List[CustomToolDef]Load custom tool definitions from YAML file.
YAML Schema:
tools:
- name: my_tool
function: my_matlab_function
description: "What this tool does"
parameters:
- name: param1
type: number
description: "Parameter description"
default: 0
return_value:
type: object
description: "Return value description"def make_custom_tool_handler(
tool_def: CustomToolDef,
executor: JobExecutor,
config: AppConfig
) -> Callable[..., Awaitable[Dict[str, Any]]]Generate typed async handler function for custom tool with proper inspect.Signature for FastMCP introspection.
Example:
from matlab_mcp.tools.custom import load_custom_tools, make_custom_tool_handler
custom_defs = load_custom_tools("custom_tools.yaml")
for tool_def in custom_defs:
handler = make_custom_tool_handler(tool_def, executor, config)
mcp.tool(tool_def.name)(handler)flowchart TD
A["FastMCP Server"] -->|register_tool| B["Tool Handler Functions"]
B -->|execute_code| C["SecurityValidator.check_code"]
C -->|valid| D["JobExecutor.execute"]
C -->|blocked| E["Return BlockedFunctionError"]
D -->|acquire engine| F["EnginePoolManager"]
F -->|get from pool or create| G["MatlabEngineWrapper"]
G -->|inject context| H["MATLAB Workspace"]
H -->|eval code| I["MATLAB Engine Process"]
I -->|capture output/vars| J["Job Result"]
J -->|< sync_timeout?| K["Return Inline Result"]
J -->|>= sync_timeout| L["Promote to Async"]
L -->|return job_id| M["Client Polls get_job_status"]
K -->|format| N["ResultFormatter"]
N -->|format_result| O["MCP Response Dict"]
G -->|release| F
D -->|record metrics| P["MetricsCollector"]
P -->|sample_once| Q["MetricsStore"]
Q -->|persist| R["SQLite DB"]
P -->|get_current_snapshot| S["Dashboard/Health API"]
style A fill:#4A90E2
style B fill:#7B68EE