Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 46 additions & 122 deletions codeframe/ui/routers/streaming_v2.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
"""SSE streaming router for real-time task execution events.
"""SSE streaming utilities for real-time task execution events.

This module provides Server-Sent Events (SSE) endpoints for streaming
task execution progress to web clients.
This module provides shared SSE utilities (formatting, event generation,
publisher management) used by streaming consumers.

Endpoints:
- GET /api/v2/tasks/{task_id}/stream - SSE stream of execution events

This router follows the thin adapter pattern:
1. Parse HTTP request parameters
2. Subscribe to EventPublisher from core.streaming
3. Format events as SSE and stream to client
4. Handle disconnection gracefully
The actual SSE endpoint for tasks is in tasks_v2.py:
GET /api/v2/tasks/{task_id}/stream (requires workspace_path only)
"""

import asyncio
import logging
from typing import AsyncGenerator, Optional

from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse # noqa: F401 — re-exported

from codeframe.auth import User
from codeframe.auth.dependencies import get_current_user
from codeframe.core import tasks
from codeframe.core.models import ExecutionEvent
from codeframe.core.streaming import EventPublisher
from codeframe.core.workspace import Workspace
from codeframe.ui.dependencies import get_v2_workspace

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -96,37 +85,53 @@ async def event_stream_generator(
task_id: str,
publisher: EventPublisher,
request: Request,
heartbeat_interval: float = 30.0,
heartbeat_interval: float = 15.0,
) -> AsyncGenerator[str, None]:
"""Generate SSE events for a task.
"""Generate SSE events for a task with heartbeat keep-alive.

This async generator yields SSE-formatted strings as events
are published for the given task.
Subscribes to the EventPublisher and yields SSE-formatted strings.
Emits SSE comments as heartbeats during idle periods to prevent
proxy/browser timeouts.

Args:
task_id: Task ID to stream events for
publisher: EventPublisher to subscribe to
request: FastAPI request (for disconnect detection)
heartbeat_interval: Seconds between heartbeat events
heartbeat_interval: Seconds between heartbeat comments

Yields:
SSE-formatted event strings
SSE-formatted event strings or comment heartbeats
"""
logger.info(f"Starting SSE stream for task {task_id}")

queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
loop = asyncio.get_running_loop()

from codeframe.core.streaming import _Subscription
subscription = _Subscription(task_id, queue, loop)

async with publisher._lock:
publisher._subscribers[task_id].append(subscription)
Comment on lines +107 to +114

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's look at the full streaming_v2.py file to see the context around lines 107-114 and 143-147
fd streaming_v2.py --type f

Repository: frankbria/codeframe

Length of output: 99


🏁 Script executed:

# Find the core streaming module
fd streaming.py --type f | grep core

Repository: frankbria/codeframe

Length of output: 154


🏁 Script executed:

# Read streaming_v2.py to see context around lines 107-114 and 143-147
wc -l codeframe/ui/routers/streaming_v2.py

Repository: frankbria/codeframe

Length of output: 103


🏁 Script executed:

# Read core/streaming.py to check EventPublisher implementation
wc -l codeframe/core/streaming.py

Repository: frankbria/codeframe

Length of output: 94


🏁 Script executed:

# Read the full streaming_v2.py file
cat -n codeframe/ui/routers/streaming_v2.py

Repository: frankbria/codeframe

Length of output: 5950


🏁 Script executed:

# Read core/streaming.py to find EventPublisher class definition
cat -n codeframe/core/streaming.py | head -200

Repository: frankbria/codeframe

Length of output: 7011


🏁 Script executed:

# Find the EventPublisher class definition in core/streaming.py
ast-grep --pattern $'class EventPublisher {
  $$$
}'

Repository: frankbria/codeframe

Length of output: 45


🏁 Script executed:

# Also search for public subscribe methods
rg -n 'def subscribe|def unsubscribe|async def subscribe|async def unsubscribe' codeframe/core/streaming.py -C3

Repository: frankbria/codeframe

Length of output: 1059


🏁 Script executed:

# Read the full EventPublisher class definition and methods
sed -n '290,350p' codeframe/core/streaming.py

Repository: frankbria/codeframe

Length of output: 2551


🏁 Script executed:

# Read the unsubscribe method
sed -n '472,490p' codeframe/core/streaming.py

