Skip to content

Commit 577b1a9

Browse files
authored
Merge pull request #2 from DIYA73/test-codewatch-review
feat: SSE streaming for execution logs
2 parents 9697fa2 + 95cc106 commit 577b1a9

5 files changed

Lines changed: 233 additions & 2 deletions

File tree

TEST.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# test

apps/api/src/executions/executions.controller.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
import { Controller, Get, Param, UseGuards } from '@nestjs/common';
1+
import {
2+
Controller, Get, Param, UseGuards,
3+
Sse, Res, NotFoundException,
4+
} from '@nestjs/common';
25
import { ApiTags, ApiBearerAuth } from '@nestjs/swagger';
6+
import { OnEvent } from '@nestjs/event-emitter';
7+
import { Response } from 'express';
38
import { ExecutionsService } from './executions.service';
49
import { JwtAuthGuard } from '../common/guards/jwt-auth.guard';
510
import { CurrentWorkspaceId } from '../common/decorators/current-user.decorator';
@@ -20,4 +25,64 @@ export class ExecutionsController {
2025
findOne(@Param('id') id: string) {
2126
return this.executionsService.findOne(id);
2227
}
28+
29+
@Get(':id/stream')
30+
async stream(
31+
@Param('id') id: string,
32+
@Res() res: Response,
33+
): Promise<void> {
34+
const execution = await this.executionsService.findOne(id);
35+
if (!execution) throw new NotFoundException(`Execution ${id} not found`);
36+
37+
// SSE headers
38+
res.setHeader('Content-Type', 'text/event-stream');
39+
res.setHeader('Cache-Control', 'no-cache');
40+
res.setHeader('Connection', 'keep-alive');
41+
res.setHeader('Access-Control-Allow-Origin', '*');
42+
res.flushHeaders();
43+
44+
const send = (event: string, data: unknown) => {
45+
res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
46+
};
47+
48+
// Send existing logs immediately
49+
if (execution.logs?.length) {
50+
for (const log of execution.logs) {
51+
send('log', { ...(log as Record<string, unknown>), executionId: id });
52+
}
53+
}
54+
55+
// Send current status
56+
send('status', { executionId: id, status: execution.status });
57+
58+
// Store listeners so we can remove them on close
59+
const onLog = (payload: Record<string, unknown>) => {
60+
if (payload['executionId'] === id) send('log', payload);
61+
};
62+
63+
const onStatus = (payload: Record<string, unknown>) => {
64+
if (payload['executionId'] === id) {
65+
send('status', payload);
66+
const terminal = ['SUCCESS', 'FAILED', 'CANCELLED'];
67+
if (terminal.includes(String(payload['status']))) {
68+
send('done', { executionId: id });
69+
res.end();
70+
}
71+
}
72+
};
73+
74+
this.executionsService.eventEmitter.on('execution.log', onLog);
75+
this.executionsService.eventEmitter.on('execution.status', onStatus);
76+
77+
// Heartbeat every 15s to keep connection alive
78+
const heartbeat = setInterval(() => {
79+
res.write(': heartbeat\n\n');
80+
}, 15_000);
81+
82+
res.on('close', () => {
83+
clearInterval(heartbeat);
84+
this.executionsService.eventEmitter.off('execution.log', onLog);
85+
this.executionsService.eventEmitter.off('execution.status', onStatus);
86+
});
87+
}
2388
}

apps/api/src/executions/executions.service.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class ExecutionsService {
3030
private readonly executionQueue: Queue,
3131
@InjectRepository(Execution)
3232
private readonly executionRepo: Repository<Execution>,
33-
private readonly eventEmitter: EventEmitter2,
33+
readonly eventEmitter: EventEmitter2,
3434
) {}
3535

