From 5f97b4d49d1f6be95ae3abab72db8c590e60e5a3 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sun, 22 Mar 2026 07:25:20 +0000 Subject: [PATCH 01/14] feat(box): add box runtime package and `lbp box` CLI command Move sandbox box runtime code from LangBot core into the plugin SDK as `langbot_plugin.box` package. Add `lbp box` CLI command to start the box runtime standalone, similar to `lbp rt` for the plugin runtime. New package includes: actions, backend (Docker/Podman), client, errors, models, runtime, security, server, and `__main__` entry point --- src/langbot_plugin/box/__init__.py | 1 + src/langbot_plugin/box/__main__.py | 5 + src/langbot_plugin/box/actions.py | 21 ++ src/langbot_plugin/box/backend.py | 388 +++++++++++++++++++++++++++++ src/langbot_plugin/box/client.py | 177 +++++++++++++ src/langbot_plugin/box/errors.py | 33 +++ src/langbot_plugin/box/models.py | 267 ++++++++++++++++++++ src/langbot_plugin/box/runtime.py | 386 ++++++++++++++++++++++++++++ src/langbot_plugin/box/security.py | 35 +++ src/langbot_plugin/box/server.py | 267 ++++++++++++++++++++ src/langbot_plugin/cli/__init__.py | 20 ++ 11 files changed, 1600 insertions(+) create mode 100644 src/langbot_plugin/box/__init__.py create mode 100644 src/langbot_plugin/box/__main__.py create mode 100644 src/langbot_plugin/box/actions.py create mode 100644 src/langbot_plugin/box/backend.py create mode 100644 src/langbot_plugin/box/client.py create mode 100644 src/langbot_plugin/box/errors.py create mode 100644 src/langbot_plugin/box/models.py create mode 100644 src/langbot_plugin/box/runtime.py create mode 100644 src/langbot_plugin/box/security.py create mode 100644 src/langbot_plugin/box/server.py diff --git a/src/langbot_plugin/box/__init__.py b/src/langbot_plugin/box/__init__.py new file mode 100644 index 0000000..c1ea6e1 --- /dev/null +++ b/src/langbot_plugin/box/__init__.py @@ -0,0 +1 @@ +"""LangBot Box runtime package.""" diff --git a/src/langbot_plugin/box/__main__.py b/src/langbot_plugin/box/__main__.py new file mode 100644 index 0000000..c6144f0 --- /dev/null +++ b/src/langbot_plugin/box/__main__.py @@ -0,0 +1,5 @@ +"""Allow running the Box server via ``python -m langbot_plugin.box``.""" + +from .server import main + +main() diff --git a/src/langbot_plugin/box/actions.py b/src/langbot_plugin/box/actions.py new file mode 100644 index 0000000..954c606 --- /dev/null +++ b/src/langbot_plugin/box/actions.py @@ -0,0 +1,21 @@ +"""Box-specific action types for the action RPC protocol.""" + +from __future__ import annotations + +from langbot_plugin.entities.io.actions.enums import ActionType + + +class LangBotToBoxAction(ActionType): + """Actions sent from LangBot to the Box runtime.""" + + HEALTH = 'box_health' + STATUS = 'box_status' + EXEC = 'box_exec' + CREATE_SESSION = 'box_create_session' + GET_SESSION = 'box_get_session' + GET_SESSIONS = 'box_get_sessions' + DELETE_SESSION = 'box_delete_session' + START_MANAGED_PROCESS = 'box_start_managed_process' + GET_MANAGED_PROCESS = 'box_get_managed_process' + GET_BACKEND_INFO = 'box_get_backend_info' + SHUTDOWN = 'box_shutdown' diff --git a/src/langbot_plugin/box/backend.py b/src/langbot_plugin/box/backend.py new file mode 100644 index 0000000..e5bbe56 --- /dev/null +++ b/src/langbot_plugin/box/backend.py @@ -0,0 +1,388 @@ +from __future__ import annotations + +import abc +import asyncio +import dataclasses +import datetime as dt +import logging +import re +import shlex +import shutil +import uuid + +from .errors import BoxError +from .models import ( + DEFAULT_BOX_MOUNT_PATH, + BoxExecutionResult, + BoxExecutionStatus, + BoxHostMountMode, + BoxNetworkMode, + BoxSessionInfo, + BoxSpec, +) +from .security import validate_sandbox_security + +# Hard cap on raw subprocess output to prevent unbounded memory usage. +# Container timeout already bounds duration, but fast commands can still +# produce large output within the time limit. After this many bytes the +# remaining output is discarded before decoding. +_MAX_RAW_OUTPUT_BYTES = 1_048_576 # 1 MB per stream + + +@dataclasses.dataclass(slots=True) +class _CommandResult: + return_code: int + stdout: str + stderr: str + timed_out: bool = False + + +class BaseSandboxBackend(abc.ABC): + name: str + instance_id: str = '' + + def __init__(self, logger: logging.Logger): + self.logger = logger + + async def initialize(self): + return None + + @abc.abstractmethod + async def is_available(self) -> bool: + pass + + @abc.abstractmethod + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + pass + + @abc.abstractmethod + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + pass + + @abc.abstractmethod + async def stop_session(self, session: BoxSessionInfo): + pass + + async def start_managed_process(self, session: BoxSessionInfo, spec): + raise BoxError(f'{self.name} backend does not support managed processes') + + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): + """Remove lingering containers from previous runs. No-op by default.""" + pass + + +class CLISandboxBackend(BaseSandboxBackend): + command: str + + def __init__(self, logger: logging.Logger, command: str, backend_name: str): + super().__init__(logger) + self.command = command + self.name = backend_name + + async def is_available(self) -> bool: + if shutil.which(self.command) is None: + return False + + result = await self._run_command([self.command, 'info'], timeout_sec=5, check=False) + return result.return_code == 0 and not result.timed_out + + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + validate_sandbox_security(spec) + + now = dt.datetime.now(dt.UTC) + container_name = self._build_container_name(spec.session_id) + + args = [ + self.command, + 'run', + '-d', + '--rm', + '--name', + container_name, + '--label', + 'langbot.box=true', + '--label', + f'langbot.session_id={spec.session_id}', + '--label', + f'langbot.box.instance_id={self.instance_id}', + ] + + if spec.network == BoxNetworkMode.OFF: + args.extend(['--network', 'none']) + + # Resource limits + args.extend(['--cpus', str(spec.cpus)]) + args.extend(['--memory', f'{spec.memory_mb}m']) + args.extend(['--pids-limit', str(spec.pids_limit)]) + + if spec.read_only_rootfs: + args.append('--read-only') + args.extend(['--tmpfs', '/tmp:size=64m']) + + if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: + mount_spec = f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}:{spec.host_path_mode.value}' + args.extend(['-v', mount_spec]) + + args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done']) + + self.logger.info( + f'LangBot Box backend start_session: backend={self.name} ' + f'session_id={spec.session_id} container_name={container_name} ' + f'image={spec.image} network={spec.network.value} ' + f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} ' + f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} ' + f'read_only_rootfs={spec.read_only_rootfs}' + ) + + await self._run_command(args, timeout_sec=30, check=True) + + return BoxSessionInfo( + session_id=spec.session_id, + backend_name=self.name, + backend_session_id=container_name, + image=spec.image, + network=spec.network, + host_path=spec.host_path, + host_path_mode=spec.host_path_mode, + cpus=spec.cpus, + memory_mb=spec.memory_mb, + pids_limit=spec.pids_limit, + read_only_rootfs=spec.read_only_rootfs, + created_at=now, + last_used_at=now, + ) + + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + start = dt.datetime.now(dt.UTC) + args = [self.command, 'exec'] + + for key, value in spec.env.items(): + args.extend(['-e', f'{key}={value}']) + + args.extend( + [ + session.backend_session_id, + 'sh', + '-lc', + self._build_exec_command(spec.workdir, spec.cmd), + ] + ) + + cmd_preview = spec.cmd.strip() + if len(cmd_preview) > 400: + cmd_preview = f'{cmd_preview[:397]}...' + self.logger.info( + f'LangBot Box backend exec: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id} ' + f'workdir={spec.workdir} timeout_sec={spec.timeout_sec} ' + f'env_keys={sorted(spec.env.keys())} cmd={cmd_preview}' + ) + + result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False) + duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000) + + if result.timed_out: + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.TIMED_OUT, + exit_code=None, + stdout=result.stdout, + stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.', + duration_ms=duration_ms, + ) + + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.COMPLETED, + exit_code=result.return_code, + stdout=result.stdout, + stderr=result.stderr, + duration_ms=duration_ms, + ) + + async def stop_session(self, session: BoxSessionInfo): + self.logger.info( + f'LangBot Box backend stop_session: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id}' + ) + await self._run_command( + [self.command, 'rm', '-f', session.backend_session_id], + timeout_sec=20, + check=False, + ) + + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): + """Remove langbot.box containers from previous instances. + + Only removes containers whose ``langbot.box.instance_id`` label does + NOT match *current_instance_id*. Containers without the label (from + older versions) are also removed. + """ + result = await self._run_command( + [ + self.command, + 'ps', + '-a', + '--filter', + 'label=langbot.box=true', + '--format', + '{{.ID}}\t{{.Label "langbot.box.instance_id"}}', + ], + timeout_sec=10, + check=False, + ) + if result.return_code != 0 or not result.stdout.strip(): + return + orphan_ids = [] + for line in result.stdout.strip().split('\n'): + line = line.strip() + if not line: + continue + parts = line.split('\t', 1) + cid = parts[0].strip() + label_instance = parts[1].strip() if len(parts) > 1 else '' + if label_instance != current_instance_id: + orphan_ids.append(cid) + if not orphan_ids: + return + for cid in orphan_ids: + self.logger.info(f'Cleaning up orphaned Box container: {cid}') + await self._run_command( + [self.command, 'rm', '-f', *orphan_ids], + timeout_sec=30, + check=False, + ) + + async def start_managed_process(self, session: BoxSessionInfo, spec) -> asyncio.subprocess.Process: + args = [self.command, 'exec', '-i'] + + for key, value in spec.env.items(): + args.extend(['-e', f'{key}={value}']) + + args.extend( + [ + session.backend_session_id, + 'sh', + '-lc', + self._build_spawn_command(spec.cwd, spec.command, spec.args), + ] + ) + + self.logger.info( + f'LangBot Box backend start_managed_process: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id} ' + f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} command={spec.command} args={spec.args}' + ) + + return await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + def _build_container_name(self, session_id: str) -> str: + normalized = re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session' + suffix = uuid.uuid4().hex[:8] + return f'langbot-box-{normalized[:32]}-{suffix}' + + def _build_exec_command(self, workdir: str, cmd: str) -> str: + quoted_workdir = shlex.quote(workdir) + return f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {cmd}' + + def _build_spawn_command(self, cwd: str, command: str, args: list[str]) -> str: + quoted_cwd = shlex.quote(cwd) + command_parts = [shlex.quote(command), *[shlex.quote(arg) for arg in args]] + return f'mkdir -p {quoted_cwd} && cd {quoted_cwd} && exec {" ".join(command_parts)}' + + async def _run_command( + self, + args: list[str], + timeout_sec: int, + check: bool, + ) -> _CommandResult: + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_task = asyncio.create_task(self._read_stream(process.stdout)) + stderr_task = asyncio.create_task(self._read_stream(process.stderr)) + + timed_out = False + try: + await asyncio.wait_for(process.wait(), timeout=timeout_sec) + except asyncio.TimeoutError: + process.kill() + timed_out = True + await process.wait() + + stdout_bytes, stdout_total = await stdout_task + stderr_bytes, stderr_total = await stderr_task + + if timed_out: + return _CommandResult( + return_code=-1, + stdout=self._clip_captured_bytes(stdout_bytes, stdout_total), + stderr=self._clip_captured_bytes(stderr_bytes, stderr_total), + timed_out=True, + ) + + stdout = self._clip_captured_bytes(stdout_bytes, stdout_total) + stderr = self._clip_captured_bytes(stderr_bytes, stderr_total) + + if check and process.returncode != 0: + raise BoxError(self._format_cli_error(stderr or stdout or 'unknown backend error')) + + return _CommandResult( + return_code=process.returncode, + stdout=stdout, + stderr=stderr, + timed_out=False, + ) + + @staticmethod + def _clip_captured_bytes(data: bytes, total_size: int, limit: int = _MAX_RAW_OUTPUT_BYTES) -> str: + text = data.decode('utf-8', errors='replace').strip() + if total_size > limit: + text += f'\n... [raw output clipped at {limit} bytes, {total_size - limit} bytes discarded]' + return text + + @staticmethod + async def _read_stream( + stream: asyncio.StreamReader | None, + limit: int = _MAX_RAW_OUTPUT_BYTES, + ) -> tuple[bytes, int]: + if stream is None: + return b'', 0 + + chunks = bytearray() + total_size = 0 + while True: + chunk = await stream.read(65536) + if not chunk: + break + total_size += len(chunk) + remaining = limit - len(chunks) + if remaining > 0: + chunks.extend(chunk[:remaining]) + + return bytes(chunks), total_size + + def _format_cli_error(self, message: str) -> str: + message = ' '.join(message.split()) + if len(message) > 300: + message = f'{message[:297]}...' + return f'{self.name} backend error: {message}' + + +class PodmanBackend(CLISandboxBackend): + def __init__(self, logger: logging.Logger): + super().__init__(logger=logger, command='podman', backend_name='podman') + + +class DockerBackend(CLISandboxBackend): + def __init__(self, logger: logging.Logger): + super().__init__(logger=logger, command='docker', backend_name='docker') diff --git a/src/langbot_plugin/box/client.py b/src/langbot_plugin/box/client.py new file mode 100644 index 0000000..36a525a --- /dev/null +++ b/src/langbot_plugin/box/client.py @@ -0,0 +1,177 @@ +"""BoxRuntimeClient abstraction for Box Runtime access.""" + +from __future__ import annotations + +import abc +import logging +from typing import Any + +from langbot_plugin.runtime.io.handler import Handler + +from .actions import LangBotToBoxAction +from .errors import BoxError, BoxRuntimeUnavailableError +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxSpec, +) + + +class BoxRuntimeClient(abc.ABC): + """Abstract interface that BoxService uses to talk to a Box Runtime.""" + + @abc.abstractmethod + async def initialize(self) -> None: ... + + @abc.abstractmethod + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: ... + + @abc.abstractmethod + async def shutdown(self) -> None: ... + + @abc.abstractmethod + async def get_status(self) -> dict: ... + + @abc.abstractmethod + async def get_sessions(self) -> list[dict]: ... + + @abc.abstractmethod + async def get_backend_info(self) -> dict: ... + + @abc.abstractmethod + async def delete_session(self, session_id: str) -> None: ... + + @abc.abstractmethod + async def create_session(self, spec: BoxSpec) -> dict: ... + + @abc.abstractmethod + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: ... + + @abc.abstractmethod + async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: ... + + @abc.abstractmethod + async def get_session(self, session_id: str) -> dict: ... + + +def _translate_action_error(exc: Exception) -> BoxError: + """Convert an ActionCallError message back into the appropriate BoxError subclass.""" + from .errors import ( + BoxBackendUnavailableError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, + ) + + msg = str(exc) + _ERROR_PREFIX_MAP: list[tuple[str, type[BoxError]]] = [ + ('BoxValidationError:', BoxValidationError), + ('BoxSessionNotFoundError:', BoxSessionNotFoundError), + ('BoxSessionConflictError:', BoxSessionConflictError), + ('BoxManagedProcessNotFoundError:', BoxManagedProcessNotFoundError), + ('BoxManagedProcessConflictError:', BoxManagedProcessConflictError), + ('BoxBackendUnavailableError:', BoxBackendUnavailableError), + ] + for prefix, cls in _ERROR_PREFIX_MAP: + if prefix in msg: + return cls(msg) + return BoxError(msg) + + +class ActionRPCBoxClient(BoxRuntimeClient): + """Client that talks to BoxRuntime via the action RPC protocol.""" + + def __init__(self, logger: logging.Logger): + self._logger = logger + self._handler: Handler | None = None + + @property + def handler(self) -> Handler: + if self._handler is None: + raise BoxRuntimeUnavailableError('box runtime not connected') + return self._handler + + def set_handler(self, handler: Handler) -> None: + self._handler = handler + + async def _call(self, action: LangBotToBoxAction, data: dict[str, Any], timeout: float = 15.0) -> dict[str, Any]: + try: + return await self.handler.call_action(action, data, timeout=timeout) + except BoxRuntimeUnavailableError: + raise + except Exception as exc: + raise _translate_action_error(exc) from exc + + async def initialize(self) -> None: + try: + await self._call(LangBotToBoxAction.HEALTH, {}) + self._logger.info('LangBot Box runtime connected via action RPC.') + except Exception as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: + data = await self._call(LangBotToBoxAction.EXEC, spec.model_dump(mode='json'), timeout=300.0) + return BoxExecutionResult( + session_id=data['session_id'], + backend_name=data['backend_name'], + status=BoxExecutionStatus(data['status']), + exit_code=data.get('exit_code'), + stdout=data.get('stdout', ''), + stderr=data.get('stderr', ''), + duration_ms=data['duration_ms'], + ) + + async def shutdown(self) -> None: + if self._handler is not None: + try: + await self._call(LangBotToBoxAction.SHUTDOWN, {}) + except Exception: + pass + self._handler = None + + async def get_status(self) -> dict: + return await self._call(LangBotToBoxAction.STATUS, {}) + + async def get_sessions(self) -> list[dict]: + data = await self._call(LangBotToBoxAction.GET_SESSIONS, {}) + return data['sessions'] + + async def get_session(self, session_id: str) -> dict: + return await self._call(LangBotToBoxAction.GET_SESSION, {'session_id': session_id}) + + async def get_backend_info(self) -> dict: + return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {}) + + async def delete_session(self, session_id: str) -> None: + await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id}) + + async def create_session(self, spec: BoxSpec) -> dict: + return await self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json')) + + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: + data = await self._call( + LangBotToBoxAction.START_MANAGED_PROCESS, + {'session_id': session_id, 'spec': spec.model_dump(mode='json')}, + ) + return BoxManagedProcessInfo.model_validate(data) + + async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: + data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, {'session_id': session_id}) + return BoxManagedProcessInfo.model_validate(data) + + def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str) -> str: + base = ws_relay_base_url + if base.startswith('https://'): + scheme = 'wss://' + suffix = base[len('https://') :] + elif base.startswith('http://'): + scheme = 'ws://' + suffix = base[len('http://') :] + else: + scheme = 'ws://' + suffix = base + return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws' diff --git a/src/langbot_plugin/box/errors.py b/src/langbot_plugin/box/errors.py new file mode 100644 index 0000000..f6a8e86 --- /dev/null +++ b/src/langbot_plugin/box/errors.py @@ -0,0 +1,33 @@ +from __future__ import annotations + + +class BoxError(RuntimeError): + """Base error for LangBot Box failures.""" + + +class BoxValidationError(BoxError): + """Raised when sandbox_exec arguments are invalid.""" + + +class BoxBackendUnavailableError(BoxError): + """Raised when no supported container backend is available.""" + + +class BoxRuntimeUnavailableError(BoxError): + """Raised when the standalone Box Runtime service is unavailable.""" + + +class BoxSessionConflictError(BoxError): + """Raised when an existing session cannot satisfy a new request.""" + + +class BoxSessionNotFoundError(BoxError): + """Raised when a referenced session does not exist.""" + + +class BoxManagedProcessConflictError(BoxError): + """Raised when a session already has an active managed process.""" + + +class BoxManagedProcessNotFoundError(BoxError): + """Raised when a referenced managed process does not exist.""" diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py new file mode 100644 index 0000000..90496ca --- /dev/null +++ b/src/langbot_plugin/box/models.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +import datetime as dt +import enum + +import pydantic + + +DEFAULT_BOX_IMAGE = 'python:3.11-slim' +DEFAULT_BOX_MOUNT_PATH = '/workspace' + + +class BoxNetworkMode(str, enum.Enum): + OFF = 'off' + ON = 'on' + + +class BoxExecutionStatus(str, enum.Enum): + COMPLETED = 'completed' + TIMED_OUT = 'timed_out' + + +class BoxHostMountMode(str, enum.Enum): + NONE = 'none' + READ_ONLY = 'ro' + READ_WRITE = 'rw' + + +class BoxManagedProcessStatus(str, enum.Enum): + RUNNING = 'running' + EXITED = 'exited' + + +class BoxSpec(pydantic.BaseModel): + cmd: str = '' + workdir: str = '/workspace' + timeout_sec: int = 30 + network: BoxNetworkMode = BoxNetworkMode.OFF + session_id: str + env: dict[str, str] = pydantic.Field(default_factory=dict) + image: str = DEFAULT_BOX_IMAGE + host_path: str | None = None + host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + # Resource limits + cpus: float = 1.0 + memory_mb: int = 512 + pids_limit: int = 128 + read_only_rootfs: bool = True + + @pydantic.field_validator('cmd') + @classmethod + def validate_cmd(cls, value: str) -> str: + return value.strip() + + @pydantic.field_validator('workdir') + @classmethod + def validate_workdir(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('workdir must be an absolute path inside the sandbox') + return value + + @pydantic.field_validator('timeout_sec') + @classmethod + def validate_timeout_sec(cls, value: int) -> int: + if value <= 0: + raise ValueError('timeout_sec must be greater than 0') + return value + + @pydantic.field_validator('cpus') + @classmethod + def validate_cpus(cls, value: float) -> float: + if value <= 0: + raise ValueError('cpus must be greater than 0') + return value + + @pydantic.field_validator('memory_mb') + @classmethod + def validate_memory_mb(cls, value: int) -> int: + if value < 32: + raise ValueError('memory_mb must be at least 32') + return value + + @pydantic.field_validator('pids_limit') + @classmethod + def validate_pids_limit(cls, value: int) -> int: + if value < 1: + raise ValueError('pids_limit must be at least 1') + return value + + @pydantic.field_validator('session_id') + @classmethod + def validate_session_id(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError('session_id must not be empty') + return value + + @pydantic.field_validator('env') + @classmethod + def validate_env(cls, value: dict[str, str]) -> dict[str, str]: + return {str(k): str(v) for k, v in value.items()} + + @pydantic.field_validator('host_path') + @classmethod + def validate_host_path(cls, value: str | None) -> str | None: + if value is None: + return None + value = value.strip() + if not value.startswith('/'): + raise ValueError('host_path must be an absolute host path') + return value + + @pydantic.model_validator(mode='after') + def validate_host_mount_consistency(self) -> 'BoxSpec': + if self.host_path is None: + return self + if self.host_path_mode == BoxHostMountMode.NONE: + return self + if not self.workdir.startswith(DEFAULT_BOX_MOUNT_PATH): + raise ValueError('workdir must stay under /workspace when host_path is provided') + return self + + +class BoxProfile(pydantic.BaseModel): + """Preset sandbox configuration. + + Provides default values for BoxSpec fields and optionally locks fields + so that tool-call parameters cannot override them. + """ + + name: str + image: str = DEFAULT_BOX_IMAGE + network: BoxNetworkMode = BoxNetworkMode.OFF + timeout_sec: int = 30 + host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + max_timeout_sec: int = 120 + # Resource limits + cpus: float = 1.0 + memory_mb: int = 512 + pids_limit: int = 128 + read_only_rootfs: bool = True + locked: frozenset[str] = frozenset() + + model_config = pydantic.ConfigDict(frozen=True) + + +BUILTIN_PROFILES: dict[str, BoxProfile] = { + 'default': BoxProfile( + name='default', + network=BoxNetworkMode.OFF, + host_path_mode=BoxHostMountMode.READ_WRITE, + cpus=1.0, + memory_mb=512, + pids_limit=128, + read_only_rootfs=True, + max_timeout_sec=120, + ), + 'offline_readonly': BoxProfile( + name='offline_readonly', + network=BoxNetworkMode.OFF, + host_path_mode=BoxHostMountMode.READ_ONLY, + cpus=0.5, + memory_mb=256, + pids_limit=64, + read_only_rootfs=True, + max_timeout_sec=60, + locked=frozenset({'network', 'host_path_mode', 'read_only_rootfs'}), + ), + 'network_basic': BoxProfile( + name='network_basic', + network=BoxNetworkMode.ON, + host_path_mode=BoxHostMountMode.READ_WRITE, + cpus=1.0, + memory_mb=512, + pids_limit=128, + read_only_rootfs=True, + max_timeout_sec=120, + ), + 'network_extended': BoxProfile( + name='network_extended', + network=BoxNetworkMode.ON, + host_path_mode=BoxHostMountMode.READ_WRITE, + cpus=2.0, + memory_mb=1024, + pids_limit=256, + read_only_rootfs=False, + max_timeout_sec=300, + ), +} + + +class BoxSessionInfo(pydantic.BaseModel): + session_id: str + backend_name: str + backend_session_id: str + image: str + network: BoxNetworkMode + host_path: str | None = None + host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + cpus: float = 1.0 + memory_mb: int = 512 + pids_limit: int = 128 + read_only_rootfs: bool = True + created_at: dt.datetime + last_used_at: dt.datetime + + +class BoxManagedProcessSpec(pydantic.BaseModel): + command: str + args: list[str] = pydantic.Field(default_factory=list) + env: dict[str, str] = pydantic.Field(default_factory=dict) + cwd: str = '/workspace' + + @pydantic.field_validator('command') + @classmethod + def validate_command(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError('command must not be empty') + return value + + @pydantic.field_validator('args') + @classmethod + def validate_args(cls, value: list[str]) -> list[str]: + return [str(item) for item in value] + + @pydantic.field_validator('env') + @classmethod + def validate_env(cls, value: dict[str, str]) -> dict[str, str]: + return {str(k): str(v) for k, v in value.items()} + + @pydantic.field_validator('cwd') + @classmethod + def validate_cwd(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('cwd must be an absolute path inside the sandbox') + return value + + +class BoxManagedProcessInfo(pydantic.BaseModel): + session_id: str + status: BoxManagedProcessStatus + command: str + args: list[str] + cwd: str + env_keys: list[str] + attached: bool = False + started_at: dt.datetime + exited_at: dt.datetime | None = None + exit_code: int | None = None + stderr_preview: str = '' + + +class BoxExecutionResult(pydantic.BaseModel): + session_id: str + backend_name: str + status: BoxExecutionStatus + exit_code: int | None + stdout: str = '' + stderr: str = '' + duration_ms: int + + @property + def ok(self) -> bool: + return self.status == BoxExecutionStatus.COMPLETED and self.exit_code == 0 diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py new file mode 100644 index 0000000..36f8c13 --- /dev/null +++ b/src/langbot_plugin/box/runtime.py @@ -0,0 +1,386 @@ +from __future__ import annotations + +import asyncio +import collections +import dataclasses +import datetime as dt +import logging +import uuid + +from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend +from .errors import ( + BoxBackendUnavailableError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, +) +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxManagedProcessStatus, + BoxSessionInfo, + BoxSpec, +) + +_UTC = dt.timezone.utc +_MANAGED_PROCESS_STDERR_PREVIEW_LIMIT = 4000 + + +@dataclasses.dataclass(slots=True) +class _ManagedProcess: + spec: BoxManagedProcessSpec + process: asyncio.subprocess.Process + started_at: dt.datetime + attach_lock: asyncio.Lock + stderr_chunks: collections.deque[str] + stderr_total_len: int = 0 + exit_code: int | None = None + exited_at: dt.datetime | None = None + + @property + def is_running(self) -> bool: + return self.exit_code is None and self.process.returncode is None + + +@dataclasses.dataclass(slots=True) +class _RuntimeSession: + info: BoxSessionInfo + lock: asyncio.Lock + managed_process: _ManagedProcess | None = None + + +class BoxRuntime: + def __init__( + self, + logger: logging.Logger, + backends: list[BaseSandboxBackend] | None = None, + session_ttl_sec: int = 300, + ): + self.logger = logger + self.backends = backends or [PodmanBackend(logger), DockerBackend(logger)] + self.session_ttl_sec = session_ttl_sec + self._backend: BaseSandboxBackend | None = None + self._sessions: dict[str, _RuntimeSession] = {} + self._lock = asyncio.Lock() + self.instance_id = uuid.uuid4().hex[:12] + + async def initialize(self): + self._backend = await self._select_backend() + if self._backend is not None: + self._backend.instance_id = self.instance_id + try: + await self._backend.cleanup_orphaned_containers(self.instance_id) + except Exception as exc: + self.logger.warning(f'LangBot Box orphan container cleanup failed: {exc}') + + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: + if not spec.cmd: + raise BoxValidationError('cmd must not be empty') + session = await self._get_or_create_session(spec) + + async with session.lock: + self.logger.info( + 'LangBot Box execute: ' + f'session_id={spec.session_id} ' + f'backend_session_id={session.info.backend_session_id} ' + f'backend={session.info.backend_name} ' + f'workdir={spec.workdir} ' + f'timeout_sec={spec.timeout_sec}' + ) + result = await (await self._get_backend()).exec(session.info, spec) + + async with self._lock: + now = dt.datetime.now(_UTC) + if spec.session_id in self._sessions: + self._sessions[spec.session_id].info.last_used_at = now + + if result.status == BoxExecutionStatus.TIMED_OUT: + await self._drop_session_locked(spec.session_id) + + return result + + async def shutdown(self): + async with self._lock: + session_ids = list(self._sessions.keys()) + for session_id in session_ids: + await self._drop_session_locked(session_id) + + async def create_session(self, spec: BoxSpec) -> dict: + session = await self._get_or_create_session(spec) + return self._session_to_dict(session.info) + + async def delete_session(self, session_id: str) -> None: + async with self._lock: + if session_id not in self._sessions: + raise BoxSessionNotFoundError(f'session {session_id} not found') + await self._drop_session_locked(session_id) + + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> dict: + async with self._lock: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + + async with runtime_session.lock: + existing = runtime_session.managed_process + if existing is not None and existing.is_running: + raise BoxManagedProcessConflictError(f'session {session_id} already has a managed process') + + backend = await self._get_backend() + process = await backend.start_managed_process(runtime_session.info, spec) + managed_process = _ManagedProcess( + spec=spec, + process=process, + started_at=dt.datetime.now(_UTC), + attach_lock=asyncio.Lock(), + stderr_chunks=collections.deque(), + ) + runtime_session.managed_process = managed_process + runtime_session.info.last_used_at = dt.datetime.now(_UTC) + asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, managed_process)) + asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, managed_process)) + return self._managed_process_to_dict(runtime_session.info.session_id, managed_process) + + def get_managed_process(self, session_id: str) -> dict: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + if runtime_session.managed_process is None: + raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process') + return self._managed_process_to_dict(session_id, runtime_session.managed_process) + + # ── Observability ───────────────────────────────────────────────── + + async def get_backend_info(self) -> dict: + backend = self._backend + if backend is None: + return {'name': None, 'available': False} + try: + available = await backend.is_available() + except Exception: + available = False + return {'name': backend.name, 'available': available} + + def get_sessions(self) -> list[dict]: + return [self._session_to_dict(s.info) for s in self._sessions.values()] + + def get_session(self, session_id: str) -> dict: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + result = self._session_to_dict(runtime_session.info) + if runtime_session.managed_process is not None: + result['managed_process'] = self._managed_process_to_dict(session_id, runtime_session.managed_process) + return result + + async def get_status(self) -> dict: + backend_info = await self.get_backend_info() + return { + 'backend': backend_info, + 'active_sessions': len(self._sessions), + 'managed_processes': sum( + 1 + for runtime_session in self._sessions.values() + if runtime_session.managed_process is not None and runtime_session.managed_process.is_running + ), + 'session_ttl_sec': self.session_ttl_sec, + } + + async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession: + async with self._lock: + await self._reap_expired_sessions_locked() + + existing = self._sessions.get(spec.session_id) + if existing is not None: + self._assert_session_compatible(existing.info, spec) + existing.info.last_used_at = dt.datetime.now(_UTC) + self.logger.info( + 'LangBot Box session reused: ' + f'session_id={spec.session_id} ' + f'backend_session_id={existing.info.backend_session_id} ' + f'backend={existing.info.backend_name}' + ) + return existing + + backend = await self._get_backend() + info = await backend.start_session(spec) + runtime_session = _RuntimeSession(info=info, lock=asyncio.Lock()) + self._sessions[spec.session_id] = runtime_session + self.logger.info( + 'LangBot Box session created: ' + f'session_id={spec.session_id} ' + f'backend_session_id={info.backend_session_id} ' + f'backend={info.backend_name} ' + f'image={info.image} ' + f'network={info.network.value} ' + f'host_path={info.host_path} ' + f'host_path_mode={info.host_path_mode.value}' + ) + return runtime_session + + async def _get_backend(self) -> BaseSandboxBackend: + if self._backend is None: + self._backend = await self._select_backend() + if self._backend is None: + raise BoxBackendUnavailableError( + 'LangBot Box backend unavailable. Install and start Podman or Docker before using sandbox_exec.' + ) + return self._backend + + async def _select_backend(self) -> BaseSandboxBackend | None: + for backend in self.backends: + try: + await backend.initialize() + if await backend.is_available(): + self.logger.info(f'LangBot Box using backend: {backend.name}') + return backend + except Exception as exc: + self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}') + + self.logger.warning('LangBot Box backend unavailable: neither Podman nor Docker is ready') + return None + + async def _reap_expired_sessions_locked(self): + if self.session_ttl_sec <= 0: + return + + deadline = dt.datetime.now(_UTC) - dt.timedelta(seconds=self.session_ttl_sec) + expired_session_ids = [ + session_id + for session_id, session in self._sessions.items() + if session.info.last_used_at < deadline + and not (session.managed_process is not None and session.managed_process.is_running) + ] + + for session_id in expired_session_ids: + await self._drop_session_locked(session_id) + + async def _drop_session_locked(self, session_id: str): + runtime_session = self._sessions.pop(session_id, None) + if runtime_session is None or self._backend is None: + return + + await self._terminate_managed_process(runtime_session) + + try: + self.logger.info( + 'LangBot Box session cleanup: ' + f'session_id={session_id} ' + f'backend_session_id={runtime_session.info.backend_session_id} ' + f'backend={runtime_session.info.backend_name}' + ) + await self._backend.stop_session(runtime_session.info) + except Exception as exc: + self.logger.warning(f'Failed to clean up box session {session_id}: {exc}') + + def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): + _COMPAT_FIELDS = ( + 'network', + 'image', + 'host_path', + 'host_path_mode', + 'cpus', + 'memory_mb', + 'pids_limit', + 'read_only_rootfs', + ) + for field in _COMPAT_FIELDS: + session_val = getattr(session, field) + spec_val = getattr(spec, field) + if session_val != spec_val: + display = session_val.value if hasattr(session_val, 'value') else session_val + raise BoxSessionConflictError( + f'sandbox_exec session {spec.session_id} already exists with {field}={display}' + ) + + async def _drain_managed_process_stderr(self, session_id: str, managed_process: _ManagedProcess) -> None: + stream = managed_process.process.stderr + if stream is None: + return + + try: + while True: + chunk = await stream.readline() + if not chunk: + break + text = chunk.decode('utf-8', errors='replace').rstrip() + if not text: + continue + managed_process.stderr_chunks.append(text) + managed_process.stderr_total_len += len(text) + 1 # +1 for '\n' separator + while ( + managed_process.stderr_total_len > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT + and managed_process.stderr_chunks + ): + removed = managed_process.stderr_chunks.popleft() + managed_process.stderr_total_len -= len(removed) + 1 + self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}') + except Exception as exc: + self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}') + + async def _watch_managed_process(self, session_id: str, managed_process: _ManagedProcess) -> None: + return_code = await managed_process.process.wait() + managed_process.exit_code = return_code + managed_process.exited_at = dt.datetime.now(_UTC) + runtime_session = self._sessions.get(session_id) + if runtime_session is not None: + runtime_session.info.last_used_at = managed_process.exited_at + self.logger.info(f'LangBot Box managed process exited: session_id={session_id} return_code={return_code}') + + async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> None: + managed_process = runtime_session.managed_process + if managed_process is None or not managed_process.is_running: + return + + process = managed_process.process + try: + if process.stdin is not None: + process.stdin.close() + except Exception: + pass + + try: + await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) + except asyncio.TimeoutError: + if process.returncode is None: + try: + process.terminate() + except ProcessLookupError: + pass + try: + await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) + except asyncio.TimeoutError: + if process.returncode is None: + try: + process.kill() + except ProcessLookupError: + pass + await process.wait() + finally: + managed_process.exit_code = process.returncode + managed_process.exited_at = dt.datetime.now(_UTC) + + def _managed_process_to_dict(self, session_id: str, managed_process: _ManagedProcess) -> dict: + stderr_preview = '\n'.join(managed_process.stderr_chunks) + status = BoxManagedProcessStatus.RUNNING if managed_process.is_running else BoxManagedProcessStatus.EXITED + return BoxManagedProcessInfo( + session_id=session_id, + status=status, + command=managed_process.spec.command, + args=managed_process.spec.args, + cwd=managed_process.spec.cwd, + env_keys=sorted(managed_process.spec.env.keys()), + attached=managed_process.attach_lock.locked(), + started_at=managed_process.started_at, + exited_at=managed_process.exited_at, + exit_code=managed_process.exit_code, + stderr_preview=stderr_preview, + ).model_dump(mode='json') + + @staticmethod + def _session_to_dict(info: BoxSessionInfo) -> dict: + return info.model_dump(mode='json') diff --git a/src/langbot_plugin/box/security.py b/src/langbot_plugin/box/security.py new file mode 100644 index 0000000..d5a8c51 --- /dev/null +++ b/src/langbot_plugin/box/security.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import os + +from .errors import BoxValidationError +from .models import BoxSpec + +BLOCKED_HOST_PATHS = frozenset( + { + '/etc', + '/proc', + '/sys', + '/dev', + '/root', + '/boot', + '/run', + '/var/run', + '/run/docker.sock', + '/var/run/docker.sock', + '/run/podman', + '/var/run/podman', + } +) + + +def validate_sandbox_security(spec: BoxSpec) -> None: + """Validate that a BoxSpec does not request dangerous container config. + + Raises BoxValidationError when the spec contains a blocked host_path. + """ + if spec.host_path: + real = os.path.realpath(spec.host_path) + for blocked in BLOCKED_HOST_PATHS: + if real == blocked or real.startswith(blocked + '/'): + raise BoxValidationError(f'host_path {spec.host_path} is blocked for security') diff --git a/src/langbot_plugin/box/server.py b/src/langbot_plugin/box/server.py new file mode 100644 index 0000000..0690b8f --- /dev/null +++ b/src/langbot_plugin/box/server.py @@ -0,0 +1,267 @@ +"""Standalone Box Runtime service exposing BoxRuntime via action RPC. + +Usage (stdio, launched by LangBot as subprocess): + python -m langbot_plugin.box.server + +Usage (ws + ws relay, for remote/docker mode): + python -m langbot_plugin.box.server --port 5410 +""" + +from __future__ import annotations + +import argparse +import asyncio +import datetime as dt +import logging +import sys +from typing import Any + +import pydantic +from aiohttp import web + +from langbot_plugin.entities.io.actions.enums import CommonAction +from langbot_plugin.entities.io.resp import ActionResponse +from langbot_plugin.runtime.io.connection import Connection +from langbot_plugin.runtime.io.handler import Handler + +from .actions import LangBotToBoxAction +from .errors import ( + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionNotFoundError, +) +from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec +from .runtime import BoxRuntime + +logger = logging.getLogger('langbot.box.server') + + +def _result_to_dict(result: BoxExecutionResult) -> dict: + return result.model_dump(mode='json') + + +class BoxServerHandler(Handler): + """Server-side handler that registers box actions backed by BoxRuntime.""" + + name = 'BoxServerHandler' + + def __init__(self, connection: Connection, runtime: BoxRuntime): + super().__init__(connection) + self._runtime = runtime + self._register_actions() + + def _register_actions(self) -> None: + @self.action(CommonAction.PING) + async def ping(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success({}) + + @self.action(LangBotToBoxAction.HEALTH) + async def health(data: dict[str, Any]) -> ActionResponse: + info = await self._runtime.get_backend_info() + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.STATUS) + async def status(data: dict[str, Any]) -> ActionResponse: + result = await self._runtime.get_status() + return ActionResponse.success(result) + + @self.action(LangBotToBoxAction.EXEC) + async def exec_cmd(data: dict[str, Any]) -> ActionResponse: + try: + spec = BoxSpec.model_validate(data) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + result = await self._runtime.execute(spec) + return ActionResponse.success(_result_to_dict(result)) + + @self.action(LangBotToBoxAction.CREATE_SESSION) + async def create_session(data: dict[str, Any]) -> ActionResponse: + try: + spec = BoxSpec.model_validate(data) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + info = await self._runtime.create_session(spec) + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.GET_SESSION) + async def get_session(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success(self._runtime.get_session(data['session_id'])) + + @self.action(LangBotToBoxAction.GET_SESSIONS) + async def get_sessions(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success({'sessions': self._runtime.get_sessions()}) + + @self.action(LangBotToBoxAction.DELETE_SESSION) + async def delete_session(data: dict[str, Any]) -> ActionResponse: + await self._runtime.delete_session(data['session_id']) + return ActionResponse.success({'deleted': data['session_id']}) + + @self.action(LangBotToBoxAction.START_MANAGED_PROCESS) + async def start_managed_process(data: dict[str, Any]) -> ActionResponse: + session_id = data['session_id'] + try: + spec = BoxManagedProcessSpec.model_validate(data['spec']) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + info = await self._runtime.start_managed_process(session_id, spec) + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.GET_MANAGED_PROCESS) + async def get_managed_process(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success(self._runtime.get_managed_process(data['session_id'])) + + @self.action(LangBotToBoxAction.GET_BACKEND_INFO) + async def get_backend_info(data: dict[str, Any]) -> ActionResponse: + info = await self._runtime.get_backend_info() + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.SHUTDOWN) + async def shutdown(data: dict[str, Any]) -> ActionResponse: + await self._runtime.shutdown() + return ActionResponse.success({}) + + +# ── Managed process WebSocket relay (aiohttp) ──────────────────────── + + +def _error_response(exc: Exception) -> web.Response: + return web.json_response( + {'error': {'code': type(exc).__name__, 'message': str(exc)}}, + status=400, + ) + + +async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse: + runtime: BoxRuntime = request.app['runtime'] + session_id = request.match_info['session_id'] + + runtime_session = runtime._sessions.get(session_id) + if runtime_session is None: + return _error_response(BoxSessionNotFoundError(f'session {session_id} not found')) + + managed_process = runtime_session.managed_process + if managed_process is None: + return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process')) + if not managed_process.is_running: + return _error_response( + BoxManagedProcessConflictError(f'managed process in session {session_id} is not running') + ) + + ws = web.WebSocketResponse(protocols=('mcp',)) + await ws.prepare(request) + + async with managed_process.attach_lock: + process = managed_process.process + stdout = process.stdout + stdin = process.stdin + if stdout is None or stdin is None: + await ws.close(message=b'managed process stdio unavailable') + return ws + + async def _stdout_to_ws() -> None: + while True: + line = await stdout.readline() + if not line: + break + await ws.send_str(line.decode('utf-8', errors='replace').rstrip('\n')) + runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc) + + async def _ws_to_stdin() -> None: + async for msg in ws: + if msg.type == web.WSMsgType.TEXT: + stdin.write((msg.data + '\n').encode('utf-8')) + await stdin.drain() + runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc) + elif msg.type in ( + web.WSMsgType.CLOSE, + web.WSMsgType.CLOSING, + web.WSMsgType.CLOSED, + web.WSMsgType.ERROR, + ): + break + + stdout_task = asyncio.create_task(_stdout_to_ws()) + stdin_task = asyncio.create_task(_ws_to_stdin()) + try: + done, pending = await asyncio.wait( + [stdout_task, stdin_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + for task in done: + task.result() + finally: + await ws.close() + + return ws + + +def create_ws_relay_app(runtime: BoxRuntime) -> web.Application: + """Create a minimal aiohttp app that only serves the managed-process ws relay.""" + app = web.Application() + app['runtime'] = runtime + app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws) + return app + + +# ── Entry point ────────────────────────────────────────────────────── + + +async def _run_server(host: str, port: int, mode: str) -> None: + runtime = BoxRuntime(logger=logger) + await runtime.initialize() + + # Start aiohttp for ws relay (non-fatal — managed process attach + # degrades gracefully if the port is unavailable). + runner: web.AppRunner | None = None + try: + ws_app = create_ws_relay_app(runtime) + runner = web.AppRunner(ws_app) + await runner.setup() + site = web.TCPSite(runner, host, port) + await site.start() + logger.info(f'Box ws relay listening on {host}:{port}') + except OSError as exc: + logger.warning(f'Box ws relay failed to bind {host}:{port}: {exc}') + logger.warning('Managed process WebSocket attach will be unavailable.') + + async def new_connection_callback(connection: Connection) -> None: + handler = BoxServerHandler(connection, runtime) + await handler.run() + + try: + if mode == 'stdio': + from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController + + ctrl = StdioServerController() + await ctrl.run(new_connection_callback) + else: + from langbot_plugin.runtime.io.controllers.ws.server import WebSocketServerController + + # Action RPC uses port+1 to avoid conflict with ws relay + rpc_port = port + 1 + logger.info(f'Box action RPC (ws) listening on {host}:{rpc_port}') + ctrl = WebSocketServerController(rpc_port) + await ctrl.run(new_connection_callback) + finally: + await runtime.shutdown() + if runner is not None: + await runner.cleanup() + + +def main() -> None: + parser = argparse.ArgumentParser(description='LangBot Box Runtime Service') + parser.add_argument('--host', default='0.0.0.0', help='Bind address') + parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)') + parser.add_argument( + '--mode', choices=['stdio', 'ws'], default='stdio', help='Control channel transport (default: stdio)' + ) + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO, stream=sys.stderr) + asyncio.run(_run_server(args.host, args.port, args.mode)) + + +if __name__ == '__main__': + main() diff --git a/src/langbot_plugin/cli/__init__.py b/src/langbot_plugin/cli/__init__.py index 7f913f2..b4f388b 100644 --- a/src/langbot_plugin/cli/__init__.py +++ b/src/langbot_plugin/cli/__init__.py @@ -33,6 +33,10 @@ - [--stdio-control -s]: Use stdio for control connection - [--ws-control-port]: The port for control connection - [--ws-debug-port]: The port for debug connection + box: Run the sandbox box runtime + - [--host]: Bind address, default is 0.0.0.0 + - [--port]: Bind port for ws relay, default is 5410 + - [--mode]: Control channel transport (stdio or ws), default is stdio """ @@ -120,6 +124,19 @@ def main(): help="Skip checking and installing dependencies for all installed plugins", ) + # box command + box_parser = subparsers.add_parser("box", help="Run the sandbox box runtime") + box_parser.add_argument( + "--host", default="0.0.0.0", help="Bind address" + ) + box_parser.add_argument( + "--port", type=int, default=5410, help="Bind port (ws relay)" + ) + box_parser.add_argument( + "--mode", choices=["stdio", "ws"], default="stdio", + help="Control channel transport (default: stdio)" + ) + args = parser.parse_args() if not args.command: @@ -148,6 +165,9 @@ def main(): publish_process() case "rt": runtime_app.main(args) + case "box": + from langbot_plugin.box.server import main as box_main + box_main() case _: cli_print("unknown_command", args.command) sys.exit(1) From 873848e914e73003655e55a8a135fbd960d9d9a7 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sun, 22 Mar 2026 07:39:55 +0000 Subject: [PATCH 02/14] refactor: add if --- src/langbot_plugin/box/__main__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/langbot_plugin/box/__main__.py b/src/langbot_plugin/box/__main__.py index c6144f0..6c41643 100644 --- a/src/langbot_plugin/box/__main__.py +++ b/src/langbot_plugin/box/__main__.py @@ -2,4 +2,6 @@ from .server import main -main() +if __name__ == "__main__": + main() + From 818bc55cb7607d25d52e35f2a0fd28b8dc5547d8 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sun, 22 Mar 2026 14:19:34 +0000 Subject: [PATCH 03/14] feat: add box backend nsjail --- src/langbot_plugin/box/nsjail_backend.py | 506 +++++++++++++++++++++++ src/langbot_plugin/box/runtime.py | 7 +- tests/box/__init__.py | 0 tests/box/test_nsjail_backend.py | 348 ++++++++++++++++ 4 files changed, 858 insertions(+), 3 deletions(-) create mode 100644 src/langbot_plugin/box/nsjail_backend.py create mode 100644 tests/box/__init__.py create mode 100644 tests/box/test_nsjail_backend.py diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py new file mode 100644 index 0000000..4421667 --- /dev/null +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -0,0 +1,506 @@ +from __future__ import annotations + +import asyncio +import datetime as dt +import json +import logging +import os +import pathlib +import shlex +import shutil +import signal +import uuid + +from .backend import BaseSandboxBackend, _CommandResult, _MAX_RAW_OUTPUT_BYTES +from .errors import BoxError +from .models import ( + DEFAULT_BOX_MOUNT_PATH, + BoxExecutionResult, + BoxExecutionStatus, + BoxHostMountMode, + BoxNetworkMode, + BoxSessionInfo, + BoxSpec, +) +from .security import validate_sandbox_security + +# System directories to mount read-only inside the sandbox. +# Only well-known paths needed for running Python/Node/shell commands. +_READONLY_SYSTEM_MOUNTS: list[str] = [ + '/usr', + '/lib', + '/lib64', + '/bin', + '/sbin', +] + +# Specific /etc entries required for dynamic linking and TLS. +_READONLY_ETC_ENTRIES: list[str] = [ + '/etc/alternatives', + '/etc/ld.so.cache', + '/etc/ld.so.conf', + '/etc/ld.so.conf.d', + '/etc/ssl/certs', + '/etc/localtime', + '/etc/resolv.conf', # needed when network=ON +] + +_DEFAULT_BASE_DIR = '/tmp/langbot-box-nsjail' + + +class NsjailBackend(BaseSandboxBackend): + """Lightweight sandbox backend using nsjail. + + Each ``exec`` invocation spawns an independent nsjail process. Session + state (workspace files) persists via a shared host directory that is + bind-mounted into every invocation. + """ + + name = 'nsjail' + + def __init__( + self, + logger: logging.Logger, + nsjail_bin: str = 'nsjail', + base_dir: str = _DEFAULT_BASE_DIR, + ): + super().__init__(logger) + self._nsjail_bin = nsjail_bin + self._base_dir = pathlib.Path(base_dir) + self._cgroup_v2_available: bool = False + + # ── lifecycle ───────────────────────────────────────────────────── + + async def is_available(self) -> bool: + if shutil.which(self._nsjail_bin) is None: + self.logger.info('nsjail binary not found in PATH') + return False + + # Quick sanity check – nsjail --help exits 0. + try: + proc = await asyncio.create_subprocess_exec( + self._nsjail_bin, '--help', + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await asyncio.wait_for(proc.wait(), timeout=5) + if proc.returncode != 0: + self.logger.info('nsjail --help returned non-zero') + return False + except Exception as exc: + self.logger.info(f'nsjail probe failed: {exc}') + return False + + self._cgroup_v2_available = self._detect_cgroup_v2() + if not self._cgroup_v2_available: + self.logger.warning( + 'cgroup v2 not available for nsjail; ' + 'falling back to rlimit-based resource limits' + ) + + self._base_dir.mkdir(parents=True, exist_ok=True) + return True + + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + validate_sandbox_security(spec) + + now = dt.datetime.now(dt.UTC) + session_dir_name = f'{self.instance_id}_{spec.session_id}_{uuid.uuid4().hex[:8]}' + session_dir = self._base_dir / session_dir_name + + # Per-session writable directories. + workspace_dir = session_dir / 'workspace' + tmp_dir = session_dir / 'tmp' + home_dir = session_dir / 'home' + + for d in (workspace_dir, tmp_dir, home_dir): + d.mkdir(parents=True, exist_ok=True) + + # If host_path is specified, we will use it directly instead of the + # per-session workspace when building nsjail args (see _build_mounts). + meta = { + 'session_id': spec.session_id, + 'instance_id': self.instance_id, + 'host_path': spec.host_path, + 'host_path_mode': spec.host_path_mode.value if spec.host_path else None, + 'network': spec.network.value, + 'cpus': spec.cpus, + 'memory_mb': spec.memory_mb, + 'pids_limit': spec.pids_limit, + 'created_at': now.isoformat(), + } + (session_dir / 'meta.json').write_text(json.dumps(meta, indent=2)) + + self.logger.info( + f'LangBot Box backend start_session: backend=nsjail ' + f'session_id={spec.session_id} session_dir={session_dir} ' + f'network={spec.network.value} ' + f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} ' + f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit}' + ) + + return BoxSessionInfo( + session_id=spec.session_id, + backend_name=self.name, + backend_session_id=str(session_dir), + image='host', + network=spec.network, + host_path=spec.host_path, + host_path_mode=spec.host_path_mode, + cpus=spec.cpus, + memory_mb=spec.memory_mb, + pids_limit=spec.pids_limit, + read_only_rootfs=True, # always true for nsjail + created_at=now, + last_used_at=now, + ) + + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + start = dt.datetime.now(dt.UTC) + session_dir = pathlib.Path(session.backend_session_id) + + args = self._build_nsjail_args(session, spec, session_dir) + + cmd_preview = spec.cmd.strip() + if len(cmd_preview) > 400: + cmd_preview = f'{cmd_preview[:397]}...' + self.logger.info( + f'LangBot Box backend exec: backend=nsjail ' + f'session_id={session.session_id} session_dir={session_dir} ' + f'workdir={spec.workdir} timeout_sec={spec.timeout_sec} ' + f'env_keys={sorted(spec.env.keys())} cmd={cmd_preview}' + ) + + result = await self._run_nsjail(args, timeout_sec=spec.timeout_sec) + duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000) + + if result.timed_out: + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.TIMED_OUT, + exit_code=None, + stdout=result.stdout, + stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.', + duration_ms=duration_ms, + ) + + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.COMPLETED, + exit_code=result.return_code, + stdout=result.stdout, + stderr=result.stderr, + duration_ms=duration_ms, + ) + + async def stop_session(self, session: BoxSessionInfo): + session_dir = pathlib.Path(session.backend_session_id) + self.logger.info( + f'LangBot Box backend stop_session: backend=nsjail ' + f'session_id={session.session_id} session_dir={session_dir}' + ) + + # Kill any lingering nsjail processes whose cwd is inside session_dir. + await self._kill_session_processes(session_dir) + + try: + if session_dir.exists(): + shutil.rmtree(session_dir) + except Exception as exc: + self.logger.warning(f'Failed to remove nsjail session dir {session_dir}: {exc}') + + async def start_managed_process( + self, session: BoxSessionInfo, spec + ) -> asyncio.subprocess.Process: + session_dir = pathlib.Path(session.backend_session_id) + + # Build a BoxSpec-like object so we can reuse _build_nsjail_args. + # ManagedProcessSpec has command/args/cwd/env but not the full BoxSpec. + inner_cmd = ' '.join([shlex.quote(spec.command), *[shlex.quote(a) for a in spec.args]]) + pseudo_spec = BoxSpec( + cmd=inner_cmd, + workdir=spec.cwd, + timeout_sec=86400, # not used here + network=session.network, + session_id=session.session_id, + env=spec.env, + host_path=session.host_path, + host_path_mode=session.host_path_mode, + cpus=session.cpus, + memory_mb=session.memory_mb, + pids_limit=session.pids_limit, + read_only_rootfs=True, + ) + + args = self._build_nsjail_args(session, pseudo_spec, session_dir) + + self.logger.info( + f'LangBot Box backend start_managed_process: backend=nsjail ' + f'session_id={session.session_id} session_dir={session_dir} ' + f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} ' + f'command={spec.command} args={spec.args}' + ) + + return await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): + if not self._base_dir.exists(): + return + + for entry in self._base_dir.iterdir(): + if not entry.is_dir(): + continue + + # Session dirs are named: __ + # If it doesn't start with the current instance_id, it's orphaned. + if entry.name.startswith(f'{current_instance_id}_'): + continue + + self.logger.info(f'Cleaning up orphaned nsjail session dir: {entry}') + try: + await self._kill_session_processes(entry) + shutil.rmtree(entry) + except Exception as exc: + self.logger.warning(f'Failed to clean up orphaned nsjail dir {entry}: {exc}') + + # ── nsjail argument construction ────────────────────────────────── + + def _build_nsjail_args( + self, + session: BoxSessionInfo, + spec: BoxSpec, + session_dir: pathlib.Path, + ) -> list[str]: + args: list[str] = [self._nsjail_bin] + + # Mode: one-shot execution. + args.extend(['--mode', 'o']) + + # Namespace isolation. + args.extend([ + '--clone_newuser', + '--clone_newns', + '--clone_newpid', + '--clone_newipc', + '--clone_newuts', + '--clone_newcgroup', + ]) + + # Network namespace. + if spec.network == BoxNetworkMode.OFF: + args.append('--clone_newnet') + else: + args.append('--disable_clone_newnet') + + # Read-only system mounts. + args.extend(self._build_readonly_mounts(spec.network)) + + # Writable per-session mounts. + args.extend(self._build_writable_mounts(session, spec, session_dir)) + + # Isolated /proc and minimal /dev. + args.extend(['--mount', 'none:/proc:proc:rw']) + args.extend(['--mount', 'none:/dev:tmpfs:rw']) + + # Working directory. + args.extend(['--cwd', spec.workdir]) + + # Environment variables. + args.extend(['--env', 'PYTHONUNBUFFERED=1']) + args.extend(['--env', 'HOME=/home']) + args.extend(['--env', 'PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin']) + for key, value in spec.env.items(): + args.extend(['--env', f'{key}={value}']) + + # Resource limits. + args.extend(self._build_resource_limits(spec)) + + # Suppress nsjail's own log output. + args.append('--really_quiet') + + # The actual command. + quoted_workdir = shlex.quote(spec.workdir) + user_cmd = f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {spec.cmd}' + args.extend(['--', 'sh', '-lc', user_cmd]) + + return args + + def _build_readonly_mounts(self, network: BoxNetworkMode) -> list[str]: + args: list[str] = [] + + for path in _READONLY_SYSTEM_MOUNTS: + if os.path.exists(path): + args.extend(['--bindmount_ro', f'{path}:{path}']) + + for path in _READONLY_ETC_ENTRIES: + # /etc/resolv.conf is only needed when network is ON. + if path == '/etc/resolv.conf' and network == BoxNetworkMode.OFF: + continue + if os.path.exists(path): + args.extend(['--bindmount_ro', f'{path}:{path}']) + + return args + + def _build_writable_mounts( + self, + session: BoxSessionInfo, + spec: BoxSpec, + session_dir: pathlib.Path, + ) -> list[str]: + args: list[str] = [] + + # Workspace mount. + if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: + if spec.host_path_mode == BoxHostMountMode.READ_ONLY: + args.extend(['--bindmount_ro', f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}']) + else: + args.extend(['--rw_bind', f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}']) + else: + workspace_dir = session_dir / 'workspace' + args.extend(['--rw_bind', f'{workspace_dir}:{DEFAULT_BOX_MOUNT_PATH}']) + + # /tmp and /home are always per-session writable. + tmp_dir = session_dir / 'tmp' + home_dir = session_dir / 'home' + args.extend(['--rw_bind', f'{tmp_dir}:/tmp']) + args.extend(['--rw_bind', f'{home_dir}:/home']) + + return args + + def _build_resource_limits(self, spec: BoxSpec) -> list[str]: + args: list[str] = [] + + if self._cgroup_v2_available: + # cgroup v2 – precise limits. + memory_bytes = spec.memory_mb * 1024 * 1024 + args.extend(['--cgroup_mem_max', str(memory_bytes)]) + args.extend(['--cgroup_pids_max', str(spec.pids_limit)]) + cpu_ms = int(spec.cpus * 1000) + args.extend(['--cgroup_cpu_ms_per_sec', str(cpu_ms)]) + else: + # rlimit fallback – best-effort. + args.extend(['--rlimit_as', str(spec.memory_mb)]) + args.extend(['--rlimit_nproc', str(spec.pids_limit)]) + + # Always set these rlimits regardless of cgroup mode. + args.extend(['--rlimit_fsize', '512']) # max file size 512 MB + args.extend(['--rlimit_nofile', '256']) # max open fds + + return args + + # ── process execution ───────────────────────────────────────────── + + async def _run_nsjail( + self, + args: list[str], + timeout_sec: int, + ) -> _CommandResult: + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_task = asyncio.create_task(self._read_stream(process.stdout)) + stderr_task = asyncio.create_task(self._read_stream(process.stderr)) + + timed_out = False + try: + await asyncio.wait_for(process.wait(), timeout=timeout_sec) + except asyncio.TimeoutError: + process.kill() + timed_out = True + await process.wait() + + stdout_bytes, stdout_total = await stdout_task + stderr_bytes, stderr_total = await stderr_task + + return _CommandResult( + return_code=process.returncode if not timed_out else -1, + stdout=self._clip_captured_bytes(stdout_bytes, stdout_total), + stderr=self._clip_captured_bytes(stderr_bytes, stderr_total), + timed_out=timed_out, + ) + + # ── helpers ─────────────────────────────────────────────────────── + + @staticmethod + def _detect_cgroup_v2() -> bool: + """Check whether the host runs cgroup v2 and we can write to it.""" + cgroup_mount = pathlib.Path('/sys/fs/cgroup') + if not cgroup_mount.exists(): + return False + # cgroup v2 has a single hierarchy with cgroup.controllers file. + controllers = cgroup_mount / 'cgroup.controllers' + if not controllers.exists(): + return False + # Check if we can write to a cgroup subtree (needed for nsjail). + # A rough heuristic: if the user owns a cgroup directory we're probably + # running under systemd user delegation. + user_slice = cgroup_mount / f'user.slice/user-{os.getuid()}.slice' + if user_slice.exists(): + return True + # If running as root (uid 0), cgroup v2 is always usable. + if os.getuid() == 0: + return True + # Conservative: if we can't confirm writability, report unavailable. + return False + + async def _kill_session_processes(self, session_dir: pathlib.Path) -> None: + """Best-effort kill of nsjail processes associated with a session dir. + + We scan /proc for nsjail processes whose command line contains the + session directory path. + """ + session_path_str = str(session_dir) + proc_dir = pathlib.Path('/proc') + if not proc_dir.exists(): + return + + for pid_dir in proc_dir.iterdir(): + if not pid_dir.name.isdigit(): + continue + try: + cmdline = (pid_dir / 'cmdline').read_bytes().decode('utf-8', errors='replace') + if self._nsjail_bin in cmdline and session_path_str in cmdline: + pid = int(pid_dir.name) + os.kill(pid, signal.SIGKILL) + self.logger.info(f'Killed orphaned nsjail process {pid}') + except (OSError, ValueError): + continue + + @staticmethod + def _clip_captured_bytes( + data: bytes, total_size: int, limit: int = _MAX_RAW_OUTPUT_BYTES + ) -> str: + text = data.decode('utf-8', errors='replace').strip() + if total_size > limit: + text += f'\n... [raw output clipped at {limit} bytes, {total_size - limit} bytes discarded]' + return text + + @staticmethod + async def _read_stream( + stream: asyncio.StreamReader | None, + limit: int = _MAX_RAW_OUTPUT_BYTES, + ) -> tuple[bytes, int]: + if stream is None: + return b'', 0 + + chunks = bytearray() + total_size = 0 + while True: + chunk = await stream.read(65536) + if not chunk: + break + total_size += len(chunk) + remaining = limit - len(chunks) + if remaining > 0: + chunks.extend(chunk[:remaining]) + + return bytes(chunks), total_size diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index 36f8c13..cac6b66 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -8,6 +8,7 @@ import uuid from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend +from .nsjail_backend import NsjailBackend from .errors import ( BoxBackendUnavailableError, BoxManagedProcessConflictError, @@ -61,7 +62,7 @@ def __init__( session_ttl_sec: int = 300, ): self.logger = logger - self.backends = backends or [PodmanBackend(logger), DockerBackend(logger)] + self.backends = backends or [PodmanBackend(logger), DockerBackend(logger), NsjailBackend(logger)] self.session_ttl_sec = session_ttl_sec self._backend: BaseSandboxBackend | None = None self._sessions: dict[str, _RuntimeSession] = {} @@ -227,7 +228,7 @@ async def _get_backend(self) -> BaseSandboxBackend: self._backend = await self._select_backend() if self._backend is None: raise BoxBackendUnavailableError( - 'LangBot Box backend unavailable. Install and start Podman or Docker before using sandbox_exec.' + 'LangBot Box backend unavailable. Install and start Podman, Docker, or nsjail before using sandbox_exec.' ) return self._backend @@ -241,7 +242,7 @@ async def _select_backend(self) -> BaseSandboxBackend | None: except Exception as exc: self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}') - self.logger.warning('LangBot Box backend unavailable: neither Podman nor Docker is ready') + self.logger.warning('LangBot Box backend unavailable: no supported backend (Podman, Docker, nsjail) is ready') return None async def _reap_expired_sessions_locked(self): diff --git a/tests/box/__init__.py b/tests/box/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/box/test_nsjail_backend.py b/tests/box/test_nsjail_backend.py new file mode 100644 index 0000000..b9e7f68 --- /dev/null +++ b/tests/box/test_nsjail_backend.py @@ -0,0 +1,348 @@ +"""Unit tests for NsjailBackend. + +These tests do NOT require nsjail to be installed – they mock subprocess +calls and filesystem checks to verify argument construction, session +directory management, and cgroup detection logic. +""" + +from __future__ import annotations + +import asyncio +import logging +import pathlib +from unittest import mock + +import pytest + +from langbot_plugin.box.nsjail_backend import ( + NsjailBackend, + _READONLY_ETC_ENTRIES, + _READONLY_SYSTEM_MOUNTS, +) +from langbot_plugin.box.models import ( + BoxExecutionStatus, + BoxHostMountMode, + BoxNetworkMode, + BoxSessionInfo, + BoxSpec, +) + + +@pytest.fixture +def logger(): + return logging.getLogger('test.nsjail') + + +@pytest.fixture +def tmp_base(tmp_path: pathlib.Path): + return tmp_path / 'nsjail-base' + + +@pytest.fixture +def backend(logger, tmp_base): + b = NsjailBackend(logger=logger, base_dir=str(tmp_base)) + b.instance_id = 'test123' + return b + + +# ── is_available ────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_is_available_no_binary(backend): + with mock.patch('shutil.which', return_value=None): + assert await backend.is_available() is False + + +@pytest.mark.anyio +async def test_is_available_binary_exists(backend, tmp_base): + with ( + mock.patch('shutil.which', return_value='/usr/bin/nsjail'), + mock.patch('asyncio.create_subprocess_exec') as mock_exec, + ): + mock_proc = mock.AsyncMock() + mock_proc.returncode = 0 + mock_proc.wait = mock.AsyncMock(return_value=0) + mock_exec.return_value = mock_proc + + result = await backend.is_available() + assert result is True + assert tmp_base.exists() + + +# ── start_session ───────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_start_session_creates_directories(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='sess1', cmd='echo hi') + + info = await backend.start_session(spec) + + session_dir = pathlib.Path(info.backend_session_id) + assert session_dir.exists() + assert (session_dir / 'workspace').is_dir() + assert (session_dir / 'tmp').is_dir() + assert (session_dir / 'home').is_dir() + assert (session_dir / 'meta.json').exists() + + assert info.backend_name == 'nsjail' + assert info.session_id == 'sess1' + assert info.image == 'host' + assert info.read_only_rootfs is True + + +@pytest.mark.anyio +async def test_start_session_with_host_path(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec( + session_id='sess2', + cmd='ls', + host_path='/some/path', + host_path_mode=BoxHostMountMode.READ_WRITE, + ) + + info = await backend.start_session(spec) + assert info.host_path == '/some/path' + assert info.host_path_mode == BoxHostMountMode.READ_WRITE + + +# ── stop_session ────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_stop_session_removes_directory(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='sess-rm', cmd='echo') + + info = await backend.start_session(spec) + session_dir = pathlib.Path(info.backend_session_id) + assert session_dir.exists() + + await backend.stop_session(info) + assert not session_dir.exists() + + +# ── nsjail argument construction ────────────────────────────────────── + +def test_build_nsjail_args_basic(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_session' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s1', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.OFF, + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec(session_id='s1', cmd='echo hello', env={'FOO': 'bar'}) + + args = backend._build_nsjail_args(session, spec, session_dir) + + assert args[0] == 'nsjail' + assert '--mode' in args + assert args[args.index('--mode') + 1] == 'o' + assert '--clone_newnet' in args + assert '--disable_clone_newnet' not in args + assert '--really_quiet' in args + + # Writable mounts should reference session directories. + rw_binds = [args[i + 1] for i, a in enumerate(args) if a == '--rw_bind'] + workspace_mount = f'{session_dir}/workspace:/workspace' + assert workspace_mount in rw_binds + + # Custom env should be present. + env_values = [args[i + 1] for i, a in enumerate(args) if a == '--env'] + assert 'FOO=bar' in env_values + + # Command is the last part after '--'. + separator_idx = args.index('--') + assert args[separator_idx + 1] == 'sh' + + +def test_build_nsjail_args_network_on(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_session_net' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s2', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.ON, + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec(session_id='s2', cmd='curl http://example.com', network=BoxNetworkMode.ON) + + args = backend._build_nsjail_args(session, spec, session_dir) + + assert '--disable_clone_newnet' in args + assert '--clone_newnet' not in args + + +def test_build_nsjail_args_host_path_ro(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_hp' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s3', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.OFF, + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_ONLY, + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec( + session_id='s3', + cmd='ls', + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_ONLY, + ) + + args = backend._build_nsjail_args(session, spec, session_dir) + + ro_binds = [args[i + 1] for i, a in enumerate(args) if a == '--bindmount_ro'] + assert '/data/project:/workspace' in ro_binds + + +def test_build_resource_limits_cgroup(backend): + backend._cgroup_v2_available = True + spec = BoxSpec(session_id='s', cmd='x', cpus=2.0, memory_mb=1024, pids_limit=256) + + args = backend._build_resource_limits(spec) + + assert '--cgroup_mem_max' in args + mem_idx = args.index('--cgroup_mem_max') + assert args[mem_idx + 1] == str(1024 * 1024 * 1024) + + pids_idx = args.index('--cgroup_pids_max') + assert args[pids_idx + 1] == '256' + + cpu_idx = args.index('--cgroup_cpu_ms_per_sec') + assert args[cpu_idx + 1] == '2000' + + +def test_build_resource_limits_rlimit_fallback(backend): + backend._cgroup_v2_available = False + spec = BoxSpec(session_id='s', cmd='x', memory_mb=512, pids_limit=128) + + args = backend._build_resource_limits(spec) + + assert '--rlimit_as' in args + as_idx = args.index('--rlimit_as') + assert args[as_idx + 1] == '512' + + nproc_idx = args.index('--rlimit_nproc') + assert args[nproc_idx + 1] == '128' + + # cgroup flags should NOT be present. + assert '--cgroup_mem_max' not in args + + +# ── exec ────────────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_exec_success(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='exec1', cmd='echo hello') + info = await backend.start_session(spec) + + with mock.patch.object(backend, '_run_nsjail') as mock_run: + from langbot_plugin.box.backend import _CommandResult + mock_run.return_value = _CommandResult( + return_code=0, stdout='hello\n', stderr='', timed_out=False + ) + + result = await backend.exec(info, spec) + + assert result.status == BoxExecutionStatus.COMPLETED + assert result.exit_code == 0 + assert result.stdout == 'hello\n' + assert result.backend_name == 'nsjail' + + +@pytest.mark.anyio +async def test_exec_timeout(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='exec2', cmd='sleep 100', timeout_sec=1) + info = await backend.start_session(spec) + + with mock.patch.object(backend, '_run_nsjail') as mock_run: + from langbot_plugin.box.backend import _CommandResult + mock_run.return_value = _CommandResult( + return_code=-1, stdout='', stderr='', timed_out=True + ) + + result = await backend.exec(info, spec) + + assert result.status == BoxExecutionStatus.TIMED_OUT + assert result.exit_code is None + + +# ── cgroup detection ────────────────────────────────────────────────── + +def test_detect_cgroup_v2_no_mount(): + with mock.patch.object(pathlib.Path, 'exists', return_value=False): + assert NsjailBackend._detect_cgroup_v2() is False + + +def test_detect_cgroup_v2_root_user(): + orig_exists = pathlib.Path.exists + + def always_exists(self): + return True + + with ( + mock.patch('os.getuid', return_value=0), + mock.patch.object(pathlib.Path, 'exists', always_exists), + ): + assert NsjailBackend._detect_cgroup_v2() is True + + +# ── cleanup_orphaned_containers ─────────────────────────────────────── + +@pytest.mark.anyio +async def test_cleanup_orphaned_removes_old_sessions(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + + # Create a dir from a different instance. + old_dir = tmp_base / 'oldinst_sess1_abc' + old_dir.mkdir() + (old_dir / 'workspace').mkdir() + + # Create a dir from current instance. + current_dir = tmp_base / 'test123_sess2_def' + current_dir.mkdir() + (current_dir / 'workspace').mkdir() + + with mock.patch.object(backend, '_kill_session_processes', new_callable=mock.AsyncMock): + await backend.cleanup_orphaned_containers('test123') + + assert not old_dir.exists() + assert current_dir.exists() + + +# ── output clipping ────────────────────────────────────────────────── + +def test_clip_captured_bytes_within_limit(): + data = b'hello world' + result = NsjailBackend._clip_captured_bytes(data, len(data)) + assert result == 'hello world' + + +def test_clip_captured_bytes_exceeds_limit(): + data = b'hello' + result = NsjailBackend._clip_captured_bytes(data, 2_000_000, limit=1_000_000) + assert 'clipped' in result + assert '1000000' in result From c0e30968bd22051634165c1e7480b2215280d4a0 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Sun, 22 Mar 2026 15:06:29 +0000 Subject: [PATCH 04/14] refactor: use unified logging config in box server --- src/langbot_plugin/box/server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/langbot_plugin/box/server.py b/src/langbot_plugin/box/server.py index 0690b8f..e704500 100644 --- a/src/langbot_plugin/box/server.py +++ b/src/langbot_plugin/box/server.py @@ -23,6 +23,7 @@ from langbot_plugin.entities.io.resp import ActionResponse from langbot_plugin.runtime.io.connection import Connection from langbot_plugin.runtime.io.handler import Handler +from langbot_plugin.utils.log import configure_process_logging from .actions import LangBotToBoxAction from .errors import ( @@ -259,7 +260,7 @@ def main() -> None: ) args = parser.parse_args() - logging.basicConfig(level=logging.INFO, stream=sys.stderr) + configure_process_logging(stream=sys.stderr) asyncio.run(_run_server(args.host, args.port, args.mode)) From ef7f3546bc2ace0fe46810c4457888f04df863f6 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Tue, 24 Mar 2026 02:38:20 +0000 Subject: [PATCH 05/14] feat(box): support configurable sandbox mount paths - add mount_path to box specs and session metadata - mount host paths at spec.mount_path instead of hard-coded /workspace - default workdir/cwd from mount_path when omitted - update runtime/backend logging and nsjail tests for custom mount paths --- src/langbot_plugin/box/backend.py | 12 ++++---- src/langbot_plugin/box/models.py | 31 +++++++++++++++++--- src/langbot_plugin/box/nsjail_backend.py | 18 ++++++------ src/langbot_plugin/box/runtime.py | 4 ++- tests/box/test_nsjail_backend.py | 36 ++++++++++++++++++++++++ 5 files changed, 82 insertions(+), 19 deletions(-) diff --git a/src/langbot_plugin/box/backend.py b/src/langbot_plugin/box/backend.py index e5bbe56..40b6a67 100644 --- a/src/langbot_plugin/box/backend.py +++ b/src/langbot_plugin/box/backend.py @@ -12,7 +12,6 @@ from .errors import BoxError from .models import ( - DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, @@ -89,7 +88,7 @@ async def is_available(self) -> bool: async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: validate_sandbox_security(spec) - now = dt.datetime.now(dt.UTC) + now = dt.datetime.now(dt.timezone.utc) container_name = self._build_container_name(spec.session_id) args = [ @@ -120,7 +119,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: args.extend(['--tmpfs', '/tmp:size=64m']) if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: - mount_spec = f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}:{spec.host_path_mode.value}' + mount_spec = f'{spec.host_path}:{spec.mount_path}:{spec.host_path_mode.value}' args.extend(['-v', mount_spec]) args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done']) @@ -129,7 +128,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: f'LangBot Box backend start_session: backend={self.name} ' f'session_id={spec.session_id} container_name={container_name} ' f'image={spec.image} network={spec.network.value} ' - f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} ' + f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} mount_path={spec.mount_path} ' f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} ' f'read_only_rootfs={spec.read_only_rootfs}' ) @@ -144,6 +143,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: network=spec.network, host_path=spec.host_path, host_path_mode=spec.host_path_mode, + mount_path=spec.mount_path, cpus=spec.cpus, memory_mb=spec.memory_mb, pids_limit=spec.pids_limit, @@ -153,7 +153,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: ) async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: - start = dt.datetime.now(dt.UTC) + start = dt.datetime.now(dt.timezone.utc) args = [self.command, 'exec'] for key, value in spec.env.items(): @@ -179,7 +179,7 @@ async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResu ) result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False) - duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000) + duration_ms = int((dt.datetime.now(dt.timezone.utc) - start).total_seconds() * 1000) if result.timed_out: return BoxExecutionResult( diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py index 90496ca..8de58dc 100644 --- a/src/langbot_plugin/box/models.py +++ b/src/langbot_plugin/box/models.py @@ -33,7 +33,7 @@ class BoxManagedProcessStatus(str, enum.Enum): class BoxSpec(pydantic.BaseModel): cmd: str = '' - workdir: str = '/workspace' + workdir: str = DEFAULT_BOX_MOUNT_PATH timeout_sec: int = 30 network: BoxNetworkMode = BoxNetworkMode.OFF session_id: str @@ -41,12 +41,26 @@ class BoxSpec(pydantic.BaseModel): image: str = DEFAULT_BOX_IMAGE host_path: str | None = None host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + mount_path: str = DEFAULT_BOX_MOUNT_PATH # Resource limits cpus: float = 1.0 memory_mb: int = 512 pids_limit: int = 128 read_only_rootfs: bool = True + @pydantic.model_validator(mode='before') + @classmethod + def populate_workdir_from_mount_path(cls, data): + if not isinstance(data, dict): + return data + if data.get('workdir') not in (None, ''): + return data + mount_path = data.get('mount_path') + if isinstance(mount_path, str) and mount_path.strip(): + data = dict(data) + data['workdir'] = mount_path + return data + @pydantic.field_validator('cmd') @classmethod def validate_cmd(cls, value: str) -> str: @@ -111,14 +125,22 @@ def validate_host_path(cls, value: str | None) -> str | None: raise ValueError('host_path must be an absolute host path') return value + @pydantic.field_validator('mount_path') + @classmethod + def validate_mount_path(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('mount_path must be an absolute path inside the sandbox') + return value + @pydantic.model_validator(mode='after') def validate_host_mount_consistency(self) -> 'BoxSpec': if self.host_path is None: return self if self.host_path_mode == BoxHostMountMode.NONE: return self - if not self.workdir.startswith(DEFAULT_BOX_MOUNT_PATH): - raise ValueError('workdir must stay under /workspace when host_path is provided') + if self.workdir != self.mount_path and not self.workdir.startswith(f'{self.mount_path}/'): + raise ValueError('workdir must stay under mount_path when host_path is provided') return self @@ -198,6 +220,7 @@ class BoxSessionInfo(pydantic.BaseModel): network: BoxNetworkMode host_path: str | None = None host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + mount_path: str = DEFAULT_BOX_MOUNT_PATH cpus: float = 1.0 memory_mb: int = 512 pids_limit: int = 128 @@ -210,7 +233,7 @@ class BoxManagedProcessSpec(pydantic.BaseModel): command: str args: list[str] = pydantic.Field(default_factory=list) env: dict[str, str] = pydantic.Field(default_factory=dict) - cwd: str = '/workspace' + cwd: str = DEFAULT_BOX_MOUNT_PATH @pydantic.field_validator('command') @classmethod diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py index 4421667..b972c7a 100644 --- a/src/langbot_plugin/box/nsjail_backend.py +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -14,7 +14,6 @@ from .backend import BaseSandboxBackend, _CommandResult, _MAX_RAW_OUTPUT_BYTES from .errors import BoxError from .models import ( - DEFAULT_BOX_MOUNT_PATH, BoxExecutionResult, BoxExecutionStatus, BoxHostMountMode, @@ -104,7 +103,7 @@ async def is_available(self) -> bool: async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: validate_sandbox_security(spec) - now = dt.datetime.now(dt.UTC) + now = dt.datetime.now(dt.timezone.utc) session_dir_name = f'{self.instance_id}_{spec.session_id}_{uuid.uuid4().hex[:8]}' session_dir = self._base_dir / session_dir_name @@ -123,6 +122,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: 'instance_id': self.instance_id, 'host_path': spec.host_path, 'host_path_mode': spec.host_path_mode.value if spec.host_path else None, + 'mount_path': spec.mount_path, 'network': spec.network.value, 'cpus': spec.cpus, 'memory_mb': spec.memory_mb, @@ -135,7 +135,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: f'LangBot Box backend start_session: backend=nsjail ' f'session_id={spec.session_id} session_dir={session_dir} ' f'network={spec.network.value} ' - f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} ' + f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} mount_path={spec.mount_path} ' f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit}' ) @@ -147,6 +147,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: network=spec.network, host_path=spec.host_path, host_path_mode=spec.host_path_mode, + mount_path=spec.mount_path, cpus=spec.cpus, memory_mb=spec.memory_mb, pids_limit=spec.pids_limit, @@ -156,7 +157,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: ) async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: - start = dt.datetime.now(dt.UTC) + start = dt.datetime.now(dt.timezone.utc) session_dir = pathlib.Path(session.backend_session_id) args = self._build_nsjail_args(session, spec, session_dir) @@ -172,7 +173,7 @@ async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResu ) result = await self._run_nsjail(args, timeout_sec=spec.timeout_sec) - duration_ms = int((dt.datetime.now(dt.UTC) - start).total_seconds() * 1000) + duration_ms = int((dt.datetime.now(dt.timezone.utc) - start).total_seconds() * 1000) if result.timed_out: return BoxExecutionResult( @@ -228,6 +229,7 @@ async def start_managed_process( env=spec.env, host_path=session.host_path, host_path_mode=session.host_path_mode, + mount_path=session.mount_path, cpus=session.cpus, memory_mb=session.memory_mb, pids_limit=session.pids_limit, @@ -359,12 +361,12 @@ def _build_writable_mounts( # Workspace mount. if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: if spec.host_path_mode == BoxHostMountMode.READ_ONLY: - args.extend(['--bindmount_ro', f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}']) + args.extend(['--bindmount_ro', f'{spec.host_path}:{spec.mount_path}']) else: - args.extend(['--rw_bind', f'{spec.host_path}:{DEFAULT_BOX_MOUNT_PATH}']) + args.extend(['--rw_bind', f'{spec.host_path}:{spec.mount_path}']) else: workspace_dir = session_dir / 'workspace' - args.extend(['--rw_bind', f'{workspace_dir}:{DEFAULT_BOX_MOUNT_PATH}']) + args.extend(['--rw_bind', f'{workspace_dir}:{spec.mount_path}']) # /tmp and /home are always per-session writable. tmp_dir = session_dir / 'tmp' diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index cac6b66..ef4d2ef 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -219,7 +219,8 @@ async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession: f'image={info.image} ' f'network={info.network.value} ' f'host_path={info.host_path} ' - f'host_path_mode={info.host_path_mode.value}' + f'host_path_mode={info.host_path_mode.value} ' + f'mount_path={info.mount_path}' ) return runtime_session @@ -284,6 +285,7 @@ def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): 'image', 'host_path', 'host_path_mode', + 'mount_path', 'cpus', 'memory_mb', 'pids_limit', diff --git a/tests/box/test_nsjail_backend.py b/tests/box/test_nsjail_backend.py index b9e7f68..fca4e2b 100644 --- a/tests/box/test_nsjail_backend.py +++ b/tests/box/test_nsjail_backend.py @@ -99,11 +99,13 @@ async def test_start_session_with_host_path(backend, tmp_base): cmd='ls', host_path='/some/path', host_path_mode=BoxHostMountMode.READ_WRITE, + mount_path='/project', ) info = await backend.start_session(spec) assert info.host_path == '/some/path' assert info.host_path_mode == BoxHostMountMode.READ_WRITE + assert info.mount_path == '/project' # ── stop_session ────────────────────────────────────────────────────── @@ -216,6 +218,40 @@ def test_build_nsjail_args_host_path_ro(backend, tmp_base): assert '/data/project:/workspace' in ro_binds +def test_build_nsjail_args_uses_custom_mount_path(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_custom_mount' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s4', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.OFF, + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_WRITE, + mount_path='/project', + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec( + session_id='s4', + cmd='pwd', + workdir='/project/src', + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_WRITE, + mount_path='/project', + ) + + args = backend._build_nsjail_args(session, spec, session_dir) + + rw_binds = [args[i + 1] for i, a in enumerate(args) if a == '--rw_bind'] + assert '/data/project:/project' in rw_binds + assert args[args.index('--cwd') + 1] == '/project/src' + + def test_build_resource_limits_cgroup(backend): backend._cgroup_v2_available = True spec = BoxSpec(session_id='s', cmd='x', cpus=2.0, memory_mb=1024, pids_limit=256) From cf7ec2d7fd3d5bdfff59c5bf619d1138bf04284a Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Tue, 24 Mar 2026 04:01:51 +0000 Subject: [PATCH 06/14] fix(box-runtime): terminate managed processes promptly on session deletion --- src/langbot_plugin/box/client.py | 2 +- src/langbot_plugin/box/runtime.py | 17 +++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/langbot_plugin/box/client.py b/src/langbot_plugin/box/client.py index 36a525a..8b8e17b 100644 --- a/src/langbot_plugin/box/client.py +++ b/src/langbot_plugin/box/client.py @@ -147,7 +147,7 @@ async def get_backend_info(self) -> dict: return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {}) async def delete_session(self, session_id: str) -> None: - await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id}) + await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id}, timeout=30.0) async def create_session(self, spec: BoxSpec) -> dict: return await self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json')) diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index ef4d2ef..b09ab32 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -347,22 +347,19 @@ async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> pass try: + if process.returncode is None: + try: + process.terminate() + except ProcessLookupError: + pass await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) except asyncio.TimeoutError: if process.returncode is None: try: - process.terminate() + process.kill() except ProcessLookupError: pass - try: - await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) - except asyncio.TimeoutError: - if process.returncode is None: - try: - process.kill() - except ProcessLookupError: - pass - await process.wait() + await process.wait() finally: managed_process.exit_code = process.returncode managed_process.exited_at = dt.datetime.now(_UTC) From 31c763c5d249d654b17f9feb0da8fe2a6f74bef8 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Tue, 24 Mar 2026 06:55:44 +0000 Subject: [PATCH 07/14] fix: update doc --- src/langbot_plugin/box/errors.py | 2 +- src/langbot_plugin/box/runtime.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/langbot_plugin/box/errors.py b/src/langbot_plugin/box/errors.py index f6a8e86..ecdde7a 100644 --- a/src/langbot_plugin/box/errors.py +++ b/src/langbot_plugin/box/errors.py @@ -6,7 +6,7 @@ class BoxError(RuntimeError): class BoxValidationError(BoxError): - """Raised when sandbox_exec arguments are invalid.""" + """Raised when exec tool arguments are invalid.""" class BoxBackendUnavailableError(BoxError): diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index b09ab32..b91bc02 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -229,7 +229,7 @@ async def _get_backend(self) -> BaseSandboxBackend: self._backend = await self._select_backend() if self._backend is None: raise BoxBackendUnavailableError( - 'LangBot Box backend unavailable. Install and start Podman, Docker, or nsjail before using sandbox_exec.' + 'LangBot Box backend unavailable. Install and start Podman, Docker, or nsjail before using exec.' ) return self._backend @@ -297,7 +297,7 @@ def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): if session_val != spec_val: display = session_val.value if hasattr(session_val, 'value') else session_val raise BoxSessionConflictError( - f'sandbox_exec session {spec.session_id} already exists with {field}={display}' + f'Box session {spec.session_id} already exists with {field}={display}' ) async def _drain_managed_process_stderr(self, session_id: str, managed_process: _ManagedProcess) -> None: From dea5820a46ad3283662f24bb3bfcb472a395792e Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Tue, 24 Mar 2026 07:58:34 +0000 Subject: [PATCH 08/14] fix(box): repair sdk runtime entrypoints and nsjail session parity --- pyproject.toml | 1 + src/langbot_plugin/box/nsjail_backend.py | 4 ++-- src/langbot_plugin/box/server.py | 4 ++-- src/langbot_plugin/cli/__init__.py | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0631261..1f29456 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ authors = [ requires-python = ">=3.10" dependencies = [ "aiofiles>=24.1.0", + "aiohttp>=3.9.0", "dotenv>=0.9.9", "httpx>=0.28.1", "jinja2>=3.1.6", diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py index b972c7a..32a3437 100644 --- a/src/langbot_plugin/box/nsjail_backend.py +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -151,7 +151,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: cpus=spec.cpus, memory_mb=spec.memory_mb, pids_limit=spec.pids_limit, - read_only_rootfs=True, # always true for nsjail + read_only_rootfs=spec.read_only_rootfs, created_at=now, last_used_at=now, ) @@ -233,7 +233,7 @@ async def start_managed_process( cpus=session.cpus, memory_mb=session.memory_mb, pids_limit=session.pids_limit, - read_only_rootfs=True, + read_only_rootfs=session.read_only_rootfs, ) args = self._build_nsjail_args(session, pseudo_spec, session_dir) diff --git a/src/langbot_plugin/box/server.py b/src/langbot_plugin/box/server.py index e704500..95e5d80 100644 --- a/src/langbot_plugin/box/server.py +++ b/src/langbot_plugin/box/server.py @@ -251,14 +251,14 @@ async def new_connection_callback(connection: Connection) -> None: await runner.cleanup() -def main() -> None: +def main(argv: list[str] | None = None) -> None: parser = argparse.ArgumentParser(description='LangBot Box Runtime Service') parser.add_argument('--host', default='0.0.0.0', help='Bind address') parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)') parser.add_argument( '--mode', choices=['stdio', 'ws'], default='stdio', help='Control channel transport (default: stdio)' ) - args = parser.parse_args() + args = parser.parse_args(argv) configure_process_logging(stream=sys.stderr) asyncio.run(_run_server(args.host, args.port, args.mode)) diff --git a/src/langbot_plugin/cli/__init__.py b/src/langbot_plugin/cli/__init__.py index b4f388b..ff9174c 100644 --- a/src/langbot_plugin/cli/__init__.py +++ b/src/langbot_plugin/cli/__init__.py @@ -167,7 +167,7 @@ def main(): runtime_app.main(args) case "box": from langbot_plugin.box.server import main as box_main - box_main() + box_main(sys.argv[2:]) case _: cli_print("unknown_command", args.command) sys.exit(1) From 8e63877f5f3cdc7920861eba9a4c280859ede521 Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Thu, 26 Mar 2026 10:45:24 +0000 Subject: [PATCH 09/14] feat(box): add session workspace quota enforcement and SDK quota metadata --- src/langbot_plugin/box/backend.py | 3 ++- src/langbot_plugin/box/models.py | 10 ++++++++++ src/langbot_plugin/box/nsjail_backend.py | 4 +++- src/langbot_plugin/box/runtime.py | 4 +++- 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/langbot_plugin/box/backend.py b/src/langbot_plugin/box/backend.py index 40b6a67..47bcafb 100644 --- a/src/langbot_plugin/box/backend.py +++ b/src/langbot_plugin/box/backend.py @@ -130,7 +130,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: f'image={spec.image} network={spec.network.value} ' f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} mount_path={spec.mount_path} ' f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} ' - f'read_only_rootfs={spec.read_only_rootfs}' + f'read_only_rootfs={spec.read_only_rootfs} workspace_quota_mb={spec.workspace_quota_mb}' ) await self._run_command(args, timeout_sec=30, check=True) @@ -148,6 +148,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: memory_mb=spec.memory_mb, pids_limit=spec.pids_limit, read_only_rootfs=spec.read_only_rootfs, + workspace_quota_mb=spec.workspace_quota_mb, created_at=now, last_used_at=now, ) diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py index 8de58dc..e3d8472 100644 --- a/src/langbot_plugin/box/models.py +++ b/src/langbot_plugin/box/models.py @@ -47,6 +47,7 @@ class BoxSpec(pydantic.BaseModel): memory_mb: int = 512 pids_limit: int = 128 read_only_rootfs: bool = True + workspace_quota_mb: int = 0 @pydantic.model_validator(mode='before') @classmethod @@ -102,6 +103,13 @@ def validate_pids_limit(cls, value: int) -> int: raise ValueError('pids_limit must be at least 1') return value + @pydantic.field_validator('workspace_quota_mb') + @classmethod + def validate_workspace_quota_mb(cls, value: int) -> int: + if value < 0: + raise ValueError('workspace_quota_mb must be greater than or equal to 0') + return value + @pydantic.field_validator('session_id') @classmethod def validate_session_id(cls, value: str) -> str: @@ -162,6 +170,7 @@ class BoxProfile(pydantic.BaseModel): memory_mb: int = 512 pids_limit: int = 128 read_only_rootfs: bool = True + workspace_quota_mb: int = 0 locked: frozenset[str] = frozenset() model_config = pydantic.ConfigDict(frozen=True) @@ -225,6 +234,7 @@ class BoxSessionInfo(pydantic.BaseModel): memory_mb: int = 512 pids_limit: int = 128 read_only_rootfs: bool = True + workspace_quota_mb: int = 0 created_at: dt.datetime last_used_at: dt.datetime diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py index 32a3437..52e913e 100644 --- a/src/langbot_plugin/box/nsjail_backend.py +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -136,7 +136,8 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: f'session_id={spec.session_id} session_dir={session_dir} ' f'network={spec.network.value} ' f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} mount_path={spec.mount_path} ' - f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit}' + f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} ' + f'workspace_quota_mb={spec.workspace_quota_mb}' ) return BoxSessionInfo( @@ -152,6 +153,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: memory_mb=spec.memory_mb, pids_limit=spec.pids_limit, read_only_rootfs=spec.read_only_rootfs, + workspace_quota_mb=spec.workspace_quota_mb, created_at=now, last_used_at=now, ) diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index b91bc02..e371f07 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -220,7 +220,8 @@ async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession: f'network={info.network.value} ' f'host_path={info.host_path} ' f'host_path_mode={info.host_path_mode.value} ' - f'mount_path={info.mount_path}' + f'mount_path={info.mount_path} ' + f'workspace_quota_mb={info.workspace_quota_mb}' ) return runtime_session @@ -290,6 +291,7 @@ def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): 'memory_mb', 'pids_limit', 'read_only_rootfs', + 'workspace_quota_mb', ) for field in _COMPAT_FIELDS: session_val = getattr(session, field) From 120817ad1858f71de85b1ffcd4a3cfed028e248b Mon Sep 17 00:00:00 2001 From: youhuanghe <1051233107@qq.com> Date: Thu, 9 Apr 2026 10:28:56 +0000 Subject: [PATCH 10/14] feat(box): add Windows support for Docker backend Accept Windows-style absolute paths (e.g. C:\Users\...) in host_path validation, and make security path comparison case-insensitive and separator-aware on Windows. Only the Docker backend is supported on Windows (via Docker Desktop); Podman and nsjail remain Linux-only. --- src/langbot_plugin/box/models.py | 4 +++- src/langbot_plugin/box/security.py | 23 +++++++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py index e3d8472..aa07b44 100644 --- a/src/langbot_plugin/box/models.py +++ b/src/langbot_plugin/box/models.py @@ -2,6 +2,8 @@ import datetime as dt import enum +import ntpath +import posixpath import pydantic @@ -129,7 +131,7 @@ def validate_host_path(cls, value: str | None) -> str | None: if value is None: return None value = value.strip() - if not value.startswith('/'): + if not (posixpath.isabs(value) or ntpath.isabs(value)): raise ValueError('host_path must be an absolute host path') return value diff --git a/src/langbot_plugin/box/security.py b/src/langbot_plugin/box/security.py index d5a8c51..2e8ed72 100644 --- a/src/langbot_plugin/box/security.py +++ b/src/langbot_plugin/box/security.py @@ -1,11 +1,12 @@ from __future__ import annotations import os +import sys from .errors import BoxValidationError from .models import BoxSpec -BLOCKED_HOST_PATHS = frozenset( +_BLOCKED_HOST_PATHS_POSIX = frozenset( { '/etc', '/proc', @@ -22,6 +23,22 @@ } ) +_BLOCKED_HOST_PATHS_WINDOWS = frozenset( + { + r'C:\Windows', + r'C:\Program Files', + r'C:\Program Files (x86)', + r'C:\ProgramData', + r'\\.\pipe\docker_engine', + } +) + +BLOCKED_HOST_PATHS = ( + _BLOCKED_HOST_PATHS_POSIX | _BLOCKED_HOST_PATHS_WINDOWS + if sys.platform == 'win32' + else _BLOCKED_HOST_PATHS_POSIX +) + def validate_sandbox_security(spec: BoxSpec) -> None: """Validate that a BoxSpec does not request dangerous container config. @@ -30,6 +47,8 @@ def validate_sandbox_security(spec: BoxSpec) -> None: """ if spec.host_path: real = os.path.realpath(spec.host_path) + sep = os.sep + _norm = os.path.normcase for blocked in BLOCKED_HOST_PATHS: - if real == blocked or real.startswith(blocked + '/'): + if _norm(real) == _norm(blocked) or _norm(real).startswith(_norm(blocked) + sep): raise BoxValidationError(f'host_path {spec.host_path} is blocked for security') From 8c71ec5fe3c0aa5ea21709a1a6e5eb72685d9bfd Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Fri, 17 Apr 2026 23:52:21 +0800 Subject: [PATCH 11/14] refactor(box): merge action RPC and WS relay into single port MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the two-port scheme (5410 relay + 5411 RPC) with a single aiohttp server on port 5410, using path-based routing: /rpc/ws — Action RPC (control channel) /v1/sessions/{id}/managed-process/ws — Managed process stdio relay Add AiohttpWSConnection adapter to bridge aiohttp WebSocketResponse to the SDK Connection interface, keeping Handler/BoxServerHandler unchanged. --- src/langbot_plugin/box/server.py | 110 ++++++++++++++++++++++++------- 1 file changed, 88 insertions(+), 22 deletions(-) diff --git a/src/langbot_plugin/box/server.py b/src/langbot_plugin/box/server.py index 95e5d80..aca5c94 100644 --- a/src/langbot_plugin/box/server.py +++ b/src/langbot_plugin/box/server.py @@ -3,8 +3,12 @@ Usage (stdio, launched by LangBot as subprocess): python -m langbot_plugin.box.server -Usage (ws + ws relay, for remote/docker mode): - python -m langbot_plugin.box.server --port 5410 +Usage (ws, for remote/docker mode): + python -m langbot_plugin.box.server --mode ws --port 5410 + +All WebSocket endpoints share a single port (default 5410): + /rpc/ws — Action RPC (control channel) + /v1/sessions/{session_id}/managed-process/ws — Managed process stdio relay """ from __future__ import annotations @@ -12,6 +16,7 @@ import argparse import asyncio import datetime as dt +import json import logging import sys from typing import Any @@ -20,6 +25,7 @@ from aiohttp import web from langbot_plugin.entities.io.actions.enums import CommonAction +from langbot_plugin.entities.io.errors import ConnectionClosedError from langbot_plugin.entities.io.resp import ActionResponse from langbot_plugin.runtime.io.connection import Connection from langbot_plugin.runtime.io.handler import Handler @@ -41,6 +47,47 @@ def _result_to_dict(result: BoxExecutionResult) -> dict: return result.model_dump(mode='json') +# ── aiohttp WebSocket → Connection adapter ─────────────────────────── + + +class AiohttpWSConnection(Connection): + """Adapt an aiohttp ``WebSocketResponse`` to the SDK ``Connection`` interface. + + This allows ``BoxServerHandler`` (and therefore ``Handler``) to work over + an aiohttp WebSocket without any changes to the handler/IO layer. + """ + + def __init__(self, ws: web.WebSocketResponse) -> None: + self._ws = ws + self._send_lock = asyncio.Lock() + + async def send(self, message: str) -> None: + async with self._send_lock: + try: + await self._ws.send_str(message) + except ConnectionResetError: + raise ConnectionClosedError('Connection closed during send') + + async def receive(self) -> str: + msg = await self._ws.receive() + if msg.type == web.WSMsgType.TEXT: + return msg.data + if msg.type in ( + web.WSMsgType.CLOSE, + web.WSMsgType.CLOSING, + web.WSMsgType.CLOSED, + web.WSMsgType.ERROR, + ): + raise ConnectionClosedError('Connection closed') + raise ConnectionClosedError(f'Unexpected message type: {msg.type}') + + async def close(self) -> None: + await self._ws.close() + + +# ── BoxServerHandler ───────────────────────────────────────────────── + + class BoxServerHandler(Handler): """Server-side handler that registers box actions backed by BoxRuntime.""" @@ -122,7 +169,7 @@ async def shutdown(data: dict[str, Any]) -> ActionResponse: return ActionResponse.success({}) -# ── Managed process WebSocket relay (aiohttp) ──────────────────────── +# ── Managed process WebSocket relay ────────────────────────────────── def _error_response(exc: Exception) -> web.Response: @@ -198,10 +245,31 @@ async def _ws_to_stdin() -> None: return ws -def create_ws_relay_app(runtime: BoxRuntime) -> web.Application: - """Create a minimal aiohttp app that only serves the managed-process ws relay.""" +# ── Action RPC WebSocket handler ───────────────────────────────────── + + +async def handle_rpc_ws(request: web.Request) -> web.StreamResponse: + """Handle action RPC over a single aiohttp WebSocket connection.""" + runtime: BoxRuntime = request.app['runtime'] + + ws = web.WebSocketResponse() + await ws.prepare(request) + + connection = AiohttpWSConnection(ws) + handler = BoxServerHandler(connection, runtime) + await handler.run() + + return ws + + +# ── App factory ────────────────────────────────────────────────────── + + +def create_app(runtime: BoxRuntime) -> web.Application: + """Create the aiohttp app with all WebSocket routes on a single port.""" app = web.Application() app['runtime'] = runtime + app.router.add_get('/rpc/ws', handle_rpc_ws) app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws) return app @@ -213,38 +281,36 @@ async def _run_server(host: str, port: int, mode: str) -> None: runtime = BoxRuntime(logger=logger) await runtime.initialize() - # Start aiohttp for ws relay (non-fatal — managed process attach - # degrades gracefully if the port is unavailable). + # Start aiohttp — serves managed-process relay and (in ws mode) + # also the action RPC endpoint, all on the same port. runner: web.AppRunner | None = None try: - ws_app = create_ws_relay_app(runtime) + ws_app = create_app(runtime) runner = web.AppRunner(ws_app) await runner.setup() site = web.TCPSite(runner, host, port) await site.start() - logger.info(f'Box ws relay listening on {host}:{port}') + logger.info(f'Box server listening on {host}:{port}') except OSError as exc: - logger.warning(f'Box ws relay failed to bind {host}:{port}: {exc}') + logger.warning(f'Box server failed to bind {host}:{port}: {exc}') logger.warning('Managed process WebSocket attach will be unavailable.') - async def new_connection_callback(connection: Connection) -> None: - handler = BoxServerHandler(connection, runtime) - await handler.run() - try: if mode == 'stdio': from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController + async def new_connection_callback(connection: Connection) -> None: + handler = BoxServerHandler(connection, runtime) + await handler.run() + ctrl = StdioServerController() await ctrl.run(new_connection_callback) else: - from langbot_plugin.runtime.io.controllers.ws.server import WebSocketServerController - - # Action RPC uses port+1 to avoid conflict with ws relay - rpc_port = port + 1 - logger.info(f'Box action RPC (ws) listening on {host}:{rpc_port}') - ctrl = WebSocketServerController(rpc_port) - await ctrl.run(new_connection_callback) + # In ws mode, action RPC is served via aiohttp on /rpc/ws. + # Keep the server alive until cancelled. + logger.info(f'Box action RPC available at ws://{host}:{port}/rpc/ws') + stop_event = asyncio.Event() + await stop_event.wait() finally: await runtime.shutdown() if runner is not None: @@ -254,7 +320,7 @@ async def new_connection_callback(connection: Connection) -> None: def main(argv: list[str] | None = None) -> None: parser = argparse.ArgumentParser(description='LangBot Box Runtime Service') parser.add_argument('--host', default='0.0.0.0', help='Bind address') - parser.add_argument('--port', type=int, default=5410, help='Bind port (ws relay)') + parser.add_argument('--port', type=int, default=5410, help='Bind port') parser.add_argument( '--mode', choices=['stdio', 'ws'], default='stdio', help='Control channel transport (default: stdio)' ) From 7209d38e889b83836f926a94c777a1931e06a405 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Sat, 18 Apr 2026 22:11:06 +0800 Subject: [PATCH 12/14] feat(box): add extra_mounts support to BoxSpec for multi-mount containers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add BoxMountSpec model and extra_mounts field to BoxSpec, allowing multiple bind mounts per container. Docker and nsjail backends iterate extra_mounts to append additional -v / --rw_bind flags at session creation time. Backward compatible — existing single-mount usage is unchanged. --- src/langbot_plugin/box/backend.py | 4 ++++ src/langbot_plugin/box/models.py | 25 ++++++++++++++++++++++++ src/langbot_plugin/box/nsjail_backend.py | 6 ++++++ 3 files changed, 35 insertions(+) diff --git a/src/langbot_plugin/box/backend.py b/src/langbot_plugin/box/backend.py index 47bcafb..7ba35b0 100644 --- a/src/langbot_plugin/box/backend.py +++ b/src/langbot_plugin/box/backend.py @@ -122,6 +122,10 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: mount_spec = f'{spec.host_path}:{spec.mount_path}:{spec.host_path_mode.value}' args.extend(['-v', mount_spec]) + for mount in spec.extra_mounts: + if mount.mode != BoxHostMountMode.NONE: + args.extend(['-v', f'{mount.host_path}:{mount.mount_path}:{mount.mode.value}']) + args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done']) self.logger.info( diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py index aa07b44..97cfc34 100644 --- a/src/langbot_plugin/box/models.py +++ b/src/langbot_plugin/box/models.py @@ -33,6 +33,30 @@ class BoxManagedProcessStatus(str, enum.Enum): EXITED = 'exited' +class BoxMountSpec(pydantic.BaseModel): + """A single additional bind mount specification.""" + + host_path: str + mount_path: str + mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + + @pydantic.field_validator('host_path') + @classmethod + def validate_host_path(cls, value: str) -> str: + value = value.strip() + if not (posixpath.isabs(value) or ntpath.isabs(value)): + raise ValueError('host_path must be an absolute host path') + return value + + @pydantic.field_validator('mount_path') + @classmethod + def validate_mount_path(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('mount_path must be an absolute path inside the sandbox') + return value + + class BoxSpec(pydantic.BaseModel): cmd: str = '' workdir: str = DEFAULT_BOX_MOUNT_PATH @@ -44,6 +68,7 @@ class BoxSpec(pydantic.BaseModel): host_path: str | None = None host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE mount_path: str = DEFAULT_BOX_MOUNT_PATH + extra_mounts: list[BoxMountSpec] = pydantic.Field(default_factory=list) # Resource limits cpus: float = 1.0 memory_mb: int = 512 diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py index 52e913e..900c5d3 100644 --- a/src/langbot_plugin/box/nsjail_backend.py +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -370,6 +370,12 @@ def _build_writable_mounts( workspace_dir = session_dir / 'workspace' args.extend(['--rw_bind', f'{workspace_dir}:{spec.mount_path}']) + for mount in spec.extra_mounts: + if mount.mode == BoxHostMountMode.READ_ONLY: + args.extend(['--bindmount_ro', f'{mount.host_path}:{mount.mount_path}']) + elif mount.mode == BoxHostMountMode.READ_WRITE: + args.extend(['--rw_bind', f'{mount.host_path}:{mount.mount_path}']) + # /tmp and /home are always per-session writable. tmp_dir = session_dir / 'tmp' home_dir = session_dir / 'home' From 529088ebc78d0d2ee7e9427012c731b30abc6dc3 Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 20 Apr 2026 22:22:00 +0800 Subject: [PATCH 13/14] feat(box): add shared MCP container support with persistent sessions and multi-process - Add persistent flag to BoxSpec/BoxSessionInfo to keep sessions across shutdowns - Change managed_process to managed_processes dict (keyed by process_id) - Support multiple managed processes per session for shared containers - Skip persistent sessions during shutdown and TTL reaping - Conditionally add --rm flag only for non-persistent containers - Add process_id to RPC handlers and WebSocket relay routes - Update nsjail backend to pass persistent field --- src/langbot_plugin/box/backend.py | 15 ++--- src/langbot_plugin/box/client.py | 13 +++-- src/langbot_plugin/box/models.py | 6 +- src/langbot_plugin/box/nsjail_backend.py | 1 + src/langbot_plugin/box/runtime.py | 74 ++++++++++++++---------- src/langbot_plugin/box/security.py | 2 - src/langbot_plugin/box/server.py | 19 ++++-- 7 files changed, 79 insertions(+), 51 deletions(-) diff --git a/src/langbot_plugin/box/backend.py b/src/langbot_plugin/box/backend.py index 7ba35b0..99e6f7d 100644 --- a/src/langbot_plugin/box/backend.py +++ b/src/langbot_plugin/box/backend.py @@ -95,7 +95,12 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: self.command, 'run', '-d', - '--rm', + ] + + if not spec.persistent: + args.append('--rm') + + args.extend([ '--name', container_name, '--label', @@ -104,7 +109,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: f'langbot.session_id={spec.session_id}', '--label', f'langbot.box.instance_id={self.instance_id}', - ] + ]) if spec.network == BoxNetworkMode.OFF: args.extend(['--network', 'none']) @@ -148,6 +153,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: host_path=spec.host_path, host_path_mode=spec.host_path_mode, mount_path=spec.mount_path, + persistent=spec.persistent, cpus=spec.cpus, memory_mb=spec.memory_mb, pids_limit=spec.pids_limit, @@ -383,11 +389,6 @@ def _format_cli_error(self, message: str) -> str: return f'{self.name} backend error: {message}' -class PodmanBackend(CLISandboxBackend): - def __init__(self, logger: logging.Logger): - super().__init__(logger=logger, command='podman', backend_name='podman') - - class DockerBackend(CLISandboxBackend): def __init__(self, logger: logging.Logger): super().__init__(logger=logger, command='docker', backend_name='docker') diff --git a/src/langbot_plugin/box/client.py b/src/langbot_plugin/box/client.py index 8b8e17b..9175c59 100644 --- a/src/langbot_plugin/box/client.py +++ b/src/langbot_plugin/box/client.py @@ -50,7 +50,7 @@ async def create_session(self, spec: BoxSpec) -> dict: ... async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: ... @abc.abstractmethod - async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: ... + async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo: ... @abc.abstractmethod async def get_session(self, session_id: str) -> dict: ... @@ -159,11 +159,14 @@ async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSp ) return BoxManagedProcessInfo.model_validate(data) - async def get_managed_process(self, session_id: str) -> BoxManagedProcessInfo: - data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, {'session_id': session_id}) + async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo: + data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, { + 'session_id': session_id, + 'process_id': process_id, + }) return BoxManagedProcessInfo.model_validate(data) - def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str) -> str: + def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str, process_id: str = 'default') -> str: base = ws_relay_base_url if base.startswith('https://'): scheme = 'wss://' @@ -174,4 +177,4 @@ def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: else: scheme = 'ws://' suffix = base - return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/ws' + return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/{process_id}/ws' diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py index 97cfc34..fa34e36 100644 --- a/src/langbot_plugin/box/models.py +++ b/src/langbot_plugin/box/models.py @@ -8,7 +8,7 @@ import pydantic -DEFAULT_BOX_IMAGE = 'python:3.11-slim' +DEFAULT_BOX_IMAGE = 'rockchin/langbot-sandbox:latest' DEFAULT_BOX_MOUNT_PATH = '/workspace' @@ -69,6 +69,7 @@ class BoxSpec(pydantic.BaseModel): host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE mount_path: str = DEFAULT_BOX_MOUNT_PATH extra_mounts: list[BoxMountSpec] = pydantic.Field(default_factory=list) + persistent: bool = False # Resource limits cpus: float = 1.0 memory_mb: int = 512 @@ -257,6 +258,7 @@ class BoxSessionInfo(pydantic.BaseModel): host_path: str | None = None host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE mount_path: str = DEFAULT_BOX_MOUNT_PATH + persistent: bool = False cpus: float = 1.0 memory_mb: int = 512 pids_limit: int = 128 @@ -267,6 +269,7 @@ class BoxSessionInfo(pydantic.BaseModel): class BoxManagedProcessSpec(pydantic.BaseModel): + process_id: str = 'default' command: str args: list[str] = pydantic.Field(default_factory=list) env: dict[str, str] = pydantic.Field(default_factory=dict) @@ -301,6 +304,7 @@ def validate_cwd(cls, value: str) -> str: class BoxManagedProcessInfo(pydantic.BaseModel): session_id: str + process_id: str = 'default' status: BoxManagedProcessStatus command: str args: list[str] diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py index 900c5d3..921c8ff 100644 --- a/src/langbot_plugin/box/nsjail_backend.py +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -154,6 +154,7 @@ async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: pids_limit=spec.pids_limit, read_only_rootfs=spec.read_only_rootfs, workspace_quota_mb=spec.workspace_quota_mb, + persistent=spec.persistent, created_at=now, last_used_at=now, ) diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index e371f07..8509289 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -7,7 +7,7 @@ import logging import uuid -from .backend import BaseSandboxBackend, DockerBackend, PodmanBackend +from .backend import BaseSandboxBackend, DockerBackend from .nsjail_backend import NsjailBackend from .errors import ( BoxBackendUnavailableError, @@ -51,7 +51,7 @@ def is_running(self) -> bool: class _RuntimeSession: info: BoxSessionInfo lock: asyncio.Lock - managed_process: _ManagedProcess | None = None + managed_processes: dict[str, _ManagedProcess] = dataclasses.field(default_factory=dict) class BoxRuntime: @@ -62,7 +62,7 @@ def __init__( session_ttl_sec: int = 300, ): self.logger = logger - self.backends = backends or [PodmanBackend(logger), DockerBackend(logger), NsjailBackend(logger)] + self.backends = backends or [DockerBackend(logger), NsjailBackend(logger)] self.session_ttl_sec = session_ttl_sec self._backend: BaseSandboxBackend | None = None self._sessions: dict[str, _RuntimeSession] = {} @@ -108,6 +108,9 @@ async def shutdown(self): async with self._lock: session_ids = list(self._sessions.keys()) for session_id in session_ids: + session = self._sessions.get(session_id) + if session is not None and session.info.persistent: + continue await self._drop_session_locked(session_id) async def create_session(self, spec: BoxSpec) -> dict: @@ -127,9 +130,12 @@ async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSp raise BoxSessionNotFoundError(f'session {session_id} not found') async with runtime_session.lock: - existing = runtime_session.managed_process + process_id = spec.process_id + existing = runtime_session.managed_processes.get(process_id) if existing is not None and existing.is_running: - raise BoxManagedProcessConflictError(f'session {session_id} already has a managed process') + raise BoxManagedProcessConflictError( + f'session {session_id} already has a running managed process with process_id={process_id}' + ) backend = await self._get_backend() process = await backend.start_managed_process(runtime_session.info, spec) @@ -140,19 +146,20 @@ async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSp attach_lock=asyncio.Lock(), stderr_chunks=collections.deque(), ) - runtime_session.managed_process = managed_process + runtime_session.managed_processes[process_id] = managed_process runtime_session.info.last_used_at = dt.datetime.now(_UTC) - asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, managed_process)) - asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, managed_process)) - return self._managed_process_to_dict(runtime_session.info.session_id, managed_process) + asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, process_id, managed_process)) + asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, process_id, managed_process)) + return self._managed_process_to_dict(runtime_session.info.session_id, process_id, managed_process) - def get_managed_process(self, session_id: str) -> dict: + def get_managed_process(self, session_id: str, process_id: str = 'default') -> dict: runtime_session = self._sessions.get(session_id) if runtime_session is None: raise BoxSessionNotFoundError(f'session {session_id} not found') - if runtime_session.managed_process is None: - raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process') - return self._managed_process_to_dict(session_id, runtime_session.managed_process) + managed_process = runtime_session.managed_processes.get(process_id) + if managed_process is None: + raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process with process_id={process_id}') + return self._managed_process_to_dict(session_id, process_id, managed_process) # ── Observability ───────────────────────────────────────────────── @@ -174,8 +181,11 @@ def get_session(self, session_id: str) -> dict: if runtime_session is None: raise BoxSessionNotFoundError(f'session {session_id} not found') result = self._session_to_dict(runtime_session.info) - if runtime_session.managed_process is not None: - result['managed_process'] = self._managed_process_to_dict(session_id, runtime_session.managed_process) + if runtime_session.managed_processes: + result['managed_processes'] = { + pid: self._managed_process_to_dict(session_id, pid, mp) + for pid, mp in runtime_session.managed_processes.items() + } return result async def get_status(self) -> dict: @@ -186,7 +196,8 @@ async def get_status(self) -> dict: 'managed_processes': sum( 1 for runtime_session in self._sessions.values() - if runtime_session.managed_process is not None and runtime_session.managed_process.is_running + for mp in runtime_session.managed_processes.values() + if mp.is_running ), 'session_ttl_sec': self.session_ttl_sec, } @@ -230,7 +241,7 @@ async def _get_backend(self) -> BaseSandboxBackend: self._backend = await self._select_backend() if self._backend is None: raise BoxBackendUnavailableError( - 'LangBot Box backend unavailable. Install and start Podman, Docker, or nsjail before using exec.' + 'LangBot Box backend unavailable. Install and start Docker or nsjail before using exec.' ) return self._backend @@ -244,7 +255,7 @@ async def _select_backend(self) -> BaseSandboxBackend | None: except Exception as exc: self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}') - self.logger.warning('LangBot Box backend unavailable: no supported backend (Podman, Docker, nsjail) is ready') + self.logger.warning('LangBot Box backend unavailable: no supported backend (Docker, nsjail) is ready') return None async def _reap_expired_sessions_locked(self): @@ -255,8 +266,9 @@ async def _reap_expired_sessions_locked(self): expired_session_ids = [ session_id for session_id, session in self._sessions.items() - if session.info.last_used_at < deadline - and not (session.managed_process is not None and session.managed_process.is_running) + if not session.info.persistent + and session.info.last_used_at < deadline + and not any(mp.is_running for mp in session.managed_processes.values()) ] for session_id in expired_session_ids: @@ -267,7 +279,8 @@ async def _drop_session_locked(self, session_id: str): if runtime_session is None or self._backend is None: return - await self._terminate_managed_process(runtime_session) + for mp in runtime_session.managed_processes.values(): + await self._terminate_managed_process(mp) try: self.logger.info( @@ -287,6 +300,7 @@ def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): 'host_path', 'host_path_mode', 'mount_path', + 'persistent', 'cpus', 'memory_mb', 'pids_limit', @@ -302,7 +316,7 @@ def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): f'Box session {spec.session_id} already exists with {field}={display}' ) - async def _drain_managed_process_stderr(self, session_id: str, managed_process: _ManagedProcess) -> None: + async def _drain_managed_process_stderr(self, session_id: str, process_id: str, managed_process: _ManagedProcess) -> None: stream = managed_process.process.stderr if stream is None: return @@ -323,22 +337,21 @@ async def _drain_managed_process_stderr(self, session_id: str, managed_process: ): removed = managed_process.stderr_chunks.popleft() managed_process.stderr_total_len -= len(removed) + 1 - self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} {text}') + self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} process_id={process_id} {text}') except Exception as exc: - self.logger.warning(f'Failed to drain managed process stderr for {session_id}: {exc}') + self.logger.warning(f'Failed to drain managed process stderr for {session_id}/{process_id}: {exc}') - async def _watch_managed_process(self, session_id: str, managed_process: _ManagedProcess) -> None: + async def _watch_managed_process(self, session_id: str, process_id: str, managed_process: _ManagedProcess) -> None: return_code = await managed_process.process.wait() managed_process.exit_code = return_code managed_process.exited_at = dt.datetime.now(_UTC) runtime_session = self._sessions.get(session_id) if runtime_session is not None: runtime_session.info.last_used_at = managed_process.exited_at - self.logger.info(f'LangBot Box managed process exited: session_id={session_id} return_code={return_code}') + self.logger.info(f'LangBot Box managed process exited: session_id={session_id} process_id={process_id} return_code={return_code}') - async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> None: - managed_process = runtime_session.managed_process - if managed_process is None or not managed_process.is_running: + async def _terminate_managed_process(self, managed_process: _ManagedProcess) -> None: + if not managed_process.is_running: return process = managed_process.process @@ -366,11 +379,12 @@ async def _terminate_managed_process(self, runtime_session: _RuntimeSession) -> managed_process.exit_code = process.returncode managed_process.exited_at = dt.datetime.now(_UTC) - def _managed_process_to_dict(self, session_id: str, managed_process: _ManagedProcess) -> dict: + def _managed_process_to_dict(self, session_id: str, process_id: str, managed_process: _ManagedProcess) -> dict: stderr_preview = '\n'.join(managed_process.stderr_chunks) status = BoxManagedProcessStatus.RUNNING if managed_process.is_running else BoxManagedProcessStatus.EXITED return BoxManagedProcessInfo( session_id=session_id, + process_id=process_id, status=status, command=managed_process.spec.command, args=managed_process.spec.args, diff --git a/src/langbot_plugin/box/security.py b/src/langbot_plugin/box/security.py index 2e8ed72..7b3b98e 100644 --- a/src/langbot_plugin/box/security.py +++ b/src/langbot_plugin/box/security.py @@ -18,8 +18,6 @@ '/var/run', '/run/docker.sock', '/var/run/docker.sock', - '/run/podman', - '/var/run/podman', } ) diff --git a/src/langbot_plugin/box/server.py b/src/langbot_plugin/box/server.py index aca5c94..c2bb8f8 100644 --- a/src/langbot_plugin/box/server.py +++ b/src/langbot_plugin/box/server.py @@ -7,8 +7,9 @@ python -m langbot_plugin.box.server --mode ws --port 5410 All WebSocket endpoints share a single port (default 5410): - /rpc/ws — Action RPC (control channel) - /v1/sessions/{session_id}/managed-process/ws — Managed process stdio relay + /rpc/ws — Action RPC (control channel) + /v1/sessions/{session_id}/managed-process/{process_id}/ws — Managed process stdio relay + /v1/sessions/{session_id}/managed-process/ws — Legacy (process_id defaults to 'default') """ from __future__ import annotations @@ -156,7 +157,10 @@ async def start_managed_process(data: dict[str, Any]) -> ActionResponse: @self.action(LangBotToBoxAction.GET_MANAGED_PROCESS) async def get_managed_process(data: dict[str, Any]) -> ActionResponse: - return ActionResponse.success(self._runtime.get_managed_process(data['session_id'])) + return ActionResponse.success(self._runtime.get_managed_process( + data['session_id'], + data.get('process_id', 'default'), + )) @self.action(LangBotToBoxAction.GET_BACKEND_INFO) async def get_backend_info(data: dict[str, Any]) -> ActionResponse: @@ -182,17 +186,18 @@ def _error_response(exc: Exception) -> web.Response: async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse: runtime: BoxRuntime = request.app['runtime'] session_id = request.match_info['session_id'] + process_id = request.match_info.get('process_id', 'default') runtime_session = runtime._sessions.get(session_id) if runtime_session is None: return _error_response(BoxSessionNotFoundError(f'session {session_id} not found')) - managed_process = runtime_session.managed_process + managed_process = runtime_session.managed_processes.get(process_id) if managed_process is None: - return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process')) + return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process with process_id={process_id}')) if not managed_process.is_running: return _error_response( - BoxManagedProcessConflictError(f'managed process in session {session_id} is not running') + BoxManagedProcessConflictError(f'managed process {process_id} in session {session_id} is not running') ) ws = web.WebSocketResponse(protocols=('mcp',)) @@ -270,6 +275,8 @@ def create_app(runtime: BoxRuntime) -> web.Application: app = web.Application() app['runtime'] = runtime app.router.add_get('/rpc/ws', handle_rpc_ws) + app.router.add_get('/v1/sessions/{session_id}/managed-process/{process_id}/ws', handle_managed_process_ws) + # Backward-compatible route (defaults to process_id='default') app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws) return app From d593734ed49faa51aa5203b6aee6192906ce006d Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Mon, 20 Apr 2026 22:40:55 +0800 Subject: [PATCH 14/14] fix: terminate stale managed process on restart instead of raising conflict error --- src/langbot_plugin/box/runtime.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py index 8509289..292740c 100644 --- a/src/langbot_plugin/box/runtime.py +++ b/src/langbot_plugin/box/runtime.py @@ -11,7 +11,6 @@ from .nsjail_backend import NsjailBackend from .errors import ( BoxBackendUnavailableError, - BoxManagedProcessConflictError, BoxManagedProcessNotFoundError, BoxSessionConflictError, BoxSessionNotFoundError, @@ -133,9 +132,15 @@ async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSp process_id = spec.process_id existing = runtime_session.managed_processes.get(process_id) if existing is not None and existing.is_running: - raise BoxManagedProcessConflictError( - f'session {session_id} already has a running managed process with process_id={process_id}' + # Terminate the stale process before starting a new one. + # This happens when LangBot restarts while the Box runtime + # keeps the persistent session alive. + self.logger.info( + f'LangBot Box terminating stale managed process before restart: ' + f'session_id={session_id} process_id={process_id}' ) + await self._terminate_managed_process(existing) + del runtime_session.managed_processes[process_id] backend = await self._get_backend() process = await backend.start_managed_process(runtime_session.info, spec)