Repository: frankbria/codeframe

Length of output: 814


🏁 Script executed:

# Also read the _Subscription class to understand its structure
sed -n '200,290p' codeframe/core/streaming.py

Repository: frankbria/codeframe

Length of output: 2928


🏁 Script executed:

# Check how subscribe() is being used elsewhere in the codebase
rg -n 'publisher\.subscribe\|\.subscribe\(' --type=py -C2 | head -100

Repository: frankbria/codeframe

Length of output: 45


🏁 Script executed:

# Check if there are any other places accessing EventPublisher internals
rg -n 'publisher\._lock|publisher\._subscribers|_Subscription' --type=py -C2

Repository: frankbria/codeframe

Length of output: 3523


🏁 Script executed:

# Let's see the full subscribe() method implementation to understand what it returns
sed -n '319,365p' codeframe/core/streaming.py

Repository: frankbria/codeframe

Length of output: 2019


🏁 Script executed:

# Check if there are other SSE/streaming handlers that might use subscribe() properly
rg -n 'async def.*stream|AsyncGenerator' codeframe/ui/routers/ --type=py

Repository: frankbria/codeframe

Length of output: 432


🏁 Script executed:

# Check how stream_task_events in tasks_v2.py implements streaming
sed -n '856,920p' codeframe/ui/routers/tasks_v2.py

Repository: frankbria/codeframe

Length of output: 2063


Use the public EventPublisher.subscribe() method instead of directly accessing private internals.

The router directly imports _Subscription and accesses publisher._lock and publisher._subscribers (lines 110-114, 143-147), breaking encapsulation and violating the thin adapter pattern. EventPublisher already exposes a public subscribe(task_id) method that handles subscription management, queue creation, and cleanup. The HTTP-specific concerns (heartbeats via heartbeat_interval, request disconnection detection) should be layered on top of the public API rather than reimplementing subscription logic by reaching into private state.

🤖 Prompt for AI Agents
In `@codeframe/ui/routers/streaming_v2.py` around lines 107 - 114, The code is
directly importing and using the private _Subscription class and touching
publisher._lock and publisher._subscribers; replace that with the public
EventPublisher.subscribe(task_id) call and let the publisher manage
queue/lock/subscriber lifecycle. Call publisher.subscribe(task_id) to obtain the
subscription object (or subscription handle) and then implement HTTP-specific
behavior (heartbeat using heartbeat_interval, request disconnection monitoring
and cleanup) on top of that returned handle rather than mutating
publisher._subscribers or publisher._lock or importing _Subscription. Ensure you
remove direct references to _Subscription, publisher._lock, and
publisher._subscribers and use the public subscribe/unsubscribe APIs for
cleanup.


try:
async for event in publisher.subscribe(task_id):
# Check if client disconnected
while True:
if await request.is_disconnected():
logger.info(f"Client disconnected from task {task_id} stream")
break

yield format_sse_event(event)
try:
item = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)

# If this is a completion event, we're done
if event.event_type == "completion":
logger.info(f"Task {task_id} completed, closing stream")
break
if item is _Subscription.END_OF_STREAM:
break

yield format_sse_event(item)

if item.event_type == "completion":
logger.info(f"Task {task_id} completed, closing stream")
break
except asyncio.TimeoutError:
yield format_sse_comment("heartbeat")

