-
Notifications
You must be signed in to change notification settings - Fork 5
feat: Execution Monitor View with real-time SSE streaming (#332) #339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
87a3fc4
9f2e3ae
542ce5c
599fd10
6f7d756
fa40b59
41aca0b
762d111
97147c9
d61f7d9
f53adbd
acd08c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,32 +1,21 @@ | ||
| """SSE streaming router for real-time task execution events. | ||
| """SSE streaming utilities for real-time task execution events. | ||
|
|
||
| This module provides Server-Sent Events (SSE) endpoints for streaming | ||
| task execution progress to web clients. | ||
| This module provides shared SSE utilities (formatting, event generation, | ||
| publisher management) used by streaming consumers. | ||
|
|
||
| Endpoints: | ||
| - GET /api/v2/tasks/{task_id}/stream - SSE stream of execution events | ||
|
|
||
| This router follows the thin adapter pattern: | ||
| 1. Parse HTTP request parameters | ||
| 2. Subscribe to EventPublisher from core.streaming | ||
| 3. Format events as SSE and stream to client | ||
| 4. Handle disconnection gracefully | ||
| The actual SSE endpoint for tasks is in tasks_v2.py: | ||
| GET /api/v2/tasks/{task_id}/stream (requires workspace_path only) | ||
| """ | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from typing import AsyncGenerator, Optional | ||
|
|
||
| from fastapi import APIRouter, Depends, HTTPException, Request | ||
| from fastapi.responses import StreamingResponse | ||
| from fastapi import APIRouter, Request | ||
| from fastapi.responses import StreamingResponse # noqa: F401 — re-exported | ||
|
|
||
| from codeframe.auth import User | ||
| from codeframe.auth.dependencies import get_current_user | ||
| from codeframe.core import tasks | ||
| from codeframe.core.models import ExecutionEvent | ||
| from codeframe.core.streaming import EventPublisher | ||
| from codeframe.core.workspace import Workspace | ||
| from codeframe.ui.dependencies import get_v2_workspace | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
@@ -96,37 +85,53 @@ async def event_stream_generator( | |
| task_id: str, | ||
| publisher: EventPublisher, | ||
| request: Request, | ||
| heartbeat_interval: float = 30.0, | ||
| heartbeat_interval: float = 15.0, | ||
| ) -> AsyncGenerator[str, None]: | ||
| """Generate SSE events for a task. | ||
| """Generate SSE events for a task with heartbeat keep-alive. | ||
|
|
||
| This async generator yields SSE-formatted strings as events | ||
| are published for the given task. | ||
| Subscribes to the EventPublisher and yields SSE-formatted strings. | ||
| Emits SSE comments as heartbeats during idle periods to prevent | ||
| proxy/browser timeouts. | ||
|
|
||
| Args: | ||
| task_id: Task ID to stream events for | ||
| publisher: EventPublisher to subscribe to | ||
| request: FastAPI request (for disconnect detection) | ||
| heartbeat_interval: Seconds between heartbeat events | ||
| heartbeat_interval: Seconds between heartbeat comments | ||
|
|
||
| Yields: | ||
| SSE-formatted event strings | ||
| SSE-formatted event strings or comment heartbeats | ||
| """ | ||
| logger.info(f"Starting SSE stream for task {task_id}") | ||
|
|
||
| queue: asyncio.Queue = asyncio.Queue(maxsize=1000) | ||
| loop = asyncio.get_running_loop() | ||
|
|
||
| from codeframe.core.streaming import _Subscription | ||
| subscription = _Subscription(task_id, queue, loop) | ||
|
|
||
| async with publisher._lock: | ||
| publisher._subscribers[task_id].append(subscription) | ||
|
|
||
| try: | ||
| async for event in publisher.subscribe(task_id): | ||
| # Check if client disconnected | ||
| while True: | ||
| if await request.is_disconnected(): | ||
| logger.info(f"Client disconnected from task {task_id} stream") | ||
| break | ||
|
|
||
| yield format_sse_event(event) | ||
| try: | ||
| item = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval) | ||
|
|
||
| # If this is a completion event, we're done | ||
| if event.event_type == "completion": | ||
| logger.info(f"Task {task_id} completed, closing stream") | ||
| break | ||
| if item is _Subscription.END_OF_STREAM: | ||
| break | ||
|
|
||
| yield format_sse_event(item) | ||
|
|
||
| if item.event_type == "completion": | ||
| logger.info(f"Task {task_id} completed, closing stream") | ||
| break | ||
| except asyncio.TimeoutError: | ||
| yield format_sse_comment("heartbeat") | ||
|
|
||
| except asyncio.CancelledError: | ||
| logger.info(f"SSE stream cancelled for task {task_id}") | ||
|
|
@@ -135,97 +140,16 @@ async def event_stream_generator( | |
| logger.error(f"Error in SSE stream for task {task_id}: {e}") | ||
| raise | ||
| finally: | ||
| async with publisher._lock: | ||
| if subscription in publisher._subscribers[task_id]: | ||
| publisher._subscribers[task_id].remove(subscription) | ||
| if not publisher._subscribers[task_id]: | ||
| del publisher._subscribers[task_id] | ||
|
Comment on lines
+143
to
+147
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major Cleanup duplicates the private-API coupling from subscription setup. The Also note: if 🤖 Prompt for AI Agents |
||
| logger.info(f"Closing SSE stream for task {task_id}") | ||
|
|
||
|
|
||
| @router.get( | ||
| "/{task_id}/stream", | ||
| response_class=StreamingResponse, | ||
| summary="Stream task execution events", | ||
| description=""" | ||
| Stream real-time execution events for a task using Server-Sent Events (SSE). | ||
|
|
||
| **Authentication required**: Pass JWT token via Authorization header or cookie. | ||
|
|
||
| The stream includes: | ||
| - **progress**: Phase transitions and step updates | ||
| - **output**: stdout/stderr from commands | ||
| - **blocker**: Human-in-the-loop questions | ||
| - **completion**: Task finished (stream closes) | ||
| - **error**: Errors during execution | ||
| - **heartbeat**: Keep-alive (configurable, default 30s) | ||
|
|
||
| The stream closes when: | ||
| - Task completes (success or failure) | ||
| - Client disconnects | ||
| - Server error occurs | ||
|
|
||
| Example client (JavaScript): | ||
| ```javascript | ||
| const eventSource = new EventSource('/api/v2/tasks/123/stream', { | ||
| headers: { 'Authorization': 'Bearer <token>' } | ||
| }); | ||
| eventSource.onmessage = (e) => { | ||
| const event = JSON.parse(e.data); | ||
| console.log(event.event_type, event.data); | ||
| }; | ||
| ``` | ||
|
|
||
| Configuration (via environment variables): | ||
| - SSE_TIMEOUT_SECONDS: Timeout for event wait (default: 30) | ||
| - SSE_MAX_QUEUE_SIZE: Max queued events (default: 1000) | ||
| - SSE_OUTPUT_MAX_CHARS: Max output chars per event (default: 2000) | ||
| """, | ||
| responses={ | ||
| 200: { | ||
| "description": "SSE event stream", | ||
| "content": { | ||
| "text/event-stream": { | ||
| "example": 'data: {"event_type":"progress","task_id":"123",...}\n\n' | ||
| } | ||
| }, | ||
| }, | ||
| 401: {"description": "Authentication required"}, | ||
| 404: {"description": "Task not found"}, | ||
| }, | ||
| ) | ||
| async def stream_task_events( | ||
| task_id: str, | ||
| request: Request, | ||
| workspace: Workspace = Depends(get_v2_workspace), | ||
| current_user: User = Depends(get_current_user), | ||
| ) -> StreamingResponse: | ||
| """Stream execution events for a task via SSE. | ||
|
|
||
| Args: | ||
| task_id: ID of the task to stream | ||
| request: FastAPI request object | ||
| workspace: User's workspace (injected by dependency) | ||
| current_user: Authenticated user (injected by dependency) | ||
|
|
||
| Returns: | ||
| StreamingResponse with SSE content type | ||
|
|
||
| Raises: | ||
| HTTPException: 404 if task not found in workspace | ||
| """ | ||
| # Verify task exists in user's workspace | ||
| task = tasks.get(workspace, task_id) | ||
| if not task: | ||
| raise HTTPException(status_code=404, detail=f"Task {task_id} not found") | ||
|
|
||
| publisher = get_event_publisher() | ||
|
|
||
| # Log subscription without PII (use user ID instead of email) | ||
| logger.info("User %s subscribed to task %s stream in workspace %s", | ||
| current_user.id, task_id, workspace.id) | ||
|
|
||
| return StreamingResponse( | ||
| event_stream_generator(task_id, publisher, request), | ||
| media_type="text/event-stream", | ||
| headers={ | ||
| "Cache-Control": "no-cache", | ||
| "Connection": "keep-alive", | ||
| "X-Accel-Buffering": "no", # Disable nginx buffering | ||
| }, | ||
| ) | ||
| # NOTE: The SSE stream endpoint for tasks is defined in tasks_v2.py | ||
| # (GET /api/v2/tasks/{task_id}/stream) which only requires workspace_path | ||
| # and is compatible with browser EventSource (no custom auth headers needed). | ||
| # This module retains the shared utilities (format_sse_event, format_sse_comment, | ||
| # event_stream_generator, get_event_publisher) used by other streaming consumers. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the full streaming_v2.py file to see the context around lines 107-114 and 143-147 fd streaming_v2.py --type fRepository: frankbria/codeframe
Length of output: 99
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 154
🏁 Script executed:
# Read streaming_v2.py to see context around lines 107-114 and 143-147 wc -l codeframe/ui/routers/streaming_v2.pyRepository: frankbria/codeframe
Length of output: 103
🏁 Script executed:
# Read core/streaming.py to check EventPublisher implementation wc -l codeframe/core/streaming.pyRepository: frankbria/codeframe
Length of output: 94
🏁 Script executed:
# Read the full streaming_v2.py file cat -n codeframe/ui/routers/streaming_v2.pyRepository: frankbria/codeframe
Length of output: 5950
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 7011
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 45
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 1059
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2551
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 814
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2928
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 45
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 3523
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2019
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 432
🏁 Script executed:
Repository: frankbria/codeframe
Length of output: 2063
Use the public
EventPublisher.subscribe()method instead of directly accessing private internals.The router directly imports
_Subscriptionand accessespublisher._lockandpublisher._subscribers(lines 110-114, 143-147), breaking encapsulation and violating the thin adapter pattern.EventPublisheralready exposes a publicsubscribe(task_id)method that handles subscription management, queue creation, and cleanup. The HTTP-specific concerns (heartbeats viaheartbeat_interval, request disconnection detection) should be layered on top of the public API rather than reimplementing subscription logic by reaching into private state.🤖 Prompt for AI Agents