Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ CodeFRAME v2 (Phases 1–6 complete) delivers the full Think-Build-Prove-Ship lo
- **PROVE**: PROOF9 quality memory system — 9-gate evidence-based verification (`cf proof run/capture/list/status/show/waive`), every glitch becomes a permanent proof obligation
- **SHIP**: GitHub PR workflow, environment validation, task self-diagnosis
- **Engine adapters**: Claude Code, Codex, OpenCode, Kilocode, and built-in ReAct — all via `--engine` flag
- **Server layer** (optional): FastAPI with 16+ v2 routers, API key auth, rate limiting, SSE streaming, OpenAPI docs
- **Web UI**: Workspace view, PRD discovery, Task board, Blocker resolution, Review/commit, PROOF9 requirements and evidence views, TUI dashboard
- **Server layer** (optional): FastAPI with 16+ v2 routers, API key auth, rate limiting, SSE streaming, WebSocket endpoints (agent chat, interactive terminal), OpenAPI docs
- **Web UI**: Workspace view, PRD discovery, Task board, Blocker resolution, Review/commit, PROOF9 requirements and evidence views, TUI dashboard, agent chat panel with streaming tool-call display, interactive terminal for session workspaces
- **Test suite**: 4200+ tests, 88% coverage

---
Expand Down
267 changes: 267 additions & 0 deletions codeframe/ui/routers/terminal_ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
"""WebSocket router for interactive terminal in a session workspace.

Endpoint:
WS /ws/sessions/{session_id}/terminal?token=<JWT>

Client → Server message types:
Raw bytes / text: forwarded verbatim to subprocess stdin.
{"type": "resize", "cols": 120, "rows": 40}: resize the terminal window.

Server → Client:
Raw bytes from subprocess stdout/stderr.

Note: Uses asyncio pipes (not PTY) for simplicity. Arrow keys, colour output,
and interactive programs like vim require a PTY — that is a known limitation of
this initial implementation.
"""

import asyncio
import json
import logging
import os
import shutil

import jwt as pyjwt
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from sqlalchemy import select

from codeframe.auth.manager import SECRET, JWT_ALGORITHM, JWT_AUDIENCE, get_async_session_maker
from codeframe.auth.models import User

logger = logging.getLogger(__name__)

router = APIRouter(tags=["websocket"])

# Per-user concurrent terminal connection counter (in-process; resets on restart)
_MAX_TERMINALS_PER_USER = 3
_user_terminal_counts: dict[int, int] = {}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


async def _authenticate_websocket(websocket: WebSocket) -> int | None:
"""Validate JWT from query param. Returns user_id or closes the socket."""
token = websocket.query_params.get("token")
if not token:
await websocket.close(code=4001, reason="Authentication required: missing token")
return None

try:
payload = pyjwt.decode(token, SECRET, algorithms=[JWT_ALGORITHM], audience=JWT_AUDIENCE)
user_id_str = payload.get("sub")
if not user_id_str:
await websocket.close(code=4001, reason="Invalid token: missing subject")
return None
user_id = int(user_id_str)
except pyjwt.ExpiredSignatureError:
await websocket.close(code=4001, reason="Token expired")
return None
except (pyjwt.InvalidTokenError, ValueError) as exc:
logger.debug("Terminal WS JWT decode error: %s", exc)
await websocket.close(code=4001, reason="Invalid authentication token")
return None

try:
async_session_maker = get_async_session_maker()
async with async_session_maker() as session:
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
await websocket.close(code=4001, reason="User not found")
return None
if not user.is_active:
await websocket.close(code=4001, reason="User is inactive")
return None
except Exception as exc:
logger.error("Terminal WS user lookup error: %s", exc)
await websocket.close(code=4001, reason="Authentication failed")
return None

return user_id


# ---------------------------------------------------------------------------
# Endpoint
# ---------------------------------------------------------------------------


@router.websocket("/ws/sessions/{session_id}/terminal")
async def session_terminal_ws(session_id: str, websocket: WebSocket) -> None:
"""Bidirectional WebSocket that shells bash in the session's workspace."""
# --- Auth ---
user_id = await _authenticate_websocket(websocket)
if user_id is None:
return

# --- Session lookup ---
db = getattr(websocket.app.state, "db", None)
if db is None:
await websocket.close(code=1011, reason="Database unavailable")
return

session = await asyncio.to_thread(db.interactive_sessions.get, session_id)
if session is None or session.get("state") == "ended":
await websocket.close(code=4004, reason="Session not found or ended")
return
Comment on lines +105 to +108

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check what session states exist in the codebase
rg -n "state.*=.*['\"]" --type py -g '*session*' | head -30
# Also check the session model/schema for state enum
ast-grep --pattern 'state = $_'

Repository: frankbria/codeframe

Length of output: 12943


🏁 Script executed:

#!/bin/bash
# Find session state enum or constants
rg -n "state.*=.*\[" --type py | grep -i session | head -20

# Look for the interactive_sessions schema/model
fd -e py -path '*interactive*session*' | head -10

