diff --git a/pyproject.toml b/pyproject.toml index 0631261..1f29456 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ authors = [ requires-python = ">=3.10" dependencies = [ "aiofiles>=24.1.0", + "aiohttp>=3.9.0", "dotenv>=0.9.9", "httpx>=0.28.1", "jinja2>=3.1.6", diff --git a/src/langbot_plugin/box/__init__.py b/src/langbot_plugin/box/__init__.py new file mode 100644 index 0000000..c1ea6e1 --- /dev/null +++ b/src/langbot_plugin/box/__init__.py @@ -0,0 +1 @@ +"""LangBot Box runtime package.""" diff --git a/src/langbot_plugin/box/__main__.py b/src/langbot_plugin/box/__main__.py new file mode 100644 index 0000000..6c41643 --- /dev/null +++ b/src/langbot_plugin/box/__main__.py @@ -0,0 +1,7 @@ +"""Allow running the Box server via ``python -m langbot_plugin.box``.""" + +from .server import main + +if __name__ == "__main__": + main() + diff --git a/src/langbot_plugin/box/actions.py b/src/langbot_plugin/box/actions.py new file mode 100644 index 0000000..954c606 --- /dev/null +++ b/src/langbot_plugin/box/actions.py @@ -0,0 +1,21 @@ +"""Box-specific action types for the action RPC protocol.""" + +from __future__ import annotations + +from langbot_plugin.entities.io.actions.enums import ActionType + + +class LangBotToBoxAction(ActionType): + """Actions sent from LangBot to the Box runtime.""" + + HEALTH = 'box_health' + STATUS = 'box_status' + EXEC = 'box_exec' + CREATE_SESSION = 'box_create_session' + GET_SESSION = 'box_get_session' + GET_SESSIONS = 'box_get_sessions' + DELETE_SESSION = 'box_delete_session' + START_MANAGED_PROCESS = 'box_start_managed_process' + GET_MANAGED_PROCESS = 'box_get_managed_process' + GET_BACKEND_INFO = 'box_get_backend_info' + SHUTDOWN = 'box_shutdown' diff --git a/src/langbot_plugin/box/backend.py b/src/langbot_plugin/box/backend.py new file mode 100644 index 0000000..99e6f7d --- /dev/null +++ b/src/langbot_plugin/box/backend.py @@ -0,0 +1,394 @@ +from __future__ import annotations + +import abc +import asyncio +import dataclasses 
+import datetime as dt +import logging +import re +import shlex +import shutil +import uuid + +from .errors import BoxError +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxHostMountMode, + BoxNetworkMode, + BoxSessionInfo, + BoxSpec, +) +from .security import validate_sandbox_security + +# Hard cap on raw subprocess output to prevent unbounded memory usage. +# Container timeout already bounds duration, but fast commands can still +# produce large output within the time limit. After this many bytes the +# remaining output is discarded before decoding. +_MAX_RAW_OUTPUT_BYTES = 1_048_576 # 1 MB per stream + + +@dataclasses.dataclass(slots=True) +class _CommandResult: + return_code: int + stdout: str + stderr: str + timed_out: bool = False + + +class BaseSandboxBackend(abc.ABC): + name: str + instance_id: str = '' + + def __init__(self, logger: logging.Logger): + self.logger = logger + + async def initialize(self): + return None + + @abc.abstractmethod + async def is_available(self) -> bool: + pass + + @abc.abstractmethod + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + pass + + @abc.abstractmethod + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + pass + + @abc.abstractmethod + async def stop_session(self, session: BoxSessionInfo): + pass + + async def start_managed_process(self, session: BoxSessionInfo, spec): + raise BoxError(f'{self.name} backend does not support managed processes') + + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): + """Remove lingering containers from previous runs. 
No-op by default.""" + pass + + +class CLISandboxBackend(BaseSandboxBackend): + command: str + + def __init__(self, logger: logging.Logger, command: str, backend_name: str): + super().__init__(logger) + self.command = command + self.name = backend_name + + async def is_available(self) -> bool: + if shutil.which(self.command) is None: + return False + + result = await self._run_command([self.command, 'info'], timeout_sec=5, check=False) + return result.return_code == 0 and not result.timed_out + + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + validate_sandbox_security(spec) + + now = dt.datetime.now(dt.timezone.utc) + container_name = self._build_container_name(spec.session_id) + + args = [ + self.command, + 'run', + '-d', + ] + + if not spec.persistent: + args.append('--rm') + + args.extend([ + '--name', + container_name, + '--label', + 'langbot.box=true', + '--label', + f'langbot.session_id={spec.session_id}', + '--label', + f'langbot.box.instance_id={self.instance_id}', + ]) + + if spec.network == BoxNetworkMode.OFF: + args.extend(['--network', 'none']) + + # Resource limits + args.extend(['--cpus', str(spec.cpus)]) + args.extend(['--memory', f'{spec.memory_mb}m']) + args.extend(['--pids-limit', str(spec.pids_limit)]) + + if spec.read_only_rootfs: + args.append('--read-only') + args.extend(['--tmpfs', '/tmp:size=64m']) + + if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: + mount_spec = f'{spec.host_path}:{spec.mount_path}:{spec.host_path_mode.value}' + args.extend(['-v', mount_spec]) + + for mount in spec.extra_mounts: + if mount.mode != BoxHostMountMode.NONE: + args.extend(['-v', f'{mount.host_path}:{mount.mount_path}:{mount.mode.value}']) + + args.extend([spec.image, 'sh', '-lc', 'while true; do sleep 3600; done']) + + self.logger.info( + f'LangBot Box backend start_session: backend={self.name} ' + f'session_id={spec.session_id} container_name={container_name} ' + f'image={spec.image} 
network={spec.network.value} ' + f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} mount_path={spec.mount_path} ' + f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} ' + f'read_only_rootfs={spec.read_only_rootfs} workspace_quota_mb={spec.workspace_quota_mb}' + ) + + await self._run_command(args, timeout_sec=30, check=True) + + return BoxSessionInfo( + session_id=spec.session_id, + backend_name=self.name, + backend_session_id=container_name, + image=spec.image, + network=spec.network, + host_path=spec.host_path, + host_path_mode=spec.host_path_mode, + mount_path=spec.mount_path, + persistent=spec.persistent, + cpus=spec.cpus, + memory_mb=spec.memory_mb, + pids_limit=spec.pids_limit, + read_only_rootfs=spec.read_only_rootfs, + workspace_quota_mb=spec.workspace_quota_mb, + created_at=now, + last_used_at=now, + ) + + async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult: + start = dt.datetime.now(dt.timezone.utc) + args = [self.command, 'exec'] + + for key, value in spec.env.items(): + args.extend(['-e', f'{key}={value}']) + + args.extend( + [ + session.backend_session_id, + 'sh', + '-lc', + self._build_exec_command(spec.workdir, spec.cmd), + ] + ) + + cmd_preview = spec.cmd.strip() + if len(cmd_preview) > 400: + cmd_preview = f'{cmd_preview[:397]}...' 
+ self.logger.info( + f'LangBot Box backend exec: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id} ' + f'workdir={spec.workdir} timeout_sec={spec.timeout_sec} ' + f'env_keys={sorted(spec.env.keys())} cmd={cmd_preview}' + ) + + result = await self._run_command(args, timeout_sec=spec.timeout_sec, check=False) + duration_ms = int((dt.datetime.now(dt.timezone.utc) - start).total_seconds() * 1000) + + if result.timed_out: + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.TIMED_OUT, + exit_code=None, + stdout=result.stdout, + stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.', + duration_ms=duration_ms, + ) + + return BoxExecutionResult( + session_id=session.session_id, + backend_name=self.name, + status=BoxExecutionStatus.COMPLETED, + exit_code=result.return_code, + stdout=result.stdout, + stderr=result.stderr, + duration_ms=duration_ms, + ) + + async def stop_session(self, session: BoxSessionInfo): + self.logger.info( + f'LangBot Box backend stop_session: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id}' + ) + await self._run_command( + [self.command, 'rm', '-f', session.backend_session_id], + timeout_sec=20, + check=False, + ) + + async def cleanup_orphaned_containers(self, current_instance_id: str = ''): + """Remove langbot.box containers from previous instances. + + Only removes containers whose ``langbot.box.instance_id`` label does + NOT match *current_instance_id*. Containers without the label (from + older versions) are also removed. 
+ """ + result = await self._run_command( + [ + self.command, + 'ps', + '-a', + '--filter', + 'label=langbot.box=true', + '--format', + '{{.ID}}\t{{.Label "langbot.box.instance_id"}}', + ], + timeout_sec=10, + check=False, + ) + if result.return_code != 0 or not result.stdout.strip(): + return + orphan_ids = [] + for line in result.stdout.strip().split('\n'): + line = line.strip() + if not line: + continue + parts = line.split('\t', 1) + cid = parts[0].strip() + label_instance = parts[1].strip() if len(parts) > 1 else '' + if label_instance != current_instance_id: + orphan_ids.append(cid) + if not orphan_ids: + return + for cid in orphan_ids: + self.logger.info(f'Cleaning up orphaned Box container: {cid}') + await self._run_command( + [self.command, 'rm', '-f', *orphan_ids], + timeout_sec=30, + check=False, + ) + + async def start_managed_process(self, session: BoxSessionInfo, spec) -> asyncio.subprocess.Process: + args = [self.command, 'exec', '-i'] + + for key, value in spec.env.items(): + args.extend(['-e', f'{key}={value}']) + + args.extend( + [ + session.backend_session_id, + 'sh', + '-lc', + self._build_spawn_command(spec.cwd, spec.command, spec.args), + ] + ) + + self.logger.info( + f'LangBot Box backend start_managed_process: backend={self.name} ' + f'session_id={session.session_id} container_name={session.backend_session_id} ' + f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} command={spec.command} args={spec.args}' + ) + + return await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + def _build_container_name(self, session_id: str) -> str: + normalized = re.sub(r'[^a-zA-Z0-9_.-]+', '-', session_id).strip('-').lower() or 'session' + suffix = uuid.uuid4().hex[:8] + return f'langbot-box-{normalized[:32]}-{suffix}' + + def _build_exec_command(self, workdir: str, cmd: str) -> str: + quoted_workdir = shlex.quote(workdir) + return f'mkdir -p {quoted_workdir} 
&& cd {quoted_workdir} && {cmd}' + + def _build_spawn_command(self, cwd: str, command: str, args: list[str]) -> str: + quoted_cwd = shlex.quote(cwd) + command_parts = [shlex.quote(command), *[shlex.quote(arg) for arg in args]] + return f'mkdir -p {quoted_cwd} && cd {quoted_cwd} && exec {" ".join(command_parts)}' + + async def _run_command( + self, + args: list[str], + timeout_sec: int, + check: bool, + ) -> _CommandResult: + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_task = asyncio.create_task(self._read_stream(process.stdout)) + stderr_task = asyncio.create_task(self._read_stream(process.stderr)) + + timed_out = False + try: + await asyncio.wait_for(process.wait(), timeout=timeout_sec) + except asyncio.TimeoutError: + process.kill() + timed_out = True + await process.wait() + + stdout_bytes, stdout_total = await stdout_task + stderr_bytes, stderr_total = await stderr_task + + if timed_out: + return _CommandResult( + return_code=-1, + stdout=self._clip_captured_bytes(stdout_bytes, stdout_total), + stderr=self._clip_captured_bytes(stderr_bytes, stderr_total), + timed_out=True, + ) + + stdout = self._clip_captured_bytes(stdout_bytes, stdout_total) + stderr = self._clip_captured_bytes(stderr_bytes, stderr_total) + + if check and process.returncode != 0: + raise BoxError(self._format_cli_error(stderr or stdout or 'unknown backend error')) + + return _CommandResult( + return_code=process.returncode, + stdout=stdout, + stderr=stderr, + timed_out=False, + ) + + @staticmethod + def _clip_captured_bytes(data: bytes, total_size: int, limit: int = _MAX_RAW_OUTPUT_BYTES) -> str: + text = data.decode('utf-8', errors='replace').strip() + if total_size > limit: + text += f'\n... 
[raw output clipped at {limit} bytes, {total_size - limit} bytes discarded]' + return text + + @staticmethod + async def _read_stream( + stream: asyncio.StreamReader | None, + limit: int = _MAX_RAW_OUTPUT_BYTES, + ) -> tuple[bytes, int]: + if stream is None: + return b'', 0 + + chunks = bytearray() + total_size = 0 + while True: + chunk = await stream.read(65536) + if not chunk: + break + total_size += len(chunk) + remaining = limit - len(chunks) + if remaining > 0: + chunks.extend(chunk[:remaining]) + + return bytes(chunks), total_size + + def _format_cli_error(self, message: str) -> str: + message = ' '.join(message.split()) + if len(message) > 300: + message = f'{message[:297]}...' + return f'{self.name} backend error: {message}' + + +class DockerBackend(CLISandboxBackend): + def __init__(self, logger: logging.Logger): + super().__init__(logger=logger, command='docker', backend_name='docker') diff --git a/src/langbot_plugin/box/client.py b/src/langbot_plugin/box/client.py new file mode 100644 index 0000000..9175c59 --- /dev/null +++ b/src/langbot_plugin/box/client.py @@ -0,0 +1,180 @@ +"""BoxRuntimeClient abstraction for Box Runtime access.""" + +from __future__ import annotations + +import abc +import logging +from typing import Any + +from langbot_plugin.runtime.io.handler import Handler + +from .actions import LangBotToBoxAction +from .errors import BoxError, BoxRuntimeUnavailableError +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxSpec, +) + + +class BoxRuntimeClient(abc.ABC): + """Abstract interface that BoxService uses to talk to a Box Runtime.""" + + @abc.abstractmethod + async def initialize(self) -> None: ... + + @abc.abstractmethod + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: ... + + @abc.abstractmethod + async def shutdown(self) -> None: ... + + @abc.abstractmethod + async def get_status(self) -> dict: ... 
+ + @abc.abstractmethod + async def get_sessions(self) -> list[dict]: ... + + @abc.abstractmethod + async def get_backend_info(self) -> dict: ... + + @abc.abstractmethod + async def delete_session(self, session_id: str) -> None: ... + + @abc.abstractmethod + async def create_session(self, spec: BoxSpec) -> dict: ... + + @abc.abstractmethod + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: ... + + @abc.abstractmethod + async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo: ... + + @abc.abstractmethod + async def get_session(self, session_id: str) -> dict: ... + + +def _translate_action_error(exc: Exception) -> BoxError: + """Convert an ActionCallError message back into the appropriate BoxError subclass.""" + from .errors import ( + BoxBackendUnavailableError, + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, + ) + + msg = str(exc) + _ERROR_PREFIX_MAP: list[tuple[str, type[BoxError]]] = [ + ('BoxValidationError:', BoxValidationError), + ('BoxSessionNotFoundError:', BoxSessionNotFoundError), + ('BoxSessionConflictError:', BoxSessionConflictError), + ('BoxManagedProcessNotFoundError:', BoxManagedProcessNotFoundError), + ('BoxManagedProcessConflictError:', BoxManagedProcessConflictError), + ('BoxBackendUnavailableError:', BoxBackendUnavailableError), + ] + for prefix, cls in _ERROR_PREFIX_MAP: + if prefix in msg: + return cls(msg) + return BoxError(msg) + + +class ActionRPCBoxClient(BoxRuntimeClient): + """Client that talks to BoxRuntime via the action RPC protocol.""" + + def __init__(self, logger: logging.Logger): + self._logger = logger + self._handler: Handler | None = None + + @property + def handler(self) -> Handler: + if self._handler is None: + raise BoxRuntimeUnavailableError('box runtime not connected') + return self._handler + + def set_handler(self, 
handler: Handler) -> None: + self._handler = handler + + async def _call(self, action: LangBotToBoxAction, data: dict[str, Any], timeout: float = 15.0) -> dict[str, Any]: + try: + return await self.handler.call_action(action, data, timeout=timeout) + except BoxRuntimeUnavailableError: + raise + except Exception as exc: + raise _translate_action_error(exc) from exc + + async def initialize(self) -> None: + try: + await self._call(LangBotToBoxAction.HEALTH, {}) + self._logger.info('LangBot Box runtime connected via action RPC.') + except Exception as exc: + raise BoxRuntimeUnavailableError(f'box runtime unavailable: {exc}') from exc + + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: + data = await self._call(LangBotToBoxAction.EXEC, spec.model_dump(mode='json'), timeout=300.0) + return BoxExecutionResult( + session_id=data['session_id'], + backend_name=data['backend_name'], + status=BoxExecutionStatus(data['status']), + exit_code=data.get('exit_code'), + stdout=data.get('stdout', ''), + stderr=data.get('stderr', ''), + duration_ms=data['duration_ms'], + ) + + async def shutdown(self) -> None: + if self._handler is not None: + try: + await self._call(LangBotToBoxAction.SHUTDOWN, {}) + except Exception: + pass + self._handler = None + + async def get_status(self) -> dict: + return await self._call(LangBotToBoxAction.STATUS, {}) + + async def get_sessions(self) -> list[dict]: + data = await self._call(LangBotToBoxAction.GET_SESSIONS, {}) + return data['sessions'] + + async def get_session(self, session_id: str) -> dict: + return await self._call(LangBotToBoxAction.GET_SESSION, {'session_id': session_id}) + + async def get_backend_info(self) -> dict: + return await self._call(LangBotToBoxAction.GET_BACKEND_INFO, {}) + + async def delete_session(self, session_id: str) -> None: + await self._call(LangBotToBoxAction.DELETE_SESSION, {'session_id': session_id}, timeout=30.0) + + async def create_session(self, spec: BoxSpec) -> dict: + return await 
self._call(LangBotToBoxAction.CREATE_SESSION, spec.model_dump(mode='json')) + + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> BoxManagedProcessInfo: + data = await self._call( + LangBotToBoxAction.START_MANAGED_PROCESS, + {'session_id': session_id, 'spec': spec.model_dump(mode='json')}, + ) + return BoxManagedProcessInfo.model_validate(data) + + async def get_managed_process(self, session_id: str, process_id: str = 'default') -> BoxManagedProcessInfo: + data = await self._call(LangBotToBoxAction.GET_MANAGED_PROCESS, { + 'session_id': session_id, + 'process_id': process_id, + }) + return BoxManagedProcessInfo.model_validate(data) + + def get_managed_process_websocket_url(self, session_id: str, ws_relay_base_url: str, process_id: str = 'default') -> str: + base = ws_relay_base_url + if base.startswith('https://'): + scheme = 'wss://' + suffix = base[len('https://') :] + elif base.startswith('http://'): + scheme = 'ws://' + suffix = base[len('http://') :] + else: + scheme = 'ws://' + suffix = base + return f'{scheme}{suffix}/v1/sessions/{session_id}/managed-process/{process_id}/ws' diff --git a/src/langbot_plugin/box/errors.py b/src/langbot_plugin/box/errors.py new file mode 100644 index 0000000..ecdde7a --- /dev/null +++ b/src/langbot_plugin/box/errors.py @@ -0,0 +1,33 @@ +from __future__ import annotations + + +class BoxError(RuntimeError): + """Base error for LangBot Box failures.""" + + +class BoxValidationError(BoxError): + """Raised when exec tool arguments are invalid.""" + + +class BoxBackendUnavailableError(BoxError): + """Raised when no supported container backend is available.""" + + +class BoxRuntimeUnavailableError(BoxError): + """Raised when the standalone Box Runtime service is unavailable.""" + + +class BoxSessionConflictError(BoxError): + """Raised when an existing session cannot satisfy a new request.""" + + +class BoxSessionNotFoundError(BoxError): + """Raised when a referenced session does not exist.""" + + 
+class BoxManagedProcessConflictError(BoxError): + """Raised when a session already has an active managed process.""" + + +class BoxManagedProcessNotFoundError(BoxError): + """Raised when a referenced managed process does not exist.""" diff --git a/src/langbot_plugin/box/models.py b/src/langbot_plugin/box/models.py new file mode 100644 index 0000000..fa34e36 --- /dev/null +++ b/src/langbot_plugin/box/models.py @@ -0,0 +1,331 @@ +from __future__ import annotations + +import datetime as dt +import enum +import ntpath +import posixpath + +import pydantic + + +DEFAULT_BOX_IMAGE = 'rockchin/langbot-sandbox:latest' +DEFAULT_BOX_MOUNT_PATH = '/workspace' + + +class BoxNetworkMode(str, enum.Enum): + OFF = 'off' + ON = 'on' + + +class BoxExecutionStatus(str, enum.Enum): + COMPLETED = 'completed' + TIMED_OUT = 'timed_out' + + +class BoxHostMountMode(str, enum.Enum): + NONE = 'none' + READ_ONLY = 'ro' + READ_WRITE = 'rw' + + +class BoxManagedProcessStatus(str, enum.Enum): + RUNNING = 'running' + EXITED = 'exited' + + +class BoxMountSpec(pydantic.BaseModel): + """A single additional bind mount specification.""" + + host_path: str + mount_path: str + mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + + @pydantic.field_validator('host_path') + @classmethod + def validate_host_path(cls, value: str) -> str: + value = value.strip() + if not (posixpath.isabs(value) or ntpath.isabs(value)): + raise ValueError('host_path must be an absolute host path') + return value + + @pydantic.field_validator('mount_path') + @classmethod + def validate_mount_path(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('mount_path must be an absolute path inside the sandbox') + return value + + +class BoxSpec(pydantic.BaseModel): + cmd: str = '' + workdir: str = DEFAULT_BOX_MOUNT_PATH + timeout_sec: int = 30 + network: BoxNetworkMode = BoxNetworkMode.OFF + session_id: str + env: dict[str, str] = pydantic.Field(default_factory=dict) + image: str = 
DEFAULT_BOX_IMAGE + host_path: str | None = None + host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + mount_path: str = DEFAULT_BOX_MOUNT_PATH + extra_mounts: list[BoxMountSpec] = pydantic.Field(default_factory=list) + persistent: bool = False + # Resource limits + cpus: float = 1.0 + memory_mb: int = 512 + pids_limit: int = 128 + read_only_rootfs: bool = True + workspace_quota_mb: int = 0 + + @pydantic.model_validator(mode='before') + @classmethod + def populate_workdir_from_mount_path(cls, data): + if not isinstance(data, dict): + return data + if data.get('workdir') not in (None, ''): + return data + mount_path = data.get('mount_path') + if isinstance(mount_path, str) and mount_path.strip(): + data = dict(data) + data['workdir'] = mount_path + return data + + @pydantic.field_validator('cmd') + @classmethod + def validate_cmd(cls, value: str) -> str: + return value.strip() + + @pydantic.field_validator('workdir') + @classmethod + def validate_workdir(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('workdir must be an absolute path inside the sandbox') + return value + + @pydantic.field_validator('timeout_sec') + @classmethod + def validate_timeout_sec(cls, value: int) -> int: + if value <= 0: + raise ValueError('timeout_sec must be greater than 0') + return value + + @pydantic.field_validator('cpus') + @classmethod + def validate_cpus(cls, value: float) -> float: + if value <= 0: + raise ValueError('cpus must be greater than 0') + return value + + @pydantic.field_validator('memory_mb') + @classmethod + def validate_memory_mb(cls, value: int) -> int: + if value < 32: + raise ValueError('memory_mb must be at least 32') + return value + + @pydantic.field_validator('pids_limit') + @classmethod + def validate_pids_limit(cls, value: int) -> int: + if value < 1: + raise ValueError('pids_limit must be at least 1') + return value + + @pydantic.field_validator('workspace_quota_mb') + @classmethod + def 
validate_workspace_quota_mb(cls, value: int) -> int: + if value < 0: + raise ValueError('workspace_quota_mb must be greater than or equal to 0') + return value + + @pydantic.field_validator('session_id') + @classmethod + def validate_session_id(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError('session_id must not be empty') + return value + + @pydantic.field_validator('env') + @classmethod + def validate_env(cls, value: dict[str, str]) -> dict[str, str]: + return {str(k): str(v) for k, v in value.items()} + + @pydantic.field_validator('host_path') + @classmethod + def validate_host_path(cls, value: str | None) -> str | None: + if value is None: + return None + value = value.strip() + if not (posixpath.isabs(value) or ntpath.isabs(value)): + raise ValueError('host_path must be an absolute host path') + return value + + @pydantic.field_validator('mount_path') + @classmethod + def validate_mount_path(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('mount_path must be an absolute path inside the sandbox') + return value + + @pydantic.model_validator(mode='after') + def validate_host_mount_consistency(self) -> 'BoxSpec': + if self.host_path is None: + return self + if self.host_path_mode == BoxHostMountMode.NONE: + return self + if self.workdir != self.mount_path and not self.workdir.startswith(f'{self.mount_path}/'): + raise ValueError('workdir must stay under mount_path when host_path is provided') + return self + + +class BoxProfile(pydantic.BaseModel): + """Preset sandbox configuration. + + Provides default values for BoxSpec fields and optionally locks fields + so that tool-call parameters cannot override them. 
+ """ + + name: str + image: str = DEFAULT_BOX_IMAGE + network: BoxNetworkMode = BoxNetworkMode.OFF + timeout_sec: int = 30 + host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + max_timeout_sec: int = 120 + # Resource limits + cpus: float = 1.0 + memory_mb: int = 512 + pids_limit: int = 128 + read_only_rootfs: bool = True + workspace_quota_mb: int = 0 + locked: frozenset[str] = frozenset() + + model_config = pydantic.ConfigDict(frozen=True) + + +BUILTIN_PROFILES: dict[str, BoxProfile] = { + 'default': BoxProfile( + name='default', + network=BoxNetworkMode.OFF, + host_path_mode=BoxHostMountMode.READ_WRITE, + cpus=1.0, + memory_mb=512, + pids_limit=128, + read_only_rootfs=True, + max_timeout_sec=120, + ), + 'offline_readonly': BoxProfile( + name='offline_readonly', + network=BoxNetworkMode.OFF, + host_path_mode=BoxHostMountMode.READ_ONLY, + cpus=0.5, + memory_mb=256, + pids_limit=64, + read_only_rootfs=True, + max_timeout_sec=60, + locked=frozenset({'network', 'host_path_mode', 'read_only_rootfs'}), + ), + 'network_basic': BoxProfile( + name='network_basic', + network=BoxNetworkMode.ON, + host_path_mode=BoxHostMountMode.READ_WRITE, + cpus=1.0, + memory_mb=512, + pids_limit=128, + read_only_rootfs=True, + max_timeout_sec=120, + ), + 'network_extended': BoxProfile( + name='network_extended', + network=BoxNetworkMode.ON, + host_path_mode=BoxHostMountMode.READ_WRITE, + cpus=2.0, + memory_mb=1024, + pids_limit=256, + read_only_rootfs=False, + max_timeout_sec=300, + ), +} + + +class BoxSessionInfo(pydantic.BaseModel): + session_id: str + backend_name: str + backend_session_id: str + image: str + network: BoxNetworkMode + host_path: str | None = None + host_path_mode: BoxHostMountMode = BoxHostMountMode.READ_WRITE + mount_path: str = DEFAULT_BOX_MOUNT_PATH + persistent: bool = False + cpus: float = 1.0 + memory_mb: int = 512 + pids_limit: int = 128 + read_only_rootfs: bool = True + workspace_quota_mb: int = 0 + created_at: dt.datetime + last_used_at: 
dt.datetime + + +class BoxManagedProcessSpec(pydantic.BaseModel): + process_id: str = 'default' + command: str + args: list[str] = pydantic.Field(default_factory=list) + env: dict[str, str] = pydantic.Field(default_factory=dict) + cwd: str = DEFAULT_BOX_MOUNT_PATH + + @pydantic.field_validator('command') + @classmethod + def validate_command(cls, value: str) -> str: + value = value.strip() + if not value: + raise ValueError('command must not be empty') + return value + + @pydantic.field_validator('args') + @classmethod + def validate_args(cls, value: list[str]) -> list[str]: + return [str(item) for item in value] + + @pydantic.field_validator('env') + @classmethod + def validate_env(cls, value: dict[str, str]) -> dict[str, str]: + return {str(k): str(v) for k, v in value.items()} + + @pydantic.field_validator('cwd') + @classmethod + def validate_cwd(cls, value: str) -> str: + value = value.strip() + if not value.startswith('/'): + raise ValueError('cwd must be an absolute path inside the sandbox') + return value + + +class BoxManagedProcessInfo(pydantic.BaseModel): + session_id: str + process_id: str = 'default' + status: BoxManagedProcessStatus + command: str + args: list[str] + cwd: str + env_keys: list[str] + attached: bool = False + started_at: dt.datetime + exited_at: dt.datetime | None = None + exit_code: int | None = None + stderr_preview: str = '' + + +class BoxExecutionResult(pydantic.BaseModel): + session_id: str + backend_name: str + status: BoxExecutionStatus + exit_code: int | None + stdout: str = '' + stderr: str = '' + duration_ms: int + + @property + def ok(self) -> bool: + return self.status == BoxExecutionStatus.COMPLETED and self.exit_code == 0 diff --git a/src/langbot_plugin/box/nsjail_backend.py b/src/langbot_plugin/box/nsjail_backend.py new file mode 100644 index 0000000..921c8ff --- /dev/null +++ b/src/langbot_plugin/box/nsjail_backend.py @@ -0,0 +1,517 @@ +from __future__ import annotations + +import asyncio +import datetime as dt +import 
json +import logging +import os +import pathlib +import shlex +import shutil +import signal +import uuid + +from .backend import BaseSandboxBackend, _CommandResult, _MAX_RAW_OUTPUT_BYTES +from .errors import BoxError +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxHostMountMode, + BoxNetworkMode, + BoxSessionInfo, + BoxSpec, +) +from .security import validate_sandbox_security + +# System directories to mount read-only inside the sandbox. +# Only well-known paths needed for running Python/Node/shell commands. +_READONLY_SYSTEM_MOUNTS: list[str] = [ + '/usr', + '/lib', + '/lib64', + '/bin', + '/sbin', +] + +# Specific /etc entries required for dynamic linking and TLS. +_READONLY_ETC_ENTRIES: list[str] = [ + '/etc/alternatives', + '/etc/ld.so.cache', + '/etc/ld.so.conf', + '/etc/ld.so.conf.d', + '/etc/ssl/certs', + '/etc/localtime', + '/etc/resolv.conf', # needed when network=ON +] + +_DEFAULT_BASE_DIR = '/tmp/langbot-box-nsjail' + + +class NsjailBackend(BaseSandboxBackend): + """Lightweight sandbox backend using nsjail. + + Each ``exec`` invocation spawns an independent nsjail process. Session + state (workspace files) persists via a shared host directory that is + bind-mounted into every invocation. + """ + + name = 'nsjail' + + def __init__( + self, + logger: logging.Logger, + nsjail_bin: str = 'nsjail', + base_dir: str = _DEFAULT_BASE_DIR, + ): + super().__init__(logger) + self._nsjail_bin = nsjail_bin + self._base_dir = pathlib.Path(base_dir) + self._cgroup_v2_available: bool = False + + # ── lifecycle ───────────────────────────────────────────────────── + + async def is_available(self) -> bool: + if shutil.which(self._nsjail_bin) is None: + self.logger.info('nsjail binary not found in PATH') + return False + + # Quick sanity check – nsjail --help exits 0. 
+ try: + proc = await asyncio.create_subprocess_exec( + self._nsjail_bin, '--help', + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await asyncio.wait_for(proc.wait(), timeout=5) + if proc.returncode != 0: + self.logger.info('nsjail --help returned non-zero') + return False + except Exception as exc: + self.logger.info(f'nsjail probe failed: {exc}') + return False + + self._cgroup_v2_available = self._detect_cgroup_v2() + if not self._cgroup_v2_available: + self.logger.warning( + 'cgroup v2 not available for nsjail; ' + 'falling back to rlimit-based resource limits' + ) + + self._base_dir.mkdir(parents=True, exist_ok=True) + return True + + async def start_session(self, spec: BoxSpec) -> BoxSessionInfo: + validate_sandbox_security(spec) + + now = dt.datetime.now(dt.timezone.utc) + session_dir_name = f'{self.instance_id}_{spec.session_id}_{uuid.uuid4().hex[:8]}' + session_dir = self._base_dir / session_dir_name + + # Per-session writable directories. + workspace_dir = session_dir / 'workspace' + tmp_dir = session_dir / 'tmp' + home_dir = session_dir / 'home' + + for d in (workspace_dir, tmp_dir, home_dir): + d.mkdir(parents=True, exist_ok=True) + + # If host_path is specified, we will use it directly instead of the + # per-session workspace when building nsjail args (see _build_mounts). 
        # Persist session metadata next to the workspace for debugging and
        # post-mortem inspection; nothing in this backend reads it back.
        meta = {
            'session_id': spec.session_id,
            'instance_id': self.instance_id,
            'host_path': spec.host_path,
            'host_path_mode': spec.host_path_mode.value if spec.host_path else None,
            'mount_path': spec.mount_path,
            'network': spec.network.value,
            'cpus': spec.cpus,
            'memory_mb': spec.memory_mb,
            'pids_limit': spec.pids_limit,
            'created_at': now.isoformat(),
        }
        (session_dir / 'meta.json').write_text(json.dumps(meta, indent=2))

        # NOTE(review): this log reads spec.host_path_mode.value
        # unconditionally, while the meta dict above guards it with
        # `if spec.host_path` — confirm host_path_mode is always an enum
        # even when host_path is None.
        self.logger.info(
            f'LangBot Box backend start_session: backend=nsjail '
            f'session_id={spec.session_id} session_dir={session_dir} '
            f'network={spec.network.value} '
            f'host_path={spec.host_path} host_path_mode={spec.host_path_mode.value} mount_path={spec.mount_path} '
            f'cpus={spec.cpus} memory_mb={spec.memory_mb} pids_limit={spec.pids_limit} '
            f'workspace_quota_mb={spec.workspace_quota_mb}'
        )

        return BoxSessionInfo(
            session_id=spec.session_id,
            backend_name=self.name,
            # The session directory path doubles as the backend session id;
            # exec()/stop_session() reconstruct the path from it.
            backend_session_id=str(session_dir),
            image='host',  # no container image: jails run on the host rootfs
            network=spec.network,
            host_path=spec.host_path,
            host_path_mode=spec.host_path_mode,
            mount_path=spec.mount_path,
            cpus=spec.cpus,
            memory_mb=spec.memory_mb,
            pids_limit=spec.pids_limit,
            read_only_rootfs=spec.read_only_rootfs,
            workspace_quota_mb=spec.workspace_quota_mb,
            persistent=spec.persistent,
            created_at=now,
            last_used_at=now,
        )

    async def exec(self, session: BoxSessionInfo, spec: BoxSpec) -> BoxExecutionResult:
        """Run ``spec.cmd`` in a fresh one-shot nsjail for this session."""
        start = dt.datetime.now(dt.timezone.utc)
        session_dir = pathlib.Path(session.backend_session_id)

        args = self._build_nsjail_args(session, spec, session_dir)

        # Log at most ~400 characters of the command to keep log lines bounded.
        cmd_preview = spec.cmd.strip()
        if len(cmd_preview) > 400:
            cmd_preview = f'{cmd_preview[:397]}...'
        self.logger.info(
            f'LangBot Box backend exec: backend=nsjail '
            f'session_id={session.session_id} session_dir={session_dir} '
            f'workdir={spec.workdir} timeout_sec={spec.timeout_sec} '
            f'env_keys={sorted(spec.env.keys())} cmd={cmd_preview}'
        )

        result = await self._run_nsjail(args, timeout_sec=spec.timeout_sec)
        duration_ms = int((dt.datetime.now(dt.timezone.utc) - start).total_seconds() * 1000)

        if result.timed_out:
            # exit_code is None on timeout: the jail was killed, so no
            # meaningful return code exists.
            return BoxExecutionResult(
                session_id=session.session_id,
                backend_name=self.name,
                status=BoxExecutionStatus.TIMED_OUT,
                exit_code=None,
                stdout=result.stdout,
                stderr=result.stderr or f'Command timed out after {spec.timeout_sec} seconds.',
                duration_ms=duration_ms,
            )

        return BoxExecutionResult(
            session_id=session.session_id,
            backend_name=self.name,
            status=BoxExecutionStatus.COMPLETED,
            exit_code=result.return_code,
            stdout=result.stdout,
            stderr=result.stderr,
            duration_ms=duration_ms,
        )

    async def stop_session(self, session: BoxSessionInfo):
        """Kill lingering jails for the session and remove its directory."""
        session_dir = pathlib.Path(session.backend_session_id)
        self.logger.info(
            f'LangBot Box backend stop_session: backend=nsjail '
            f'session_id={session.session_id} session_dir={session_dir}'
        )

        # Kill any lingering nsjail processes whose command line references
        # session_dir (see _kill_session_processes: it matches /proc cmdline,
        # not cwd).
        await self._kill_session_processes(session_dir)

        try:
            if session_dir.exists():
                shutil.rmtree(session_dir)
        except Exception as exc:
            # Best-effort cleanup: a leftover directory is logged, not fatal.
            self.logger.warning(f'Failed to remove nsjail session dir {session_dir}: {exc}')

    async def start_managed_process(
        self, session: BoxSessionInfo, spec
    ) -> asyncio.subprocess.Process:
        """Spawn a long-lived process inside the session's jail.

        Unlike exec(), the returned asyncio subprocess keeps stdin/stdout/
        stderr pipes open so the caller can relay them (see the server's
        managed-process WebSocket handler).
        """
        session_dir = pathlib.Path(session.backend_session_id)

        # Build a BoxSpec-like object so we can reuse _build_nsjail_args.
        # ManagedProcessSpec has command/args/cwd/env but not the full BoxSpec.
        # shlex.quote each token so the command survives the `sh -lc` layer
        # that _build_nsjail_args wraps around it.
        inner_cmd = ' '.join([shlex.quote(spec.command), *[shlex.quote(a) for a in spec.args]])
        pseudo_spec = BoxSpec(
            cmd=inner_cmd,
            workdir=spec.cwd,
            timeout_sec=86400,  # not used here
            network=session.network,
            session_id=session.session_id,
            env=spec.env,
            host_path=session.host_path,
            host_path_mode=session.host_path_mode,
            mount_path=session.mount_path,
            cpus=session.cpus,
            memory_mb=session.memory_mb,
            pids_limit=session.pids_limit,
            read_only_rootfs=session.read_only_rootfs,
        )

        args = self._build_nsjail_args(session, pseudo_spec, session_dir)

        self.logger.info(
            f'LangBot Box backend start_managed_process: backend=nsjail '
            f'session_id={session.session_id} session_dir={session_dir} '
            f'cwd={spec.cwd} env_keys={sorted(spec.env.keys())} '
            f'command={spec.command} args={spec.args}'
        )

        return await asyncio.create_subprocess_exec(
            *args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

    async def cleanup_orphaned_containers(self, current_instance_id: str = ''):
        """Remove session directories left behind by previous instances.

        NOTE(review): with the default current_instance_id='' every directory
        whose name does not start with '_' is treated as orphaned — confirm
        callers always pass the live instance id.
        """
        if not self._base_dir.exists():
            return

        for entry in self._base_dir.iterdir():
            if not entry.is_dir():
                continue

            # Session dirs are named {instance_id}_{session_id}_{hex suffix}
            # (see start_session). If the name doesn't start with the current
            # instance_id, the directory belongs to a dead instance.
            if entry.name.startswith(f'{current_instance_id}_'):
                continue

            self.logger.info(f'Cleaning up orphaned nsjail session dir: {entry}')
            try:
                await self._kill_session_processes(entry)
                shutil.rmtree(entry)
            except Exception as exc:
                self.logger.warning(f'Failed to clean up orphaned nsjail dir {entry}: {exc}')

    # ── nsjail argument construction ──────────────────────────────────

    def _build_nsjail_args(
        self,
        session: BoxSessionInfo,
        spec: BoxSpec,
        session_dir: pathlib.Path,
    ) -> list[str]:
        """Assemble the full nsjail argv for one command invocation."""
        args: list[str] = [self._nsjail_bin]

        # Mode: one-shot execution.
        args.extend(['--mode', 'o'])

        # Namespace isolation.
        # New user/mount/pid/ipc/uts/cgroup namespaces for every invocation.
        args.extend([
            '--clone_newuser',
            '--clone_newns',
            '--clone_newpid',
            '--clone_newipc',
            '--clone_newuts',
            '--clone_newcgroup',
        ])

        # Network namespace: a fresh (empty) netns blocks all traffic when
        # network is OFF; otherwise keep the host's network.
        if spec.network == BoxNetworkMode.OFF:
            args.append('--clone_newnet')
        else:
            args.append('--disable_clone_newnet')

        # Read-only system mounts.
        args.extend(self._build_readonly_mounts(spec.network))

        # Writable per-session mounts.
        args.extend(self._build_writable_mounts(session, spec, session_dir))

        # Isolated /proc and minimal /dev.
        # NOTE(review): verify nsjail's --mount spec accepts 'rw' in the
        # options field (format src:dst:fs_type:options) — confirm against
        # the nsjail command-line reference.
        args.extend(['--mount', 'none:/proc:proc:rw'])
        args.extend(['--mount', 'none:/dev:tmpfs:rw'])

        # Working directory.
        args.extend(['--cwd', spec.workdir])

        # Environment variables: fixed baseline first, then caller-provided
        # entries (which may override the baseline, as later --env wins).
        args.extend(['--env', 'PYTHONUNBUFFERED=1'])
        args.extend(['--env', 'HOME=/home'])
        args.extend(['--env', 'PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin'])
        for key, value in spec.env.items():
            args.extend(['--env', f'{key}={value}'])

        # Resource limits.
        args.extend(self._build_resource_limits(spec))

        # Suppress nsjail's own log output.
        args.append('--really_quiet')

        # The actual command: ensure the workdir exists inside the jail, cd
        # into it, then run the user command through a login shell.
        quoted_workdir = shlex.quote(spec.workdir)
        user_cmd = f'mkdir -p {quoted_workdir} && cd {quoted_workdir} && {spec.cmd}'
        args.extend(['--', 'sh', '-lc', user_cmd])

        return args

    def _build_readonly_mounts(self, network: BoxNetworkMode) -> list[str]:
        """Bind-mount well-known system paths read-only, skipping absent ones."""
        args: list[str] = []

        for path in _READONLY_SYSTEM_MOUNTS:
            if os.path.exists(path):
                args.extend(['--bindmount_ro', f'{path}:{path}'])

        for path in _READONLY_ETC_ENTRIES:
            # /etc/resolv.conf is only needed when network is ON.
            if path == '/etc/resolv.conf' and network == BoxNetworkMode.OFF:
                continue
            if os.path.exists(path):
                args.extend(['--bindmount_ro', f'{path}:{path}'])

        return args

    def _build_writable_mounts(
        self,
        session: BoxSessionInfo,
        spec: BoxSpec,
        session_dir: pathlib.Path,
    ) -> list[str]:
        """Bind-mount the workspace, extra mounts, /tmp and /home writable."""
        args: list[str] = []

        # Workspace mount.
+ if spec.host_path is not None and spec.host_path_mode != BoxHostMountMode.NONE: + if spec.host_path_mode == BoxHostMountMode.READ_ONLY: + args.extend(['--bindmount_ro', f'{spec.host_path}:{spec.mount_path}']) + else: + args.extend(['--rw_bind', f'{spec.host_path}:{spec.mount_path}']) + else: + workspace_dir = session_dir / 'workspace' + args.extend(['--rw_bind', f'{workspace_dir}:{spec.mount_path}']) + + for mount in spec.extra_mounts: + if mount.mode == BoxHostMountMode.READ_ONLY: + args.extend(['--bindmount_ro', f'{mount.host_path}:{mount.mount_path}']) + elif mount.mode == BoxHostMountMode.READ_WRITE: + args.extend(['--rw_bind', f'{mount.host_path}:{mount.mount_path}']) + + # /tmp and /home are always per-session writable. + tmp_dir = session_dir / 'tmp' + home_dir = session_dir / 'home' + args.extend(['--rw_bind', f'{tmp_dir}:/tmp']) + args.extend(['--rw_bind', f'{home_dir}:/home']) + + return args + + def _build_resource_limits(self, spec: BoxSpec) -> list[str]: + args: list[str] = [] + + if self._cgroup_v2_available: + # cgroup v2 – precise limits. + memory_bytes = spec.memory_mb * 1024 * 1024 + args.extend(['--cgroup_mem_max', str(memory_bytes)]) + args.extend(['--cgroup_pids_max', str(spec.pids_limit)]) + cpu_ms = int(spec.cpus * 1000) + args.extend(['--cgroup_cpu_ms_per_sec', str(cpu_ms)]) + else: + # rlimit fallback – best-effort. + args.extend(['--rlimit_as', str(spec.memory_mb)]) + args.extend(['--rlimit_nproc', str(spec.pids_limit)]) + + # Always set these rlimits regardless of cgroup mode. 
+ args.extend(['--rlimit_fsize', '512']) # max file size 512 MB + args.extend(['--rlimit_nofile', '256']) # max open fds + + return args + + # ── process execution ───────────────────────────────────────────── + + async def _run_nsjail( + self, + args: list[str], + timeout_sec: int, + ) -> _CommandResult: + process = await asyncio.create_subprocess_exec( + *args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout_task = asyncio.create_task(self._read_stream(process.stdout)) + stderr_task = asyncio.create_task(self._read_stream(process.stderr)) + + timed_out = False + try: + await asyncio.wait_for(process.wait(), timeout=timeout_sec) + except asyncio.TimeoutError: + process.kill() + timed_out = True + await process.wait() + + stdout_bytes, stdout_total = await stdout_task + stderr_bytes, stderr_total = await stderr_task + + return _CommandResult( + return_code=process.returncode if not timed_out else -1, + stdout=self._clip_captured_bytes(stdout_bytes, stdout_total), + stderr=self._clip_captured_bytes(stderr_bytes, stderr_total), + timed_out=timed_out, + ) + + # ── helpers ─────────────────────────────────────────────────────── + + @staticmethod + def _detect_cgroup_v2() -> bool: + """Check whether the host runs cgroup v2 and we can write to it.""" + cgroup_mount = pathlib.Path('/sys/fs/cgroup') + if not cgroup_mount.exists(): + return False + # cgroup v2 has a single hierarchy with cgroup.controllers file. + controllers = cgroup_mount / 'cgroup.controllers' + if not controllers.exists(): + return False + # Check if we can write to a cgroup subtree (needed for nsjail). + # A rough heuristic: if the user owns a cgroup directory we're probably + # running under systemd user delegation. + user_slice = cgroup_mount / f'user.slice/user-{os.getuid()}.slice' + if user_slice.exists(): + return True + # If running as root (uid 0), cgroup v2 is always usable. 
+ if os.getuid() == 0: + return True + # Conservative: if we can't confirm writability, report unavailable. + return False + + async def _kill_session_processes(self, session_dir: pathlib.Path) -> None: + """Best-effort kill of nsjail processes associated with a session dir. + + We scan /proc for nsjail processes whose command line contains the + session directory path. + """ + session_path_str = str(session_dir) + proc_dir = pathlib.Path('/proc') + if not proc_dir.exists(): + return + + for pid_dir in proc_dir.iterdir(): + if not pid_dir.name.isdigit(): + continue + try: + cmdline = (pid_dir / 'cmdline').read_bytes().decode('utf-8', errors='replace') + if self._nsjail_bin in cmdline and session_path_str in cmdline: + pid = int(pid_dir.name) + os.kill(pid, signal.SIGKILL) + self.logger.info(f'Killed orphaned nsjail process {pid}') + except (OSError, ValueError): + continue + + @staticmethod + def _clip_captured_bytes( + data: bytes, total_size: int, limit: int = _MAX_RAW_OUTPUT_BYTES + ) -> str: + text = data.decode('utf-8', errors='replace').strip() + if total_size > limit: + text += f'\n... 
[raw output clipped at {limit} bytes, {total_size - limit} bytes discarded]' + return text + + @staticmethod + async def _read_stream( + stream: asyncio.StreamReader | None, + limit: int = _MAX_RAW_OUTPUT_BYTES, + ) -> tuple[bytes, int]: + if stream is None: + return b'', 0 + + chunks = bytearray() + total_size = 0 + while True: + chunk = await stream.read(65536) + if not chunk: + break + total_size += len(chunk) + remaining = limit - len(chunks) + if remaining > 0: + chunks.extend(chunk[:remaining]) + + return bytes(chunks), total_size diff --git a/src/langbot_plugin/box/runtime.py b/src/langbot_plugin/box/runtime.py new file mode 100644 index 0000000..292740c --- /dev/null +++ b/src/langbot_plugin/box/runtime.py @@ -0,0 +1,407 @@ +from __future__ import annotations + +import asyncio +import collections +import dataclasses +import datetime as dt +import logging +import uuid + +from .backend import BaseSandboxBackend, DockerBackend +from .nsjail_backend import NsjailBackend +from .errors import ( + BoxBackendUnavailableError, + BoxManagedProcessNotFoundError, + BoxSessionConflictError, + BoxSessionNotFoundError, + BoxValidationError, +) +from .models import ( + BoxExecutionResult, + BoxExecutionStatus, + BoxManagedProcessInfo, + BoxManagedProcessSpec, + BoxManagedProcessStatus, + BoxSessionInfo, + BoxSpec, +) + +_UTC = dt.timezone.utc +_MANAGED_PROCESS_STDERR_PREVIEW_LIMIT = 4000 + + +@dataclasses.dataclass(slots=True) +class _ManagedProcess: + spec: BoxManagedProcessSpec + process: asyncio.subprocess.Process + started_at: dt.datetime + attach_lock: asyncio.Lock + stderr_chunks: collections.deque[str] + stderr_total_len: int = 0 + exit_code: int | None = None + exited_at: dt.datetime | None = None + + @property + def is_running(self) -> bool: + return self.exit_code is None and self.process.returncode is None + + +@dataclasses.dataclass(slots=True) +class _RuntimeSession: + info: BoxSessionInfo + lock: asyncio.Lock + managed_processes: dict[str, _ManagedProcess] = 
dataclasses.field(default_factory=dict) + + +class BoxRuntime: + def __init__( + self, + logger: logging.Logger, + backends: list[BaseSandboxBackend] | None = None, + session_ttl_sec: int = 300, + ): + self.logger = logger + self.backends = backends or [DockerBackend(logger), NsjailBackend(logger)] + self.session_ttl_sec = session_ttl_sec + self._backend: BaseSandboxBackend | None = None + self._sessions: dict[str, _RuntimeSession] = {} + self._lock = asyncio.Lock() + self.instance_id = uuid.uuid4().hex[:12] + + async def initialize(self): + self._backend = await self._select_backend() + if self._backend is not None: + self._backend.instance_id = self.instance_id + try: + await self._backend.cleanup_orphaned_containers(self.instance_id) + except Exception as exc: + self.logger.warning(f'LangBot Box orphan container cleanup failed: {exc}') + + async def execute(self, spec: BoxSpec) -> BoxExecutionResult: + if not spec.cmd: + raise BoxValidationError('cmd must not be empty') + session = await self._get_or_create_session(spec) + + async with session.lock: + self.logger.info( + 'LangBot Box execute: ' + f'session_id={spec.session_id} ' + f'backend_session_id={session.info.backend_session_id} ' + f'backend={session.info.backend_name} ' + f'workdir={spec.workdir} ' + f'timeout_sec={spec.timeout_sec}' + ) + result = await (await self._get_backend()).exec(session.info, spec) + + async with self._lock: + now = dt.datetime.now(_UTC) + if spec.session_id in self._sessions: + self._sessions[spec.session_id].info.last_used_at = now + + if result.status == BoxExecutionStatus.TIMED_OUT: + await self._drop_session_locked(spec.session_id) + + return result + + async def shutdown(self): + async with self._lock: + session_ids = list(self._sessions.keys()) + for session_id in session_ids: + session = self._sessions.get(session_id) + if session is not None and session.info.persistent: + continue + await self._drop_session_locked(session_id) + + async def create_session(self, spec: 
BoxSpec) -> dict: + session = await self._get_or_create_session(spec) + return self._session_to_dict(session.info) + + async def delete_session(self, session_id: str) -> None: + async with self._lock: + if session_id not in self._sessions: + raise BoxSessionNotFoundError(f'session {session_id} not found') + await self._drop_session_locked(session_id) + + async def start_managed_process(self, session_id: str, spec: BoxManagedProcessSpec) -> dict: + async with self._lock: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + + async with runtime_session.lock: + process_id = spec.process_id + existing = runtime_session.managed_processes.get(process_id) + if existing is not None and existing.is_running: + # Terminate the stale process before starting a new one. + # This happens when LangBot restarts while the Box runtime + # keeps the persistent session alive. + self.logger.info( + f'LangBot Box terminating stale managed process before restart: ' + f'session_id={session_id} process_id={process_id}' + ) + await self._terminate_managed_process(existing) + del runtime_session.managed_processes[process_id] + + backend = await self._get_backend() + process = await backend.start_managed_process(runtime_session.info, spec) + managed_process = _ManagedProcess( + spec=spec, + process=process, + started_at=dt.datetime.now(_UTC), + attach_lock=asyncio.Lock(), + stderr_chunks=collections.deque(), + ) + runtime_session.managed_processes[process_id] = managed_process + runtime_session.info.last_used_at = dt.datetime.now(_UTC) + asyncio.create_task(self._drain_managed_process_stderr(runtime_session.info.session_id, process_id, managed_process)) + asyncio.create_task(self._watch_managed_process(runtime_session.info.session_id, process_id, managed_process)) + return self._managed_process_to_dict(runtime_session.info.session_id, process_id, managed_process) + + def get_managed_process(self, 
session_id: str, process_id: str = 'default') -> dict: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + managed_process = runtime_session.managed_processes.get(process_id) + if managed_process is None: + raise BoxManagedProcessNotFoundError(f'session {session_id} has no managed process with process_id={process_id}') + return self._managed_process_to_dict(session_id, process_id, managed_process) + + # ── Observability ───────────────────────────────────────────────── + + async def get_backend_info(self) -> dict: + backend = self._backend + if backend is None: + return {'name': None, 'available': False} + try: + available = await backend.is_available() + except Exception: + available = False + return {'name': backend.name, 'available': available} + + def get_sessions(self) -> list[dict]: + return [self._session_to_dict(s.info) for s in self._sessions.values()] + + def get_session(self, session_id: str) -> dict: + runtime_session = self._sessions.get(session_id) + if runtime_session is None: + raise BoxSessionNotFoundError(f'session {session_id} not found') + result = self._session_to_dict(runtime_session.info) + if runtime_session.managed_processes: + result['managed_processes'] = { + pid: self._managed_process_to_dict(session_id, pid, mp) + for pid, mp in runtime_session.managed_processes.items() + } + return result + + async def get_status(self) -> dict: + backend_info = await self.get_backend_info() + return { + 'backend': backend_info, + 'active_sessions': len(self._sessions), + 'managed_processes': sum( + 1 + for runtime_session in self._sessions.values() + for mp in runtime_session.managed_processes.values() + if mp.is_running + ), + 'session_ttl_sec': self.session_ttl_sec, + } + + async def _get_or_create_session(self, spec: BoxSpec) -> _RuntimeSession: + async with self._lock: + await self._reap_expired_sessions_locked() + + existing = 
self._sessions.get(spec.session_id) + if existing is not None: + self._assert_session_compatible(existing.info, spec) + existing.info.last_used_at = dt.datetime.now(_UTC) + self.logger.info( + 'LangBot Box session reused: ' + f'session_id={spec.session_id} ' + f'backend_session_id={existing.info.backend_session_id} ' + f'backend={existing.info.backend_name}' + ) + return existing + + backend = await self._get_backend() + info = await backend.start_session(spec) + runtime_session = _RuntimeSession(info=info, lock=asyncio.Lock()) + self._sessions[spec.session_id] = runtime_session + self.logger.info( + 'LangBot Box session created: ' + f'session_id={spec.session_id} ' + f'backend_session_id={info.backend_session_id} ' + f'backend={info.backend_name} ' + f'image={info.image} ' + f'network={info.network.value} ' + f'host_path={info.host_path} ' + f'host_path_mode={info.host_path_mode.value} ' + f'mount_path={info.mount_path} ' + f'workspace_quota_mb={info.workspace_quota_mb}' + ) + return runtime_session + + async def _get_backend(self) -> BaseSandboxBackend: + if self._backend is None: + self._backend = await self._select_backend() + if self._backend is None: + raise BoxBackendUnavailableError( + 'LangBot Box backend unavailable. Install and start Docker or nsjail before using exec.' 
+ ) + return self._backend + + async def _select_backend(self) -> BaseSandboxBackend | None: + for backend in self.backends: + try: + await backend.initialize() + if await backend.is_available(): + self.logger.info(f'LangBot Box using backend: {backend.name}') + return backend + except Exception as exc: + self.logger.warning(f'LangBot Box backend {backend.name} probe failed: {exc}') + + self.logger.warning('LangBot Box backend unavailable: no supported backend (Docker, nsjail) is ready') + return None + + async def _reap_expired_sessions_locked(self): + if self.session_ttl_sec <= 0: + return + + deadline = dt.datetime.now(_UTC) - dt.timedelta(seconds=self.session_ttl_sec) + expired_session_ids = [ + session_id + for session_id, session in self._sessions.items() + if not session.info.persistent + and session.info.last_used_at < deadline + and not any(mp.is_running for mp in session.managed_processes.values()) + ] + + for session_id in expired_session_ids: + await self._drop_session_locked(session_id) + + async def _drop_session_locked(self, session_id: str): + runtime_session = self._sessions.pop(session_id, None) + if runtime_session is None or self._backend is None: + return + + for mp in runtime_session.managed_processes.values(): + await self._terminate_managed_process(mp) + + try: + self.logger.info( + 'LangBot Box session cleanup: ' + f'session_id={session_id} ' + f'backend_session_id={runtime_session.info.backend_session_id} ' + f'backend={runtime_session.info.backend_name}' + ) + await self._backend.stop_session(runtime_session.info) + except Exception as exc: + self.logger.warning(f'Failed to clean up box session {session_id}: {exc}') + + def _assert_session_compatible(self, session: BoxSessionInfo, spec: BoxSpec): + _COMPAT_FIELDS = ( + 'network', + 'image', + 'host_path', + 'host_path_mode', + 'mount_path', + 'persistent', + 'cpus', + 'memory_mb', + 'pids_limit', + 'read_only_rootfs', + 'workspace_quota_mb', + ) + for field in _COMPAT_FIELDS: + 
session_val = getattr(session, field) + spec_val = getattr(spec, field) + if session_val != spec_val: + display = session_val.value if hasattr(session_val, 'value') else session_val + raise BoxSessionConflictError( + f'Box session {spec.session_id} already exists with {field}={display}' + ) + + async def _drain_managed_process_stderr(self, session_id: str, process_id: str, managed_process: _ManagedProcess) -> None: + stream = managed_process.process.stderr + if stream is None: + return + + try: + while True: + chunk = await stream.readline() + if not chunk: + break + text = chunk.decode('utf-8', errors='replace').rstrip() + if not text: + continue + managed_process.stderr_chunks.append(text) + managed_process.stderr_total_len += len(text) + 1 # +1 for '\n' separator + while ( + managed_process.stderr_total_len > _MANAGED_PROCESS_STDERR_PREVIEW_LIMIT + and managed_process.stderr_chunks + ): + removed = managed_process.stderr_chunks.popleft() + managed_process.stderr_total_len -= len(removed) + 1 + self.logger.info(f'LangBot Box managed process stderr: session_id={session_id} process_id={process_id} {text}') + except Exception as exc: + self.logger.warning(f'Failed to drain managed process stderr for {session_id}/{process_id}: {exc}') + + async def _watch_managed_process(self, session_id: str, process_id: str, managed_process: _ManagedProcess) -> None: + return_code = await managed_process.process.wait() + managed_process.exit_code = return_code + managed_process.exited_at = dt.datetime.now(_UTC) + runtime_session = self._sessions.get(session_id) + if runtime_session is not None: + runtime_session.info.last_used_at = managed_process.exited_at + self.logger.info(f'LangBot Box managed process exited: session_id={session_id} process_id={process_id} return_code={return_code}') + + async def _terminate_managed_process(self, managed_process: _ManagedProcess) -> None: + if not managed_process.is_running: + return + + process = managed_process.process + try: + if 
process.stdin is not None: + process.stdin.close() + except Exception: + pass + + try: + if process.returncode is None: + try: + process.terminate() + except ProcessLookupError: + pass + await asyncio.wait_for(asyncio.shield(process.wait()), timeout=5) + except asyncio.TimeoutError: + if process.returncode is None: + try: + process.kill() + except ProcessLookupError: + pass + await process.wait() + finally: + managed_process.exit_code = process.returncode + managed_process.exited_at = dt.datetime.now(_UTC) + + def _managed_process_to_dict(self, session_id: str, process_id: str, managed_process: _ManagedProcess) -> dict: + stderr_preview = '\n'.join(managed_process.stderr_chunks) + status = BoxManagedProcessStatus.RUNNING if managed_process.is_running else BoxManagedProcessStatus.EXITED + return BoxManagedProcessInfo( + session_id=session_id, + process_id=process_id, + status=status, + command=managed_process.spec.command, + args=managed_process.spec.args, + cwd=managed_process.spec.cwd, + env_keys=sorted(managed_process.spec.env.keys()), + attached=managed_process.attach_lock.locked(), + started_at=managed_process.started_at, + exited_at=managed_process.exited_at, + exit_code=managed_process.exit_code, + stderr_preview=stderr_preview, + ).model_dump(mode='json') + + @staticmethod + def _session_to_dict(info: BoxSessionInfo) -> dict: + return info.model_dump(mode='json') diff --git a/src/langbot_plugin/box/security.py b/src/langbot_plugin/box/security.py new file mode 100644 index 0000000..7b3b98e --- /dev/null +++ b/src/langbot_plugin/box/security.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os +import sys + +from .errors import BoxValidationError +from .models import BoxSpec + +_BLOCKED_HOST_PATHS_POSIX = frozenset( + { + '/etc', + '/proc', + '/sys', + '/dev', + '/root', + '/boot', + '/run', + '/var/run', + '/run/docker.sock', + '/var/run/docker.sock', + } +) + +_BLOCKED_HOST_PATHS_WINDOWS = frozenset( + { + r'C:\Windows', + r'C:\Program 
Files', + r'C:\Program Files (x86)', + r'C:\ProgramData', + r'\\.\pipe\docker_engine', + } +) + +BLOCKED_HOST_PATHS = ( + _BLOCKED_HOST_PATHS_POSIX | _BLOCKED_HOST_PATHS_WINDOWS + if sys.platform == 'win32' + else _BLOCKED_HOST_PATHS_POSIX +) + + +def validate_sandbox_security(spec: BoxSpec) -> None: + """Validate that a BoxSpec does not request dangerous container config. + + Raises BoxValidationError when the spec contains a blocked host_path. + """ + if spec.host_path: + real = os.path.realpath(spec.host_path) + sep = os.sep + _norm = os.path.normcase + for blocked in BLOCKED_HOST_PATHS: + if _norm(real) == _norm(blocked) or _norm(real).startswith(_norm(blocked) + sep): + raise BoxValidationError(f'host_path {spec.host_path} is blocked for security') diff --git a/src/langbot_plugin/box/server.py b/src/langbot_plugin/box/server.py new file mode 100644 index 0000000..c2bb8f8 --- /dev/null +++ b/src/langbot_plugin/box/server.py @@ -0,0 +1,341 @@ +"""Standalone Box Runtime service exposing BoxRuntime via action RPC. 
+ +Usage (stdio, launched by LangBot as subprocess): + python -m langbot_plugin.box.server + +Usage (ws, for remote/docker mode): + python -m langbot_plugin.box.server --mode ws --port 5410 + +All WebSocket endpoints share a single port (default 5410): + /rpc/ws — Action RPC (control channel) + /v1/sessions/{session_id}/managed-process/{process_id}/ws — Managed process stdio relay + /v1/sessions/{session_id}/managed-process/ws — Legacy (process_id defaults to 'default') +""" + +from __future__ import annotations + +import argparse +import asyncio +import datetime as dt +import json +import logging +import sys +from typing import Any + +import pydantic +from aiohttp import web + +from langbot_plugin.entities.io.actions.enums import CommonAction +from langbot_plugin.entities.io.errors import ConnectionClosedError +from langbot_plugin.entities.io.resp import ActionResponse +from langbot_plugin.runtime.io.connection import Connection +from langbot_plugin.runtime.io.handler import Handler +from langbot_plugin.utils.log import configure_process_logging + +from .actions import LangBotToBoxAction +from .errors import ( + BoxManagedProcessConflictError, + BoxManagedProcessNotFoundError, + BoxSessionNotFoundError, +) +from .models import BoxExecutionResult, BoxManagedProcessSpec, BoxSpec +from .runtime import BoxRuntime + +logger = logging.getLogger('langbot.box.server') + + +def _result_to_dict(result: BoxExecutionResult) -> dict: + return result.model_dump(mode='json') + + +# ── aiohttp WebSocket → Connection adapter ─────────────────────────── + + +class AiohttpWSConnection(Connection): + """Adapt an aiohttp ``WebSocketResponse`` to the SDK ``Connection`` interface. + + This allows ``BoxServerHandler`` (and therefore ``Handler``) to work over + an aiohttp WebSocket without any changes to the handler/IO layer. 
+ """ + + def __init__(self, ws: web.WebSocketResponse) -> None: + self._ws = ws + self._send_lock = asyncio.Lock() + + async def send(self, message: str) -> None: + async with self._send_lock: + try: + await self._ws.send_str(message) + except ConnectionResetError: + raise ConnectionClosedError('Connection closed during send') + + async def receive(self) -> str: + msg = await self._ws.receive() + if msg.type == web.WSMsgType.TEXT: + return msg.data + if msg.type in ( + web.WSMsgType.CLOSE, + web.WSMsgType.CLOSING, + web.WSMsgType.CLOSED, + web.WSMsgType.ERROR, + ): + raise ConnectionClosedError('Connection closed') + raise ConnectionClosedError(f'Unexpected message type: {msg.type}') + + async def close(self) -> None: + await self._ws.close() + + +# ── BoxServerHandler ───────────────────────────────────────────────── + + +class BoxServerHandler(Handler): + """Server-side handler that registers box actions backed by BoxRuntime.""" + + name = 'BoxServerHandler' + + def __init__(self, connection: Connection, runtime: BoxRuntime): + super().__init__(connection) + self._runtime = runtime + self._register_actions() + + def _register_actions(self) -> None: + @self.action(CommonAction.PING) + async def ping(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success({}) + + @self.action(LangBotToBoxAction.HEALTH) + async def health(data: dict[str, Any]) -> ActionResponse: + info = await self._runtime.get_backend_info() + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.STATUS) + async def status(data: dict[str, Any]) -> ActionResponse: + result = await self._runtime.get_status() + return ActionResponse.success(result) + + @self.action(LangBotToBoxAction.EXEC) + async def exec_cmd(data: dict[str, Any]) -> ActionResponse: + try: + spec = BoxSpec.model_validate(data) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + result = await self._runtime.execute(spec) + return 
ActionResponse.success(_result_to_dict(result)) + + @self.action(LangBotToBoxAction.CREATE_SESSION) + async def create_session(data: dict[str, Any]) -> ActionResponse: + try: + spec = BoxSpec.model_validate(data) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + info = await self._runtime.create_session(spec) + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.GET_SESSION) + async def get_session(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success(self._runtime.get_session(data['session_id'])) + + @self.action(LangBotToBoxAction.GET_SESSIONS) + async def get_sessions(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success({'sessions': self._runtime.get_sessions()}) + + @self.action(LangBotToBoxAction.DELETE_SESSION) + async def delete_session(data: dict[str, Any]) -> ActionResponse: + await self._runtime.delete_session(data['session_id']) + return ActionResponse.success({'deleted': data['session_id']}) + + @self.action(LangBotToBoxAction.START_MANAGED_PROCESS) + async def start_managed_process(data: dict[str, Any]) -> ActionResponse: + session_id = data['session_id'] + try: + spec = BoxManagedProcessSpec.model_validate(data['spec']) + except pydantic.ValidationError as exc: + return ActionResponse.error(f'BoxValidationError: {exc}') + info = await self._runtime.start_managed_process(session_id, spec) + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.GET_MANAGED_PROCESS) + async def get_managed_process(data: dict[str, Any]) -> ActionResponse: + return ActionResponse.success(self._runtime.get_managed_process( + data['session_id'], + data.get('process_id', 'default'), + )) + + @self.action(LangBotToBoxAction.GET_BACKEND_INFO) + async def get_backend_info(data: dict[str, Any]) -> ActionResponse: + info = await self._runtime.get_backend_info() + return ActionResponse.success(info) + + @self.action(LangBotToBoxAction.SHUTDOWN) + async 
# ── Managed process WebSocket relay ──────────────────────────────────


def _error_response(exc: Exception) -> web.Response:
    """Serialize *exc* as a JSON error body with HTTP status 400.

    The payload shape is ``{'error': {'code': <exception class name>,
    'message': <str(exc)>}}``.
    """
    return web.json_response(
        {'error': {'code': type(exc).__name__, 'message': str(exc)}},
        status=400,
    )


async def handle_managed_process_ws(request: web.Request) -> web.StreamResponse:
    """Attach a WebSocket client to a managed process's stdin/stdout.

    Text frames from the client are newline-terminated and written to the
    process's stdin; stdout is read line-by-line and relayed back as text
    frames (trailing newline stripped). Lookup/state failures are reported
    as JSON 400 responses *before* the WebSocket upgrade; stdio
    unavailability after the upgrade closes the socket with a reason.
    Each relayed message refreshes the session's ``last_used_at`` stamp.
    """
    runtime: BoxRuntime = request.app['runtime']
    session_id = request.match_info['session_id']
    # Route without {process_id} falls back to the default process.
    process_id = request.match_info.get('process_id', 'default')

    # NOTE(review): reaches into BoxRuntime's private ``_sessions`` mapping;
    # presumably acceptable inside this package — consider a public accessor.
    runtime_session = runtime._sessions.get(session_id)
    if runtime_session is None:
        return _error_response(BoxSessionNotFoundError(f'session {session_id} not found'))

    managed_process = runtime_session.managed_processes.get(process_id)
    if managed_process is None:
        return _error_response(BoxManagedProcessNotFoundError(f'session {session_id} has no managed process with process_id={process_id}'))
    if not managed_process.is_running:
        return _error_response(
            BoxManagedProcessConflictError(f'managed process {process_id} in session {session_id} is not running')
        )

    # Offers the 'mcp' WebSocket subprotocol to clients that request it.
    ws = web.WebSocketResponse(protocols=('mcp',))
    await ws.prepare(request)

    # NOTE(review): attach_lock is held for the whole relay — presumably to
    # allow only one attached client at a time; confirm intended semantics.
    async with managed_process.attach_lock:
        process = managed_process.process
        stdout = process.stdout
        stdin = process.stdin
        if stdout is None or stdin is None:
            await ws.close(message=b'managed process stdio unavailable')
            return ws

        async def _stdout_to_ws() -> None:
            # Relay process stdout lines to the client until EOF.
            while True:
                line = await stdout.readline()
                if not line:
                    break
                await ws.send_str(line.decode('utf-8', errors='replace').rstrip('\n'))
                runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc)

        async def _ws_to_stdin() -> None:
            # Relay client text frames to process stdin; stop on close/error.
            async for msg in ws:
                if msg.type == web.WSMsgType.TEXT:
                    stdin.write((msg.data + '\n').encode('utf-8'))
                    await stdin.drain()
                    runtime_session.info.last_used_at = dt.datetime.now(dt.timezone.utc)
                elif msg.type in (
                    web.WSMsgType.CLOSE,
                    web.WSMsgType.CLOSING,
                    web.WSMsgType.CLOSED,
                    web.WSMsgType.ERROR,
                ):
                    break

        stdout_task = asyncio.create_task(_stdout_to_ws())
        stdin_task = asyncio.create_task(_ws_to_stdin())
        try:
            # Whichever direction finishes first ends the relay; the other
            # side is cancelled. ``task.result()`` re-raises relay errors
            # so they surface to aiohttp rather than being swallowed.
            done, pending = await asyncio.wait(
                [stdout_task, stdin_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in pending:
                task.cancel()
            for task in done:
                task.result()
        finally:
            await ws.close()

    return ws


# ── Action RPC WebSocket handler ─────────────────────────────────────


async def handle_rpc_ws(request: web.Request) -> web.StreamResponse:
    """Handle action RPC over a single aiohttp WebSocket connection."""
    runtime: BoxRuntime = request.app['runtime']

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Each connection gets its own handler; run() blocks until the
    # connection terminates.
    connection = AiohttpWSConnection(ws)
    handler = BoxServerHandler(connection, runtime)
    await handler.run()

    return ws


# ── App factory ──────────────────────────────────────────────────────


def create_app(runtime: BoxRuntime) -> web.Application:
    """Create the aiohttp app with all WebSocket routes on a single port."""
    app = web.Application()
    app['runtime'] = runtime
    app.router.add_get('/rpc/ws', handle_rpc_ws)
    app.router.add_get('/v1/sessions/{session_id}/managed-process/{process_id}/ws', handle_managed_process_ws)
    # Backward-compatible route (defaults to process_id='default')
    app.router.add_get('/v1/sessions/{session_id}/managed-process/ws', handle_managed_process_ws)
    return app


# ── Entry point ──────────────────────────────────────────────────────


async def _run_server(host: str, port: int, mode: str) -> None:
    """Run the Box runtime until cancelled.

    Starts the aiohttp site (managed-process relay, and in ``ws`` mode also
    the action RPC endpoint) on ``host:port``, then serves the control
    channel: over stdio when ``mode == 'stdio'``, otherwise via the
    already-started ``/rpc/ws`` route. Runtime and HTTP resources are torn
    down on exit.
    """
    runtime = BoxRuntime(logger=logger)
    await runtime.initialize()

    # Start aiohttp — serves managed-process relay and (in ws mode)
    # also the action RPC endpoint, all on the same port.
    runner: web.AppRunner | None = None
    try:
        ws_app = create_app(runtime)
        runner = web.AppRunner(ws_app)
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()
        logger.info(f'Box server listening on {host}:{port}')
    except OSError as exc:
        # Bind failure is tolerated: in stdio mode the control channel still
        # works, only WS attach is lost.
        # NOTE(review): in ws mode a bind failure leaves the process alive
        # with *no* reachable control channel (the "available at" log below
        # is then misleading) — confirm whether ws mode should abort instead.
        logger.warning(f'Box server failed to bind {host}:{port}: {exc}')
        logger.warning('Managed process WebSocket attach will be unavailable.')

    try:
        if mode == 'stdio':
            # Imported lazily so ws-mode deployments don't need the stdio
            # controller machinery.
            from langbot_plugin.runtime.io.controllers.stdio.server import StdioServerController

            async def new_connection_callback(connection: Connection) -> None:
                handler = BoxServerHandler(connection, runtime)
                await handler.run()

            ctrl = StdioServerController()
            await ctrl.run(new_connection_callback)
        else:
            # In ws mode, action RPC is served via aiohttp on /rpc/ws.
            # Keep the server alive until cancelled.
            logger.info(f'Box action RPC available at ws://{host}:{port}/rpc/ws')
            stop_event = asyncio.Event()
            await stop_event.wait()
    finally:
        await runtime.shutdown()
        if runner is not None:
            await runner.cleanup()


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: parse arguments, configure logging, run the server.

    ``argv`` defaults to ``sys.argv[1:]`` (argparse behavior); the CLI
    wrapper passes its remaining arguments explicitly.
    """
    parser = argparse.ArgumentParser(description='LangBot Box Runtime Service')
    parser.add_argument('--host', default='0.0.0.0', help='Bind address')
    parser.add_argument('--port', type=int, default=5410, help='Bind port')
    parser.add_argument(
        '--mode', choices=['stdio', 'ws'], default='stdio', help='Control channel transport (default: stdio)'
    )
    args = parser.parse_args(argv)

    # Log to stderr so stdout stays clean for the stdio control channel.
    configure_process_logging(stream=sys.stderr)
    asyncio.run(_run_server(args.host, args.port, args.mode))


if __name__ == '__main__':
    main()
connection - [--ws-debug-port]: The port for debug connection + box: Run the sandbox box runtime + - [--host]: Bind address, default is 0.0.0.0 + - [--port]: Bind port for ws relay, default is 5410 + - [--mode]: Control channel transport (stdio or ws), default is stdio """ @@ -120,6 +124,19 @@ def main(): help="Skip checking and installing dependencies for all installed plugins", ) + # box command + box_parser = subparsers.add_parser("box", help="Run the sandbox box runtime") + box_parser.add_argument( + "--host", default="0.0.0.0", help="Bind address" + ) + box_parser.add_argument( + "--port", type=int, default=5410, help="Bind port (ws relay)" + ) + box_parser.add_argument( + "--mode", choices=["stdio", "ws"], default="stdio", + help="Control channel transport (default: stdio)" + ) + args = parser.parse_args() if not args.command: @@ -148,6 +165,9 @@ def main(): publish_process() case "rt": runtime_app.main(args) + case "box": + from langbot_plugin.box.server import main as box_main + box_main(sys.argv[2:]) case _: cli_print("unknown_command", args.command) sys.exit(1) diff --git a/tests/box/__init__.py b/tests/box/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/box/test_nsjail_backend.py b/tests/box/test_nsjail_backend.py new file mode 100644 index 0000000..fca4e2b --- /dev/null +++ b/tests/box/test_nsjail_backend.py @@ -0,0 +1,384 @@ +"""Unit tests for NsjailBackend. + +These tests do NOT require nsjail to be installed – they mock subprocess +calls and filesystem checks to verify argument construction, session +directory management, and cgroup detection logic. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import pathlib +from unittest import mock + +import pytest + +from langbot_plugin.box.nsjail_backend import ( + NsjailBackend, + _READONLY_ETC_ENTRIES, + _READONLY_SYSTEM_MOUNTS, +) +from langbot_plugin.box.models import ( + BoxExecutionStatus, + BoxHostMountMode, + BoxNetworkMode, + BoxSessionInfo, + BoxSpec, +) + + +@pytest.fixture +def logger(): + return logging.getLogger('test.nsjail') + + +@pytest.fixture +def tmp_base(tmp_path: pathlib.Path): + return tmp_path / 'nsjail-base' + + +@pytest.fixture +def backend(logger, tmp_base): + b = NsjailBackend(logger=logger, base_dir=str(tmp_base)) + b.instance_id = 'test123' + return b + + +# ── is_available ────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_is_available_no_binary(backend): + with mock.patch('shutil.which', return_value=None): + assert await backend.is_available() is False + + +@pytest.mark.anyio +async def test_is_available_binary_exists(backend, tmp_base): + with ( + mock.patch('shutil.which', return_value='/usr/bin/nsjail'), + mock.patch('asyncio.create_subprocess_exec') as mock_exec, + ): + mock_proc = mock.AsyncMock() + mock_proc.returncode = 0 + mock_proc.wait = mock.AsyncMock(return_value=0) + mock_exec.return_value = mock_proc + + result = await backend.is_available() + assert result is True + assert tmp_base.exists() + + +# ── start_session ───────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_start_session_creates_directories(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='sess1', cmd='echo hi') + + info = await backend.start_session(spec) + + session_dir = pathlib.Path(info.backend_session_id) + assert session_dir.exists() + assert (session_dir / 'workspace').is_dir() + assert (session_dir / 'tmp').is_dir() + assert (session_dir / 'home').is_dir() + assert (session_dir / 'meta.json').exists() 
+ + assert info.backend_name == 'nsjail' + assert info.session_id == 'sess1' + assert info.image == 'host' + assert info.read_only_rootfs is True + + +@pytest.mark.anyio +async def test_start_session_with_host_path(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec( + session_id='sess2', + cmd='ls', + host_path='/some/path', + host_path_mode=BoxHostMountMode.READ_WRITE, + mount_path='/project', + ) + + info = await backend.start_session(spec) + assert info.host_path == '/some/path' + assert info.host_path_mode == BoxHostMountMode.READ_WRITE + assert info.mount_path == '/project' + + +# ── stop_session ────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_stop_session_removes_directory(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='sess-rm', cmd='echo') + + info = await backend.start_session(spec) + session_dir = pathlib.Path(info.backend_session_id) + assert session_dir.exists() + + await backend.stop_session(info) + assert not session_dir.exists() + + +# ── nsjail argument construction ────────────────────────────────────── + +def test_build_nsjail_args_basic(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_session' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s1', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.OFF, + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec(session_id='s1', cmd='echo hello', env={'FOO': 'bar'}) + + args = backend._build_nsjail_args(session, spec, session_dir) + + assert args[0] == 'nsjail' + assert '--mode' in args + assert args[args.index('--mode') + 1] == 'o' + assert '--clone_newnet' in args + assert '--disable_clone_newnet' not in args + assert '--really_quiet' in args + + # 
Writable mounts should reference session directories. + rw_binds = [args[i + 1] for i, a in enumerate(args) if a == '--rw_bind'] + workspace_mount = f'{session_dir}/workspace:/workspace' + assert workspace_mount in rw_binds + + # Custom env should be present. + env_values = [args[i + 1] for i, a in enumerate(args) if a == '--env'] + assert 'FOO=bar' in env_values + + # Command is the last part after '--'. + separator_idx = args.index('--') + assert args[separator_idx + 1] == 'sh' + + +def test_build_nsjail_args_network_on(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_session_net' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s2', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.ON, + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec(session_id='s2', cmd='curl http://example.com', network=BoxNetworkMode.ON) + + args = backend._build_nsjail_args(session, spec, session_dir) + + assert '--disable_clone_newnet' in args + assert '--clone_newnet' not in args + + +def test_build_nsjail_args_host_path_ro(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_hp' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s3', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.OFF, + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_ONLY, + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec( + session_id='s3', + cmd='ls', + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_ONLY, + ) + + args = backend._build_nsjail_args(session, spec, session_dir) + + ro_binds = [args[i + 1] for i, a in 
enumerate(args) if a == '--bindmount_ro'] + assert '/data/project:/workspace' in ro_binds + + +def test_build_nsjail_args_uses_custom_mount_path(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + session_dir = tmp_base / 'test_custom_mount' + for d in ('workspace', 'tmp', 'home'): + (session_dir / d).mkdir(parents=True) + + session = BoxSessionInfo( + session_id='s4', + backend_name='nsjail', + backend_session_id=str(session_dir), + image='host', + network=BoxNetworkMode.OFF, + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_WRITE, + mount_path='/project', + created_at='2024-01-01T00:00:00+00:00', + last_used_at='2024-01-01T00:00:00+00:00', + ) + spec = BoxSpec( + session_id='s4', + cmd='pwd', + workdir='/project/src', + host_path='/data/project', + host_path_mode=BoxHostMountMode.READ_WRITE, + mount_path='/project', + ) + + args = backend._build_nsjail_args(session, spec, session_dir) + + rw_binds = [args[i + 1] for i, a in enumerate(args) if a == '--rw_bind'] + assert '/data/project:/project' in rw_binds + assert args[args.index('--cwd') + 1] == '/project/src' + + +def test_build_resource_limits_cgroup(backend): + backend._cgroup_v2_available = True + spec = BoxSpec(session_id='s', cmd='x', cpus=2.0, memory_mb=1024, pids_limit=256) + + args = backend._build_resource_limits(spec) + + assert '--cgroup_mem_max' in args + mem_idx = args.index('--cgroup_mem_max') + assert args[mem_idx + 1] == str(1024 * 1024 * 1024) + + pids_idx = args.index('--cgroup_pids_max') + assert args[pids_idx + 1] == '256' + + cpu_idx = args.index('--cgroup_cpu_ms_per_sec') + assert args[cpu_idx + 1] == '2000' + + +def test_build_resource_limits_rlimit_fallback(backend): + backend._cgroup_v2_available = False + spec = BoxSpec(session_id='s', cmd='x', memory_mb=512, pids_limit=128) + + args = backend._build_resource_limits(spec) + + assert '--rlimit_as' in args + as_idx = args.index('--rlimit_as') + assert args[as_idx + 1] == '512' + + nproc_idx = 
args.index('--rlimit_nproc') + assert args[nproc_idx + 1] == '128' + + # cgroup flags should NOT be present. + assert '--cgroup_mem_max' not in args + + +# ── exec ────────────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_exec_success(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='exec1', cmd='echo hello') + info = await backend.start_session(spec) + + with mock.patch.object(backend, '_run_nsjail') as mock_run: + from langbot_plugin.box.backend import _CommandResult + mock_run.return_value = _CommandResult( + return_code=0, stdout='hello\n', stderr='', timed_out=False + ) + + result = await backend.exec(info, spec) + + assert result.status == BoxExecutionStatus.COMPLETED + assert result.exit_code == 0 + assert result.stdout == 'hello\n' + assert result.backend_name == 'nsjail' + + +@pytest.mark.anyio +async def test_exec_timeout(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + spec = BoxSpec(session_id='exec2', cmd='sleep 100', timeout_sec=1) + info = await backend.start_session(spec) + + with mock.patch.object(backend, '_run_nsjail') as mock_run: + from langbot_plugin.box.backend import _CommandResult + mock_run.return_value = _CommandResult( + return_code=-1, stdout='', stderr='', timed_out=True + ) + + result = await backend.exec(info, spec) + + assert result.status == BoxExecutionStatus.TIMED_OUT + assert result.exit_code is None + + +# ── cgroup detection ────────────────────────────────────────────────── + +def test_detect_cgroup_v2_no_mount(): + with mock.patch.object(pathlib.Path, 'exists', return_value=False): + assert NsjailBackend._detect_cgroup_v2() is False + + +def test_detect_cgroup_v2_root_user(): + orig_exists = pathlib.Path.exists + + def always_exists(self): + return True + + with ( + mock.patch('os.getuid', return_value=0), + mock.patch.object(pathlib.Path, 'exists', always_exists), + ): + assert NsjailBackend._detect_cgroup_v2() is True 
+ + +# ── cleanup_orphaned_containers ─────────────────────────────────────── + +@pytest.mark.anyio +async def test_cleanup_orphaned_removes_old_sessions(backend, tmp_base): + tmp_base.mkdir(parents=True, exist_ok=True) + + # Create a dir from a different instance. + old_dir = tmp_base / 'oldinst_sess1_abc' + old_dir.mkdir() + (old_dir / 'workspace').mkdir() + + # Create a dir from current instance. + current_dir = tmp_base / 'test123_sess2_def' + current_dir.mkdir() + (current_dir / 'workspace').mkdir() + + with mock.patch.object(backend, '_kill_session_processes', new_callable=mock.AsyncMock): + await backend.cleanup_orphaned_containers('test123') + + assert not old_dir.exists() + assert current_dir.exists() + + +# ── output clipping ────────────────────────────────────────────────── + +def test_clip_captured_bytes_within_limit(): + data = b'hello world' + result = NsjailBackend._clip_captured_bytes(data, len(data)) + assert result == 'hello world' + + +def test_clip_captured_bytes_exceeds_limit(): + data = b'hello' + result = NsjailBackend._clip_captured_bytes(data, 2_000_000, limit=1_000_000) + assert 'clipped' in result + assert '1000000' in result