-
Notifications
You must be signed in to change notification settings - Fork 5
feat(web-ui): AgentTerminal — XTerm.js interactive terminal for session workspace (#506) #516
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6fbacba
3d3254f
275338c
d0400cf
a93d069
e104221
f38b55e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,267 @@ | ||
| """WebSocket router for interactive terminal in a session workspace. | ||
|
|
||
| Endpoint: | ||
| WS /ws/sessions/{session_id}/terminal?token=<JWT> | ||
|
|
||
| Client → Server message types: | ||
| Raw bytes / text: forwarded verbatim to subprocess stdin. | ||
| {"type": "resize", "cols": 120, "rows": 40}: resize the terminal window. | ||
|
|
||
| Server → Client: | ||
| Raw bytes from subprocess stdout/stderr. | ||
|
|
||
| Note: Uses asyncio pipes (not PTY) for simplicity. Arrow keys, colour output, | ||
| and interactive programs like vim require a PTY — that is a known limitation of | ||
| this initial implementation. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import json | ||
| import logging | ||
| import os | ||
| import shutil | ||
|
|
||
| import jwt as pyjwt | ||
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect | ||
| from sqlalchemy import select | ||
|
|
||
| from codeframe.auth.manager import SECRET, JWT_ALGORITHM, JWT_AUDIENCE, get_async_session_maker | ||
| from codeframe.auth.models import User | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| router = APIRouter(tags=["websocket"]) | ||
|
|
||
| # Per-user concurrent terminal connection counter (in-process; resets on restart) | ||
| _MAX_TERMINALS_PER_USER = 3 | ||
| _user_terminal_counts: dict[int, int] = {} | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Helpers | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| async def _authenticate_websocket(websocket: WebSocket) -> int | None: | ||
| """Validate JWT from query param. Returns user_id or closes the socket.""" | ||
| token = websocket.query_params.get("token") | ||
| if not token: | ||
| await websocket.close(code=4001, reason="Authentication required: missing token") | ||
| return None | ||
|
|
||
| try: | ||
| payload = pyjwt.decode(token, SECRET, algorithms=[JWT_ALGORITHM], audience=JWT_AUDIENCE) | ||
| user_id_str = payload.get("sub") | ||
| if not user_id_str: | ||
| await websocket.close(code=4001, reason="Invalid token: missing subject") | ||
| return None | ||
| user_id = int(user_id_str) | ||
| except pyjwt.ExpiredSignatureError: | ||
| await websocket.close(code=4001, reason="Token expired") | ||
| return None | ||
| except (pyjwt.InvalidTokenError, ValueError) as exc: | ||
| logger.debug("Terminal WS JWT decode error: %s", exc) | ||
| await websocket.close(code=4001, reason="Invalid authentication token") | ||
| return None | ||
|
|
||
| try: | ||
| async_session_maker = get_async_session_maker() | ||
| async with async_session_maker() as session: | ||
| result = await session.execute(select(User).where(User.id == user_id)) | ||
| user = result.scalar_one_or_none() | ||
| if user is None: | ||
| await websocket.close(code=4001, reason="User not found") | ||
| return None | ||
| if not user.is_active: | ||
| await websocket.close(code=4001, reason="User is inactive") | ||
| return None | ||
| except Exception as exc: | ||
| logger.error("Terminal WS user lookup error: %s", exc) | ||
| await websocket.close(code=4001, reason="Authentication failed") | ||
| return None | ||
|
|
||
| return user_id | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Endpoint | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| @router.websocket("/ws/sessions/{session_id}/terminal") | ||
| async def session_terminal_ws(session_id: str, websocket: WebSocket) -> None: | ||
| """Bidirectional WebSocket that shells bash in the session's workspace.""" | ||
| # --- Auth --- | ||
| user_id = await _authenticate_websocket(websocket) | ||
| if user_id is None: | ||
| return | ||
|
|
||
| # --- Session lookup --- | ||
| db = getattr(websocket.app.state, "db", None) | ||
| if db is None: | ||
| await websocket.close(code=1011, reason="Database unavailable") | ||
| return | ||
|
|
||
| session = await asyncio.to_thread(db.interactive_sessions.get, session_id) | ||
| if session is None or session.get("state") == "ended": | ||
| await websocket.close(code=4004, reason="Session not found or ended") | ||
| return | ||
|
|
||
| # --- Ownership check --- | ||
| session_user_id = session.get("user_id") | ||
| if session_user_id is not None and int(session_user_id) != user_id: | ||
| await websocket.close(code=4003, reason="Forbidden: session belongs to another user") | ||
| return | ||
|
Comment on lines
+110
to
+114
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Verify if interactive_sessions table schema includes user_id column
rg -n "user_id" codeframe/persistence/repositories/interactive_sessions.py
# Check the schema definition
rg -n "CREATE TABLE.*interactive_sessions" -A 20 codeframe/persistence/
# Check if session_chat_ws has the same pattern
rg -n "session.get.*user_id" codeframe/ui/routers/Repository: frankbria/codeframe Length of output: 1882 Critical: Ownership check is silently bypassed due to missing The Implement fail-closed behavior: Suggested fix # --- Ownership check ---
session_user_id = session.get("user_id")
- if session_user_id is not None and int(session_user_id) != user_id:
+ if session_user_id is None:
+ logger.warning("Session %s missing user_id; denying terminal access", session_id)
+ await websocket.close(code=4003, reason="Session ownership cannot be verified")
+ return
+ if int(session_user_id) != user_id:
await websocket.close(code=4003, reason="Forbidden: session belongs to another user")
return🤖 Prompt for AI Agents |
||
|
|
||
| workspace_path = session.get("workspace_path") | ||
| if not workspace_path: | ||
| logger.error("session_id=%s has no workspace_path; refusing terminal spawn", session_id) | ||
| await websocket.close(code=4008, reason="Session has no workspace configured") | ||
| return | ||
|
|
||
| # --- Per-user connection cap --- | ||
| current = _user_terminal_counts.get(user_id, 0) | ||
| if current >= _MAX_TERMINALS_PER_USER: | ||
| await websocket.close(code=4029, reason="Too many open terminals; close an existing session first") | ||
| return | ||
| _user_terminal_counts[user_id] = current + 1 | ||
|
|
||
| await websocket.accept() | ||
|
|
||
| # --- Spawn bash with a minimal, explicit environment --- | ||
| # Do NOT use os.environ.copy() — it would expose server secrets (API keys, DB creds) | ||
| # to the subprocess. Only pass variables required for a functional terminal. | ||
| env = { | ||
| "TERM": "xterm-256color", | ||
| "HOME": os.environ.get("HOME", "/tmp"), | ||
| "PATH": os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin"), | ||
| "SHELL": "/bin/bash", | ||
| "LANG": os.environ.get("LANG", "en_US.UTF-8"), | ||
| "USER": os.environ.get("USER", ""), | ||
| } | ||
|
|
||
| shell_exe = shutil.which("bash") or shutil.which("sh") or "sh" | ||
|
|
||
| process: asyncio.subprocess.Process | None = None | ||
| ws_to_stdin_task: asyncio.Task | None = None | ||
| stdout_to_ws_task: asyncio.Task | None = None | ||
|
|
||
| try: | ||
| process = await asyncio.create_subprocess_exec( | ||
| shell_exe, | ||
| stdin=asyncio.subprocess.PIPE, | ||
| stdout=asyncio.subprocess.PIPE, | ||
| stderr=asyncio.subprocess.STDOUT, | ||
| cwd=workspace_path, | ||
| env=env, | ||
| ) | ||
|
|
||
| # --- Relay: stdout → WebSocket --- | ||
| async def _stdout_relay() -> None: | ||
| assert process is not None | ||
| assert process.stdout is not None | ||
| try: | ||
| while True: | ||
| chunk = await process.stdout.read(4096) | ||
| if not chunk: | ||
| break | ||
| try: | ||
| await websocket.send_bytes(chunk) | ||
| except Exception: | ||
| break | ||
| except asyncio.CancelledError: | ||
| pass | ||
| except Exception as exc: | ||
| logger.debug("Terminal stdout relay error: %s", exc) | ||
|
|
||
| # --- Relay: WebSocket → stdin (handles both text and binary frames) --- | ||
| async def _stdin_relay() -> None: | ||
| assert process is not None | ||
| assert process.stdin is not None | ||
| try: | ||
| while True: | ||
| try: | ||
| msg = await websocket.receive() | ||
| except WebSocketDisconnect: | ||
| raise | ||
|
|
||
| if "text" in msg: | ||
| raw_text: str = msg["text"] | ||
| if len(raw_text) > 65536: | ||
| logger.warning("session_id=%s: dropping oversized text frame (%d bytes)", session_id, len(raw_text)) | ||
| continue | ||
| try: | ||
| parsed = json.loads(raw_text) | ||
| if isinstance(parsed, dict) and parsed.get("type") == "resize": | ||
| # Resize: nothing to do without a PTY | ||
| continue | ||
| except json.JSONDecodeError: | ||
| pass | ||
| process.stdin.write(raw_text.encode()) | ||
| await process.stdin.drain() | ||
| elif "bytes" in msg: | ||
| raw_bytes: bytes = msg["bytes"] | ||
| if len(raw_bytes) > 65536: | ||
| logger.warning("session_id=%s: dropping oversized binary frame (%d bytes)", session_id, len(raw_bytes)) | ||
| continue | ||
| try: | ||
| parsed = json.loads(raw_bytes) | ||
| if isinstance(parsed, dict) and parsed.get("type") == "resize": | ||
| continue | ||
| except (json.JSONDecodeError, UnicodeDecodeError): | ||
| pass | ||
| process.stdin.write(raw_bytes) | ||
| await process.stdin.drain() | ||
|
|
||
| except WebSocketDisconnect: | ||
| raise | ||
| except asyncio.CancelledError: | ||
| pass | ||
| except Exception as exc: | ||
| logger.debug("Terminal stdin relay error: %s", exc) | ||
|
|
||
| stdout_to_ws_task = asyncio.create_task(_stdout_relay()) | ||
| ws_to_stdin_task = asyncio.create_task(_stdin_relay()) | ||
|
|
||
| # Wait for either task to finish (disconnect or process exit) | ||
| await asyncio.wait( | ||
| [stdout_to_ws_task, ws_to_stdin_task], | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
|
|
||
| except WebSocketDisconnect: | ||
| logger.debug("Terminal WebSocket disconnected: session_id=%s", session_id) | ||
| except Exception as exc: | ||
| logger.error("Terminal WebSocket error: %s", exc, exc_info=True) | ||
| finally: | ||
| # Cancel relay tasks | ||
| for task in [ws_to_stdin_task, stdout_to_ws_task]: | ||
| if task and not task.done(): | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except (asyncio.CancelledError, Exception): | ||
| pass | ||
|
|
||
| # Terminate subprocess | ||
| if process is not None: | ||
| try: | ||
| process.terminate() | ||
| await asyncio.wait_for(process.wait(), timeout=3.0) | ||
| except (ProcessLookupError, asyncio.TimeoutError): | ||
| try: | ||
| process.kill() | ||
| except ProcessLookupError: | ||
| pass | ||
|
|
||
| # Release the per-user connection slot | ||
| count = _user_terminal_counts.get(user_id, 0) | ||
| if count > 1: | ||
| _user_terminal_counts[user_id] = count - 1 | ||
| else: | ||
| _user_terminal_counts.pop(user_id, None) | ||
|
|
||
| try: | ||
| await websocket.close() | ||
| except Exception: | ||
| pass | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 12943
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2270
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 6079
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 6872
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 5476
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 1124
Only
"active"sessions should have terminal access; currently"paused"sessions are allowed.The state check rejects only
"ended"sessions, but"paused"is also an invalid state perVALID_STATES = frozenset({"active", "paused", "ended"})ininteractive_sessions_v2.py. Change the check fromsession.get("state") == "ended"tosession.get("state") != "active"to ensure only active sessions can access the terminal. This same issue exists insession_chat_ws.py:154.🤖 Prompt for AI Agents