Skip to content

Commit 8fb9ab6

Browse files
CoderCococlaude
andauthored
feat(app): stream live server logs to the management UI via SSE (#45)
Closes #43 Adds a real-time log tail to the Logs panel: - LogsService.streamLogs(game, signal) – AsyncGenerator that polls FilterLogEvents every 2 s, de-duplicates by eventId, advances startTime on each tick, and exits cleanly when the AbortSignal fires. Queries the whole log group so task stop+start (new stream) is handled automatically. - GET /api/logs/:game/stream – @sse() endpoint wrapping the generator in an RxJS Observable. Teardown aborts the generator when the client disconnects. - ApiTokenGuard – falls back to ?token= query param when the Authorization header is absent, which is necessary because the browser's native EventSource cannot set custom headers. - LogsPanel.tsx – fetches a snapshot on game change then opens an EventSource stream that appends new lines (capped at 1 000). Pause buffers incoming lines; Resume flushes the buffer. Replaces the old manual Refresh button. - LogsService.test.ts – seven new streamLogs cases covering clean termination, multi-poll delivery, de-duplication, log-group naming, error recovery, and missing message fields. - docs/components/management-app.md – updated endpoint table, Live Logs description, auth notes, and LogsService blurb. IAM: logs:* already covers FilterLogEvents (docs/setup.md unchanged). https://claude.ai/code/session_01THkoGDYjJhojhtnUEYi4w9 --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 7cfc87a commit 8fb9ab6

7 files changed

Lines changed: 367 additions & 24 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ Any time you add or remove Terraform variables, update **all four** of these in
154154
- **Always use `/pr` to create pull requests.** The `.claude/commands/pr.md` skill validates the title format before calling the API. Never call `mcp__github__create_pull_request` directly without running this check first.
155155
- **PR titles MUST use Conventional Commits.** We squash-merge, so the PR title becomes the commit subject on `main` verbatim — a badly-formed title produces a badly-formed commit that can't be fixed after merge. Format: `<type>(<optional-scope>): <imperative summary>`, where `<type>` is one of `feat`, `fix`, `refactor`, `docs`, `test`, `chore`, `perf`, `build`, `ci`, `style`. Keep the subject under ~70 characters; put details in the PR body. Examples: `refactor(app): migrate server from Express+tsyringe to Nest.js`, `docs: reflect Nest.js migration in CLAUDE.md`, `fix(watchdog): stop leaking tags on failed runs`, `chore: add ESLint flat config`.
156156
- **Pre-flight check (mandatory):** before any `create_pull_request` call, verify the title matches `^(feat|fix|refactor|docs|test|chore|perf|build|ci|style)(\([^)]+\))?: .+$`. If it doesn't, fix it first. `Add ESLint configuration` fails (no type prefix); `chore: add ESLint configuration` passes.
157+
- **Always include `Closes #N`** in the PR body when the PR resolves a GitHub issue. Place it as the first line so GitHub auto-closes the issue on merge.
157158

158159
## PR Review Workflow
159160

app/packages/server/src/controllers/logs.controller.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Controller, Get, Param, Query } from '@nestjs/common';
1+
import { Controller, Get, MessageEvent, Param, Query, Sse } from '@nestjs/common';
2+
import { Observable } from 'rxjs';
23
import { LogsService } from '../services/LogsService.js';
34

45
/** Tails CloudWatch logs from the `/ecs/{game}-server` log group. */
@@ -13,4 +14,35 @@ export class LogsController {
1314
const lines = await this.logs.getRecentLogs(game, limit);
1415
return { game, lines };
1516
}
17+
18+
/**
19+
* SSE stream of new log lines for a game, delivered as they arrive from
20+
* `FilterLogEvents`. The client receives `{ data: { line: "..." } }` events.
21+
* Auth: `Authorization: Bearer` header OR `?token=` query param (the latter
22+
* is required because the browser's native `EventSource` cannot set headers).
23+
*/
24+
@Sse(':game/stream')
25+
streamLogs(@Param('game') game: string): Observable<MessageEvent> {
26+
const ac = new AbortController();
27+
28+
return new Observable<MessageEvent>((subscriber) => {
29+
const run = async () => {
30+
try {
31+
for await (const line of this.logs.streamLogs(game, ac.signal)) {
32+
subscriber.next({ data: { line } } as MessageEvent);
33+
}
34+
subscriber.complete();
35+
} catch (err) {
36+
if ((err as Error).name === 'AbortError') {
37+
subscriber.complete();
38+
} else {
39+
subscriber.error(err);
40+
}
41+
}
42+
};
43+
void run();
44+
45+
return () => ac.abort();
46+
});
47+
}
1648
}

app/packages/server/src/guards/api-token.guard.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,22 @@ export class ApiTokenGuard implements CanActivate {
5555
return true;
5656
}
5757