3636
async enqueue(flow: Flow, input?: Record<string, unknown>): Promise<Execution> {
@@ -84,3 +84,4 @@ export class ExecutionsService {
8484
return this.executionRepo.findOne({ where: { id } });
8585
}
8686
}
87+
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
'use client';
2+
3+
import { useExecutionStream } from '@/hooks/useExecutionStream';
4+
import { useAuth } from '@/hooks/useAuth';
5+
6+
interface Props {
7+
executionId: string;
8+
}
9+
10+
const levelColor: Record<string, string> = {
11+
info: 'text-green-400',
12+
warn: 'text-yellow-400',
13+
error: 'text-red-400',
14+
};
15+
16+
const statusColor: Record<string, string> = {
17+
QUEUED: 'text-gray-400',
18+
RUNNING: 'text-blue-400',
19+
SUCCESS: 'text-green-400',
20+
FAILED: 'text-red-400',
21+
CANCELLED: 'text-yellow-400',
22+
};
23+
24+
export function ExecutionStream({ executionId }: Props) {
25+
const { token } = useAuth();
26+
const { logs, status, done, connected } = useExecutionStream({
27+
executionId,
28+
token,
29+
});
30+
31+
return (
32+
<div className="flex flex-col h-full bg-gray-950 rounded-lg border border-gray-800">
33+
{/* Header */}
34+
<div className="flex items-center justify-between px-4 py-2 border-b border-gray-800">
35+
<span className="text-xs font-mono text-gray-400">
36+
execution/{executionId.slice(0, 8)}
37+
</span>
38+
<div className="flex items-center gap-2">
39+
{connected && (
40+
<span className="flex items-center gap-1 text-xs text-blue-400">
41+
<span className="w-1.5 h-1.5 rounded-full bg-blue-400 animate-pulse" />
42+
streaming
43+
</span>
44+
)}
45+
{status && (
46+
<span className={`text-xs font-medium ${statusColor[status] ?? 'text-gray-400'}`}>
47+
{status}
48+
</span>
49+
)}
50+
</div>
51+
</div>
52+
53+
{/* Log output */}
54+
<div className="flex-1 overflow-y-auto p-4 font-mono text-xs space-y-1">
55+
{logs.length === 0 && !done && (
56+
<span className="text-gray-600">Waiting for output...</span>
57+
)}
58+
{logs.map((log, i) => (
59+
<div key={i} className="flex gap-3">
60+
<span className="text-gray-600 shrink-0">
61+
{new Date(log.timestamp).toLocaleTimeString()}
62+
</span>
63+
<span className="text-gray-500 shrink-0 w-24 truncate">
64+
{log.nodeId}
65+
</span>
66+
<span className={levelColor[log.level] ?? 'text-gray-300'}>
67+
{log.message}
68+
</span>
69+
</div>
70+
))}
71+
{done && (
72+
<div className="pt-2 border-t border-gray-800 text-gray-600">
73+
— execution complete —
74+
</div>
75+
)}
76+
</div>
77+
</div>
78+
);
79+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { useEffect, useRef, useState } from 'react';
2+
3+
export type LogLevel = 'info' | 'warn' | 'error';
4+
5+
export interface ExecutionLog {
6+
executionId: string;
7+
nodeId: string;
8+
message: string;
9+
level: LogLevel;
10+
timestamp: string;
11+
}
12+
13+
export interface ExecutionStatus {
14+
executionId: string;
15+
status: 'QUEUED' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'CANCELLED';
16+
}
17+
18+
interface UseExecutionStreamOptions {
19+
executionId: string | null;
20+
apiBase?: string;
21+
token: string | null;
22+
}
23+
24+
interface UseExecutionStreamResult {
25+
logs: ExecutionLog[];
26+
status: ExecutionStatus['status'] | null;
27+
done: boolean;
28+
connected: boolean;
29+
}
30+
31+
export function useExecutionStream({
32+
executionId,
33+
apiBase = process.env.NEXT_PUBLIC_API_URL ?? 'http://localhost:3001',
34+
token,
35+
}: UseExecutionStreamOptions): UseExecutionStreamResult {
36+
const [logs, setLogs] = useState<ExecutionLog[]>([]);
37+
const [status, setStatus] = useState<ExecutionStatus['status'] | null>(null);
38+
const [done, setDone] = useState(false);
39+
const [connected, setConnected] = useState(false);
40+
const esRef = useRef<EventSource | null>(null);
41+
42+
useEffect(() => {
43+
if (!executionId || !token) return;
44+
45+
setLogs([]);
46+
setStatus(null);
47+
setDone(false);
48+
setConnected(false);
49+
50+
// EventSource doesn't support headers — pass token as query param
51+
const url = `${apiBase}/executions/${executionId}/stream?token=${encodeURIComponent(token)}`;
52+
const es = new EventSource(url);
53+
esRef.current = es;
54+
55+
es.addEventListener('open', () => setConnected(true));
56+
57+
es.addEventListener('log', (e: MessageEvent<string>) => {
58+
const log = JSON.parse(e.data) as ExecutionLog;
59+
setLogs((prev) => [...prev, log]);
60+
});
61+
62+
es.addEventListener('status', (e: MessageEvent<string>) => {
63+
const payload = JSON.parse(e.data) as ExecutionStatus;
64+
setStatus(payload.status);
65+
});
66+
67+
es.addEventListener('done', () => {
68+
setDone(true);
69+
setConnected(false);
70+
es.close();
71+
});
72+
73+
es.addEventListener('error', () => {
74+
setConnected(false);
75+
es.close();
76+
});
77+
78+
return () => {
79+
es.close();
80+
esRef.current = null;
81+
};
82+
}, [executionId, token, apiBase]);
83+
84+
return { logs, status, done, connected };
85+
}

0 commit comments

Comments
 (0)