except asyncio.CancelledError:
logger.info(f"SSE stream cancelled for task {task_id}")
Expand All @@ -135,97 +140,16 @@ async def event_stream_generator(
logger.error(f"Error in SSE stream for task {task_id}: {e}")
raise
finally:
async with publisher._lock:
if subscription in publisher._subscribers[task_id]:
publisher._subscribers[task_id].remove(subscription)
if not publisher._subscribers[task_id]:
del publisher._subscribers[task_id]
Comment on lines +143 to +147

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Cleanup duplicates the private-API coupling from subscription setup.

The finally block again accesses publisher._lock and publisher._subscribers directly. If a public unsubscribe(subscription) method existed on EventPublisher, both the setup (lines 113-114) and teardown would be cleaner and less fragile.

Also note: if publisher._subscribers is a defaultdict, the del on line 147 removes the key, but a subsequent append (from another connection) would silently recreate it. If it is a plain dict, indexing a non-existent task_id on line 144 could raise a KeyError if another coroutine had already deleted the key before this one acquired _lock and ran the in check — although, because every mutation happens while holding the async lock, this should not occur within a single event loop.

🤖 Prompt for AI Agents
In `@codeframe/ui/routers/streaming_v2.py` around lines 143 - 147, Add a public
unsubscribe method on EventPublisher (e.g.,
EventPublisher.unsubscribe(subscription)) and use it from the finally block
instead of touching publisher._lock and publisher._subscribers directly;
implement unsubscribe to take care of acquiring the internal lock, safely
locating the subscriber list for subscription.task_id (use get to avoid
KeyError), remove the subscription if present, and delete the task_id key only
if the list is empty (avoiding recreating behavior for defaultdicts), so both
the setup (where subscriptions are added) and teardown use the same safe, public
API.

logger.info(f"Closing SSE stream for task {task_id}")


@router.get(
    "/{task_id}/stream",
    response_class=StreamingResponse,
    summary="Stream task execution events",
    description="""
Stream real-time execution events for a task using Server-Sent Events (SSE).

**Authentication required**: Pass JWT token via Authorization header or cookie.

The stream includes:
- **progress**: Phase transitions and step updates
- **output**: stdout/stderr from commands
- **blocker**: Human-in-the-loop questions
- **completion**: Task finished (stream closes)
- **error**: Errors during execution
- **heartbeat**: Keep-alive (configurable, default 30s)

The stream closes when:
- Task completes (success or failure)
- Client disconnects
- Server error occurs

Example client (JavaScript):
```javascript
// The browser EventSource API cannot set request headers, so authenticate
// with the cookie; withCredentials also sends it on cross-origin requests.
const eventSource = new EventSource('/api/v2/tasks/123/stream', {
  withCredentials: true
});
eventSource.onmessage = (e) => {
  const event = JSON.parse(e.data);
  console.log(event.event_type, event.data);
};
```

Non-browser clients (or fetch-based SSE libraries) may instead send
`Authorization: Bearer <token>`.

Configuration (via environment variables):
- SSE_TIMEOUT_SECONDS: Timeout for event wait (default: 30)
- SSE_MAX_QUEUE_SIZE: Max queued events (default: 1000)
- SSE_OUTPUT_MAX_CHARS: Max output chars per event (default: 2000)
""",
    responses={
        200: {
            "description": "SSE event stream",
            "content": {
                "text/event-stream": {
                    "example": 'data: {"event_type":"progress","task_id":"123",...}\n\n'
                }
            },
        },
        401: {"description": "Authentication required"},
        404: {"description": "Task not found"},
    },
)
async def stream_task_events(
    task_id: str,
    request: Request,
    workspace: Workspace = Depends(get_v2_workspace),
    current_user: User = Depends(get_current_user),
) -> StreamingResponse:
    """Stream execution events for a task via SSE.

    Looks the task up in the injected workspace, then wraps
    ``event_stream_generator`` in a ``StreamingResponse`` with the
    ``text/event-stream`` media type.

    Args:
        task_id: ID of the task to stream
        request: FastAPI request object, forwarded to the generator
        workspace: User's workspace (injected by dependency)
        current_user: Authenticated user (injected by dependency)

    Returns:
        StreamingResponse with SSE content type

    Raises:
        HTTPException: 404 if task not found in workspace
    """
    # Verify the task exists in the user's workspace before opening a stream,
    # so an unknown ID yields a plain 404 instead of an idle connection.
    task = tasks.get(workspace, task_id)
    if not task:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")

    publisher = get_event_publisher()

    # Log subscription without PII (use user ID instead of email)
    logger.info(
        "User %s subscribed to task %s stream in workspace %s",
        current_user.id,
        task_id,
        workspace.id,
    )

    return StreamingResponse(
        event_stream_generator(task_id, publisher, request),
        media_type="text/event-stream",
        headers={
            # Responses must not be cached, or clients would replay old events.
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
# NOTE: The SSE stream endpoint for tasks is defined in tasks_v2.py
# (GET /api/v2/tasks/{task_id}/stream) which only requires workspace_path
# and is compatible with browser EventSource (no custom auth headers needed).
# This module retains the shared utilities (format_sse_event, format_sse_comment,
# event_stream_generator, get_event_publisher) used by other streaming consumers.
Loading
Loading