# Check for state validation or enum
rg -n "class.*State|State.*=|VALID.*STATE|ACTIVE.*STATE" --type py | head -20

Repository: frankbria/codeframe

Length of output: 2270


🏁 Script executed:

#!/bin/bash
# Look at the test that shows paused transition to understand intent
sed -n '240,260p' tests/unit/test_interactive_sessions_api.py

# Check what happens in the terminal websocket - get more context
cat -n codeframe/ui/routers/terminal_ws.py | head -120

Repository: frankbria/codeframe

Length of output: 6079


🏁 Script executed:

#!/bin/bash
# Find all places that set session state to understand the state machine
rg -B2 -A2 "state.*=.*['\"]" --type py codeframe/persistence/repositories/interactive_sessions.py

# Check the interactive sessions service/repository for state handling
cat -n codeframe/persistence/repositories/interactive_sessions.py | head -150

Repository: frankbria/codeframe

Length of output: 6872


🏁 Script executed:

#!/bin/bash
# Search for VALID_STATES definition
rg -n "VALID_STATES|SESSION.*STATE" --type py

# Search for where "paused" state is used
rg -B3 -A3 '"paused"' --type py

# Check if there's a session model/enum
fd -e py | xargs grep -l "class.*Session" | grep -E "model|schema|type" | head -5

Repository: frankbria/codeframe

Length of output: 5476


🏁 Script executed:

#!/bin/bash
# Check session_chat_ws.py for the same pattern
cat -n codeframe/ui/routers/session_chat_ws.py | sed -n '145,165p'

# Look for any session state machine or state transitions documentation
fd -e md -o -e txt | xargs grep -l "session.*state" 2>/dev/null | head -5

Repository: frankbria/codeframe

Length of output: 1124


Only "active" sessions should have terminal access; currently "paused" sessions are allowed.