58+
// Prefer the Authorization header; fall back to ?token= query param for
59+
// SSE endpoints where the browser's native EventSource cannot set headers.
5860
const header = req.headers['authorization'];
59-
if (typeof header !== 'string' || !header.startsWith('Bearer ')) {
60-
throw new UnauthorizedException({ error: 'missing bearer token' });
61+
let presented: string;
62+
if (typeof header === 'string' && header.startsWith('Bearer ')) {
63+
presented = header.slice('Bearer '.length).trim();
64+
} else {
65+
const queryToken = (req.query as Record<string, unknown> | undefined)?.['token'];
66+
if (typeof queryToken !== 'string') {
67+
throw new UnauthorizedException({ error: 'missing bearer token' });
68+
}
69+
// Remove the token from req.query so RequestLoggerMiddleware doesn't
70+
// log it — the finish handler fires after this guard runs.
71+
delete (req.query as Record<string, unknown>)['token'];
72+
presented = queryToken;
6173
}
62-
const presented = header.slice('Bearer '.length).trim();
6374
if (presented !== configured) {
6475
logger.warn('Rejected /api request with bad bearer token', {
6576
path: req.path,

app/packages/server/src/services/LogsService.test.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { mockClient } from 'aws-sdk-client-mock';
44
import {
55
CloudWatchLogsClient,
66
DescribeLogStreamsCommand,
7+
FilterLogEventsCommand,
78
GetLogEventsCommand,
89
} from '@aws-sdk/client-cloudwatch-logs';
910

@@ -104,3 +105,137 @@ describe('LogsService', () => {
104105
expect(lines[0]).toContain('denied');
105106
});
106107
});
108+
109+
describe('LogsService.streamLogs', () => {
110+
/** Service under test, freshly constructed per test. */
111+
let service: LogsService;
112+
113+
beforeEach(() => {
114+
cwMock.reset();
115+
service = new LogsService(makeConfig());
116+
});
117+
118+
it('should terminate immediately when signal is already aborted before the first poll', async () => {
119+
const ac = new AbortController();
120+
ac.abort();
121+
122+
const lines: string[] = [];
123+
for await (const line of service.streamLogs('minecraft', ac.signal, 0)) {
124+
lines.push(line);
125+
}
126+
127+
expect(lines).toEqual([]);
128+
expect(cwMock.commandCalls(FilterLogEventsCommand)).toHaveLength(0);
129+
});
130+
131+
it('should yield log lines from the first poll and terminate on abort', async () => {
132+
cwMock.on(FilterLogEventsCommand).resolves({
133+
events: [
134+
{ eventId: 'e1', timestamp: 1000, message: 'line1' },
135+
{ eventId: 'e2', timestamp: 2000, message: 'line2' },
136+
],
137+
});
138+
139+
const ac = new AbortController();
140+
const gen = service.streamLogs('minecraft', ac.signal, 0);
141+
142+
const { value: l1 } = await gen.next();
143+
const { value: l2 } = await gen.next();
144+
ac.abort();
145+
const { done } = await gen.next();
146+
147+
expect(l1).toBe('line1');
148+
expect(l2).toBe('line2');
149+
expect(done).toBe(true);
150+
});
151+
152+
it('should yield new events from successive polls', async () => {
153+
cwMock
154+
.on(FilterLogEventsCommand)
155+
.resolvesOnce({ events: [{ eventId: 'e1', timestamp: 1000, message: 'first' }] })
156+
.resolves({ events: [{ eventId: 'e2', timestamp: 2000, message: 'second' }] });
157+
158+
const ac = new AbortController();
159+
const gen = service.streamLogs('minecraft', ac.signal, 0);
160+
161+
const { value: l1 } = await gen.next();
162+
const { value: l2 } = await gen.next();
163+
ac.abort();
164+
await gen.return(undefined);
165+
166+
expect(l1).toBe('first');
167+
expect(l2).toBe('second');
168+
});
169+
170+
it('should de-duplicate events with the same eventId across polls', async () => {
171+
cwMock
172+
.on(FilterLogEventsCommand)
173+
.resolvesOnce({ events: [{ eventId: 'e1', timestamp: 1000, message: 'line1' }] })
174+
.resolvesOnce({
175+
events: [
176+
{ eventId: 'e1', timestamp: 1000, message: 'line1' }, // already seen
177+
{ eventId: 'e2', timestamp: 2000, message: 'line2' }, // new
178+
],
179+
});
180+
181+
const ac = new AbortController();
182+
const gen = service.streamLogs('minecraft', ac.signal, 0);
183+
184+
const { value: l1 } = await gen.next(); // first poll yields 'line1'
185+
const { value: l2 } = await gen.next(); // second poll skips duplicate, yields 'line2'
186+
ac.abort();
187+
await gen.return(undefined);
188+
189+
expect(l1).toBe('line1');
190+
expect(l2).toBe('line2');
191+
});
192+
193+
it('should query the /ecs/{game}-server log group', async () => {
194+
cwMock.on(FilterLogEventsCommand).resolves({
195+
events: [{ eventId: 'e1', timestamp: 1000, message: 'hello' }],
196+
});
197+
198+
const ac = new AbortController();
199+
const gen = service.streamLogs('valheim', ac.signal, 0);
200+
201+
await gen.next(); // first poll runs and yields 'hello'
202+
ac.abort();
203+
await gen.return(undefined);
204+
205+
const calls = cwMock.commandCalls(FilterLogEventsCommand);
206+
expect(calls[0]!.args[0].input.logGroupName).toBe('/ecs/valheim-server');
207+
});
208+
209+
it('should yield a stream-error sentinel and continue when a poll throws', async () => {
210+
cwMock
211+
.on(FilterLogEventsCommand)
212+
.rejectsOnce(new Error('throttled'))
213+
.resolves({ events: [{ eventId: 'e1', timestamp: 1000, message: 'recovered' }] });
214+
215+
const ac = new AbortController();
216+
const gen = service.streamLogs('minecraft', ac.signal, 0);
217+
218+
const { value: errLine } = await gen.next(); // first poll throws
219+
const { value: okLine } = await gen.next(); // second poll succeeds
220+
ac.abort();
221+
await gen.return(undefined);
222+
223+
expect(errLine).toMatch(/\[stream error\].*throttled/);
224+
expect(okLine).toBe('recovered');
225+
});
226+
227+
it('should map a missing event.message to an empty string', async () => {
228+
cwMock.on(FilterLogEventsCommand).resolves({
229+
events: [{ eventId: 'e1', timestamp: 1000 }], // no message field
230+
});
231+
232+
const ac = new AbortController();
233+
const gen = service.streamLogs('minecraft', ac.signal, 0);
234+
235+
const { value: line } = await gen.next();
236+
ac.abort();
237+
await gen.return(undefined);
238+
239+
expect(line).toBe('');
240+
});
241+
});

app/packages/server/src/services/LogsService.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,35 @@ import { Injectable } from '@nestjs/common';
22
import {
33
CloudWatchLogsClient,
44
DescribeLogStreamsCommand,
5+
FilterLogEventsCommand,
56
GetLogEventsCommand,
67
} from '@aws-sdk/client-cloudwatch-logs';
78
import { logger } from '../logger.js';
89
import { ConfigService } from './ConfigService.js';
910

11+
/**
12+
* Sleep for `ms` milliseconds, but reject immediately if `signal` is aborted.
13+
* Used by `streamLogs` so the poll loop exits promptly when the SSE client disconnects.
14+
*/
15+
function sleepInterruptible(ms: number, signal: AbortSignal): Promise<void> {
16+
return new Promise((resolve, reject) => {
17+
if (signal.aborted) {
18+
reject(new DOMException('Aborted', 'AbortError'));
19+
return;
20+
}
21+
const onAbort = () => {
22+
clearTimeout(timer);
23+
signal.removeEventListener('abort', onAbort);
24+
reject(new DOMException('Aborted', 'AbortError'));
25+
};
26+
const timer = setTimeout(() => {
27+
signal.removeEventListener('abort', onAbort);
28+
resolve();
29+
}, ms);
30+
signal.addEventListener('abort', onAbort);
31+
});
32+
}
33+
1034
/**
1135
* Fetches recent CloudWatch Logs lines for a game's ECS task so the UI can
1236
* render a tail. Assumes the Terraform-provisioned log group naming
@@ -25,6 +49,55 @@ export class LogsService {
2549
return this.client;
2650
}
2751

52+
/**
53+
* Async generator that polls `FilterLogEvents` every `pollInterval` ms and
54+
* yields new log lines as they arrive. De-duplicates by `eventId` so lines
55+
* are never emitted twice even when `startTime` windows overlap at the
56+
* boundary. The generator exits cleanly when `signal` is aborted (i.e.
57+
* when the SSE client disconnects).
58+
*
59+
* Queries the whole log group so that a stop+start of the ECS task
60+
* (which creates a new stream) is handled automatically without reconnecting.
61+
*/
62+
async *streamLogs(
63+
game: string,
64+
signal: AbortSignal,
65+
pollInterval = 2000,
66+
): AsyncGenerator<string> {
67+
const logGroup = `/ecs/${game}-server`;
68+
let startTime = Date.now();
69+
const seen = new Set<string>();
70+
71+
while (!signal.aborted) {
72+
try {
73+
const resp = await this.getClient().send(
74+
new FilterLogEventsCommand({ logGroupName: logGroup, startTime, limit: 100 }),
75+
{ abortSignal: signal },
76+
);
77+
for (const e of resp.events ?? []) {
78+
const id = e.eventId ?? `${e.timestamp}-${e.message}`;
79+
if (!seen.has(id)) {
80+
seen.add(id);
81+
yield e.message ?? '';
82+
}
83+
if ((e.timestamp ?? 0) >= startTime) {
84+
startTime = (e.timestamp ?? startTime) + 1;
85+
}
86+
}
87+
} catch (err) {
88+
if ((err as Error).name === 'AbortError') break;
89+
logger.error('Log stream poll error', { err, game, logGroup });
90+
yield `[stream error] ${String(err)}`;
91+
}
92+
93+
try {
94+
await sleepInterruptible(pollInterval, signal);
95+
} catch {
96+
break;
97+
}
98+
}
99+
}
100+
28101
/**
29102
* Return up to `limit` recent messages from the most recently written log
30103
* stream in `/ecs/{game}-server`. Errors are folded into a single-element

0 commit comments

Comments
 (0)