The state check rejects only "ended" sessions, but "paused" is also an invalid state per VALID_STATES = frozenset({"active", "paused", "ended"}) in interactive_sessions_v2.py. Change the check from session.get("state") == "ended" to session.get("state") != "active" to ensure only active sessions can access the terminal. This same issue exists in session_chat_ws.py:154.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/ui/routers/terminal_ws.py` around lines 100 - 103, The current
check in the terminal websocket handler that only forbids "ended" sessions is
too permissive — change the condition in the block that retrieves the session
(session = await asyncio.to_thread(db.interactive_sessions.get, session_id))
from checking session.get("state") == "ended" to session.get("state") !=
"active" so only "active" sessions are allowed (and likewise update the
identical check in session_chat_ws at the referenced location); keep the
existing websocket.close(...) call but ensure it triggers when state is not
"active".


# --- Ownership check ---
session_user_id = session.get("user_id")
if session_user_id is not None and int(session_user_id) != user_id:
await websocket.close(code=4003, reason="Forbidden: session belongs to another user")
return
Comment on lines +110 to +114

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify if interactive_sessions table schema includes user_id column
rg -n "user_id" codeframe/persistence/repositories/interactive_sessions.py

# Check the schema definition
rg -n "CREATE TABLE.*interactive_sessions" -A 20 codeframe/persistence/

# Check if session_chat_ws has the same pattern
rg -n "session.get.*user_id" codeframe/ui/routers/

Repository: frankbria/codeframe

Length of output: 1882


Critical: Ownership check is silently bypassed due to missing user_id column in schema.

The interactive_sessions table schema (defined in codeframe/persistence/schema_manager.py:842-856) does not include a user_id column. When session.get("user_id") is called at line 106, it will always return None, causing the condition at line 107 to be False and the entire ownership validation to be unreachable. This allows any authenticated user to access any session's terminal.

Implement fail-closed behavior:

Suggested fix
     # --- Ownership check ---
     session_user_id = session.get("user_id")
-    if session_user_id is not None and int(session_user_id) != user_id:
+    if session_user_id is None:
+        logger.warning("Session %s missing user_id; denying terminal access", session_id)
+        await websocket.close(code=4003, reason="Session ownership cannot be verified")
+        return
+    if int(session_user_id) != user_id:
         await websocket.close(code=4003, reason="Forbidden: session belongs to another user")
         return
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/ui/routers/terminal_ws.py` around lines 105 - 109, The ownership
check currently assumes session.get("user_id") exists but the
interactive_sessions schema lacks that column, so update the check to
fail-closed: if session.get("user_id") is None or int(session.get("user_id")) !=
user_id then call websocket.close(code=4003, reason="Forbidden: session belongs
to another user") and return; additionally, if you can, replace reliance on
session data by validating against the persistent record (e.g., query the
interactive_sessions record for the session id and compare its owner column to
user_id) so the check in terminal_ws.py (the session_user_id /
session.get("user_id") branch around websocket.close) always enforces ownership
even when schema/session data is missing.


workspace_path = session.get("workspace_path")
if not workspace_path:
logger.error("session_id=%s has no workspace_path; refusing terminal spawn", session_id)
await websocket.close(code=4008, reason="Session has no workspace configured")
return

# --- Per-user connection cap ---
current = _user_terminal_counts.get(user_id, 0)
if current >= _MAX_TERMINALS_PER_USER:
await websocket.close(code=4029, reason="Too many open terminals; close an existing session first")
return
_user_terminal_counts[user_id] = current + 1

await websocket.accept()

# --- Spawn bash with a minimal, explicit environment ---
# Do NOT use os.environ.copy() — it would expose server secrets (API keys, DB creds)
# to the subprocess. Only pass variables required for a functional terminal.
env = {
"TERM": "xterm-256color",
"HOME": os.environ.get("HOME", "/tmp"),
"PATH": os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin"),
"SHELL": "/bin/bash",
"LANG": os.environ.get("LANG", "en_US.UTF-8"),
"USER": os.environ.get("USER", ""),
}

shell_exe = shutil.which("bash") or shutil.which("sh") or "sh"

process: asyncio.subprocess.Process | None = None
ws_to_stdin_task: asyncio.Task | None = None
stdout_to_ws_task: asyncio.Task | None = None

try:
process = await asyncio.create_subprocess_exec(
shell_exe,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=workspace_path,
env=env,
)

# --- Relay: stdout → WebSocket ---
async def _stdout_relay() -> None:
assert process is not None
assert process.stdout is not None
try:
while True:
chunk = await process.stdout.read(4096)
if not chunk:
break
try:
await websocket.send_bytes(chunk)
except Exception:
break
except asyncio.CancelledError:
pass
except Exception as exc:
logger.debug("Terminal stdout relay error: %s", exc)

# --- Relay: WebSocket → stdin (handles both text and binary frames) ---
async def _stdin_relay() -> None:
assert process is not None
assert process.stdin is not None
try:
while True:
try:
msg = await websocket.receive()
except WebSocketDisconnect:
raise

if "text" in msg:
raw_text: str = msg["text"]
if len(raw_text) > 65536:
logger.warning("session_id=%s: dropping oversized text frame (%d bytes)", session_id, len(raw_text))
continue
try:
parsed = json.loads(raw_text)
if isinstance(parsed, dict) and parsed.get("type") == "resize":
# Resize: nothing to do without a PTY
continue
except json.JSONDecodeError:
pass
process.stdin.write(raw_text.encode())
await process.stdin.drain()
elif "bytes" in msg:
raw_bytes: bytes = msg["bytes"]
if len(raw_bytes) > 65536:
logger.warning("session_id=%s: dropping oversized binary frame (%d bytes)", session_id, len(raw_bytes))
continue
try:
parsed = json.loads(raw_bytes)
if isinstance(parsed, dict) and parsed.get("type") == "resize":
continue
except (json.JSONDecodeError, UnicodeDecodeError):
pass
process.stdin.write(raw_bytes)
await process.stdin.drain()

except WebSocketDisconnect:
raise
except asyncio.CancelledError:
pass
except Exception as exc:
logger.debug("Terminal stdin relay error: %s", exc)

stdout_to_ws_task = asyncio.create_task(_stdout_relay())
ws_to_stdin_task = asyncio.create_task(_stdin_relay())

# Wait for either task to finish (disconnect or process exit)
await asyncio.wait(
[stdout_to_ws_task, ws_to_stdin_task],
return_when=asyncio.FIRST_COMPLETED,
)

except WebSocketDisconnect:
logger.debug("Terminal WebSocket disconnected: session_id=%s", session_id)
except Exception as exc:
logger.error("Terminal WebSocket error: %s", exc, exc_info=True)
finally:
# Cancel relay tasks
for task in [ws_to_stdin_task, stdout_to_ws_task]:
if task and not task.done():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
pass

# Terminate subprocess
if process is not None:
try:
process.terminate()
await asyncio.wait_for(process.wait(), timeout=3.0)
except (ProcessLookupError, asyncio.TimeoutError):
try:
process.kill()
except ProcessLookupError:
pass

# Release the per-user connection slot
count = _user_terminal_counts.get(user_id, 0)
if count > 1:
_user_terminal_counts[user_id] = count - 1
else:
_user_terminal_counts.pop(user_id, None)

try:
await websocket.close()
except Exception:
pass
2 changes: 2 additions & 0 deletions codeframe/ui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
review_v2,
schedule_v2,
session_chat_ws,
terminal_ws,
streaming_v2,
tasks_v2,
templates_v2,
Expand Down Expand Up @@ -489,6 +490,7 @@ async def test_broadcast(message: dict, project_id: int = None):
app.include_router(git_v2.router) # /api/v2/git
app.include_router(interactive_sessions_v2.router) # /api/v2/sessions
app.include_router(session_chat_ws.router) # /ws/sessions/{id}/chat
app.include_router(terminal_ws.router) # /ws/sessions/{id}/terminal
app.include_router(pr_v2.router) # /api/v2/pr
app.include_router(prd_v2.router) # /api/v2/prd
app.include_router(proof_v2.router) # /api/v2/proof
Expand Down
Loading
Loading