Skip to content

Commit 3e4afcc

Browse files
jerryliang64claude
andauthored
feat(tegg): [3/3] add @AgentController decorator with plugin integration (#5827)
## Summary - Add `@AgentController()` decorator that auto-registers 7 OpenAI-compatible HTTP routes under `/api/v1` - Add `AgentHandler` interface, `AgentInfoUtil` metadata utilities, and `AgentControllerProto`/`AgentControllerObject` for full IoC lifecycle integration - Add `NodeSSEWriter` implementation with lazy header writing and lowercase HTTP headers - Enforce `createStore()` method in `@AgentController` (no default store — users must provide their own `AgentStore`) - Use `AgentRuntime.create()` with `AgentExecutor` interface (replaces removed `createAgentRuntime` + `AgentControllerHost`) - Add `AGENT_CONTROLLER_PROTO_IMPL_TYPE` and metadata symbols in `@eggjs/tegg-types` - Register agent controller factory in controller plugin boot hook (`app.ts`) - Add `@eggjs/tegg/agent` public API entry point for re-exports ## Stacked PRs This is part of a 3-PR series (review in order): 1. #5823 — Store layer (AgentStore + OSSAgentStore) ✅ merged 2. #5824 — Runtime (AgentRuntime + MessageConverter + RunBuilder) ✅ merged 3. **This PR** — Decorator (@AgentController + Plugin integration) ## Changes from previous PR (#5820) - **Fixed broken imports**: `AgentHandler.ts` and `AgentFooController.ts` now import types from `@eggjs/tegg-types/agent-runtime` instead of non-existent `model/AgentControllerTypes.ts` - **Added `NodeSSEWriter`**: New implementation in `@eggjs/agent-runtime` with lazy header writing, lowercase HTTP headers (`content-type`), and single `res.write()` per event - **Enforced `createStore()`**: Removed `FileAgentStore` fallback — `@AgentController` now requires users to implement `createStore()` - **Switched to `AgentRuntime.create()`**: Replaced `createAgentRuntime({ host })` with `AgentRuntime.create({ executor })` to match PR2's API - **Fixed `agent.ts` exports**: Removed `FileAgentStore`, added `NodeSSEWriter`, types now re-exported from `@eggjs/agent-runtime` - **Updated snapshot**: Removed agent-related enums from `@eggjs/controller-decorator` exports (they live in `@eggjs/tegg-types/agent-runtime`) - **Squashed into single commit** for clean history ## Test plan - [x] NodeSSEWriter unit tests pass (8 tests: lazy headers, event format, closed state, onClose callbacks, idempotent end) - [x] Controller-decorator snapshot test updated and passing - [x] `tsc --noEmit` type checking passes for all affected packages - [x] Pre-commit hooks (oxfmt + oxlint) pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Agent controller support with lifecycle-integrated controller objects and a streaming AgentHandler. * Server-Sent Events support for Node.js to stream runtime events to clients. * New unified agent module exposing controller/handler types, runtime types, and agent errors. * **Bug Fixes** * Improved cancellation handling and clearer distinction between cancelled vs failed streaming runs. * Completed message outputs now include richer, consistent message structure. * **Tests** * Added comprehensive unit tests for decorators, controller lifecycle, SSE streaming, and fixtures. * **Chores** * Package/export updates to publish the new agent surface. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 1a4fb20 commit 3e4afcc

32 files changed

Lines changed: 1404 additions & 99 deletions

pnpm-lock.yaml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tegg/core/agent-runtime/src/AgentRuntime.ts

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import type {
99
AgentStreamMessage,
1010
AgentStore,
1111
} from '@eggjs/tegg-types/agent-runtime';
12-
import { RunStatus, AgentSSEEvent, AgentObjectType, MessageStatus } from '@eggjs/tegg-types/agent-runtime';
12+
import { RunStatus, AgentSSEEvent, AgentObjectType } from '@eggjs/tegg-types/agent-runtime';
1313
import { AgentConflictError } from '@eggjs/tegg-types/agent-runtime';
1414
import type { EggLogger } from 'egg-logger';
1515

@@ -284,14 +284,13 @@ export class AgentRuntime {
284284
}
285285

286286
// event: thread.message.completed
287-
msgObj.status = MessageStatus.Completed;
288-
msgObj.content = content;
289-
writer.writeEvent(AgentSSEEvent.ThreadMessageCompleted, msgObj);
287+
const completedMsg = MessageConverter.completeMessage(msgObj, content);
288+
writer.writeEvent(AgentSSEEvent.ThreadMessageCompleted, completedMsg);
290289

291290
// Persist and emit completion — append messages before marking run as completed
292291
// so a failure leaves the run in_progress (retryable) instead of completed-but-incomplete.
293292
// TODO(atomicity): add aggregate store method for full transactional guarantee.
294-
const output: MessageObject[] = content.length > 0 ? [msgObj] : [];
293+
const output: MessageObject[] = content.length > 0 ? [completedMsg] : [];
295294
await this.store.appendMessages(threadId, [
296295
...MessageConverter.toInputMessageObjects(input.input.messages, threadId),
297296
...output,
@@ -301,15 +300,28 @@ export class AgentRuntime {
301300
// event: thread.run.completed
302301
writer.writeEvent(AgentSSEEvent.ThreadRunCompleted, rb.snapshot());
303302
} catch (err: unknown) {
304-
try {
305-
await this.store.updateRun(run.id, rb.fail(err as Error));
306-
} catch (storeErr) {
307-
this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr);
308-
}
303+
if (abortController.signal.aborted) {
304+
// Client disconnected or cancelRun fired — mark as cancelled, not failed
305+
rb.cancelling();
306+
try {
307+
await this.store.updateRun(run.id, rb.cancel());
308+
} catch (storeErr) {
309+
this.logger.error('[AgentRuntime] failed to write cancelled status during stream error:', storeErr);
310+
}
311+
if (!writer.closed) {
312+
writer.writeEvent(AgentSSEEvent.ThreadRunCancelled, rb.snapshot());
313+
}
314+
} else {
315+
try {
316+
await this.store.updateRun(run.id, rb.fail(err as Error));
317+
} catch (storeErr) {
318+
this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr);
319+
}
309320

310-
// event: thread.run.failed
311-
if (!writer.closed) {
312-
writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot());
321+
// event: thread.run.failed
322+
if (!writer.closed) {
323+
writer.writeEvent(AgentSSEEvent.ThreadRunFailed, rb.snapshot());
324+
}
313325
}
314326
} finally {
315327
resolveTask();
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import type { ServerResponse } from 'node:http';
2+
3+
import type { SSEWriter } from './SSEWriter.ts';
4+
5+
export class HttpSSEWriter implements SSEWriter {
6+
private res: ServerResponse;
7+
private _closed = false;
8+
private closeCallbacks: Array<() => void> = [];
9+
private headersSent = false;
10+
private readonly onResClose: () => void;
11+
12+
constructor(res: ServerResponse) {
13+
this.res = res;
14+
this.onResClose = () => {
15+
this._closed = true;
16+
for (const cb of this.closeCallbacks) cb();
17+
this.closeCallbacks.length = 0;
18+
};
19+
res.on('close', this.onResClose);
20+
}
21+
22+
/** Lazily write headers on first event — avoids sending corrupt headers if constructor throws. */
23+
private ensureHeaders(): void {
24+
if (this.headersSent) return;
25+
this.headersSent = true;
26+
this.res.writeHead(200, {
27+
'content-type': 'text/event-stream',
28+
'cache-control': 'no-cache',
29+
connection: 'keep-alive',
30+
});
31+
}
32+
33+
writeEvent(event: string, data: unknown): void {
34+
if (this._closed) return;
35+
this.ensureHeaders();
36+
this.res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
37+
}
38+
39+
get closed(): boolean {
40+
return this._closed;
41+
}
42+
43+
end(): void {
44+
if (!this._closed) {
45+
this._closed = true;
46+
this.res.off('close', this.onResClose);
47+
this.closeCallbacks.length = 0;
48+
this.res.end();
49+
}
50+
}
51+
52+
onClose(callback: () => void): void {
53+
this.closeCallbacks.push(callback);
54+
}
55+
}

tegg/core/agent-runtime/src/MessageConverter.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ export class MessageConverter {
8181
return { output, usage };
8282
}
8383

84+
/**
85+
* Produce a completed copy of a streaming MessageObject with final content.
86+
*/
87+
static completeMessage(msg: MessageObject, content: MessageContentBlock[]): MessageObject {
88+
return { ...msg, status: MessageStatus.Completed, content };
89+
}
90+
8491
/**
8592
* Create an in-progress MessageObject for streaming (before content is known).
8693
*/

tegg/core/agent-runtime/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ export * from './AgentStoreUtils.ts';
77
export * from './MessageConverter.ts';
88
export * from './RunBuilder.ts';
99
export * from './SSEWriter.ts';
10+
export * from './HttpSSEWriter.ts';
1011
export { AgentRuntime, AGENT_RUNTIME } from './AgentRuntime.ts';
1112
export type { AgentExecutor, AgentRuntimeOptions } from './AgentRuntime.ts';

tegg/core/agent-runtime/test/AgentRuntime.test.ts

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,7 @@ import {
99
MessageStatus,
1010
ContentBlockType,
1111
} from '@eggjs/tegg-types/agent-runtime';
12-
import type {
13-
RunRecord,
14-
CreateRunInput,
15-
AgentStreamMessage,
16-
MessageContentBlock,
17-
} from '@eggjs/tegg-types/agent-runtime';
12+
import type { RunRecord, RunObject, CreateRunInput, AgentStreamMessage } from '@eggjs/tegg-types/agent-runtime';
1813
import { AgentNotFoundError, AgentConflictError } from '@eggjs/tegg-types/agent-runtime';
1914
import { describe, it, beforeEach, afterEach } from 'vitest';
2015

@@ -183,9 +178,9 @@ describe('test/AgentRuntime.test.ts', () => {
183178
assert(result.threadId.startsWith('thread_'));
184179
assert.equal(result.output!.length, 1);
185180
assert.equal(result.output![0].object, AgentObjectType.ThreadMessage);
186-
assert.equal(result.output![0]['role'], MessageRole.Assistant);
187-
assert.equal(result.output![0]['status'], MessageStatus.Completed);
188-
const content = result.output![0]['content'] as MessageContentBlock[];
181+
assert.equal(result.output![0].role, MessageRole.Assistant);
182+
assert.equal(result.output![0].status, MessageStatus.Completed);
183+
const content = result.output![0].content;
189184
assert.equal(content[0].type, ContentBlockType.Text);
190185
assert.equal(content[0].text.value, 'Hello 1 messages');
191186
assert(Array.isArray(content[0].text.annotations));
@@ -226,8 +221,8 @@ describe('test/AgentRuntime.test.ts', () => {
226221

227222
const updated = await runtime.getThread(thread.id);
228223
assert.equal(updated.messages.length, 2);
229-
assert.equal(updated.messages[0]['role'], MessageRole.User);
230-
assert.equal(updated.messages[1]['role'], MessageRole.Assistant);
224+
assert.equal(updated.messages[0].role, MessageRole.User);
225+
assert.equal(updated.messages[1].role, MessageRole.Assistant);
231226
});
232227

233228
it('should auto-create thread and append messages when threadId not provided', async () => {
@@ -239,8 +234,8 @@ describe('test/AgentRuntime.test.ts', () => {
239234

240235
const thread = await runtime.getThread(result.threadId);
241236
assert.equal(thread.messages.length, 2);
242-
assert.equal(thread.messages[0]['role'], MessageRole.User);
243-
assert.equal(thread.messages[1]['role'], MessageRole.Assistant);
237+
assert.equal(thread.messages[0].role, MessageRole.User);
238+
assert.equal(thread.messages[1].role, MessageRole.Assistant);
244239
});
245240

246241
it('should not throw when store.updateRun fails in catch block', async () => {
@@ -290,7 +285,7 @@ describe('test/AgentRuntime.test.ts', () => {
290285

291286
const run = await store.getRun(result.id);
292287
assert.equal(run.status, RunStatus.Completed);
293-
const outputContent = run.output![0]['content'] as MessageContentBlock[];
288+
const outputContent = run.output![0].content;
294289
assert.equal(outputContent[0].text.value, 'Hello 1 messages');
295290
});
296291

@@ -304,8 +299,8 @@ describe('test/AgentRuntime.test.ts', () => {
304299

305300
const thread = await store.getThread(result.threadId);
306301
assert.equal(thread.messages.length, 2);
307-
assert.equal(thread.messages[0]['role'], MessageRole.User);
308-
assert.equal(thread.messages[1]['role'], MessageRole.Assistant);
302+
assert.equal(thread.messages[0].role, MessageRole.User);
303+
assert.equal(thread.messages[1].role, MessageRole.Assistant);
309304
});
310305

311306
it('should pass metadata through to store and return it', async () => {
@@ -352,6 +347,14 @@ describe('test/AgentRuntime.test.ts', () => {
352347
assert(deltaIdx < msgCompletedIdx);
353348
assert(msgCompletedIdx < runCompletedIdx);
354349
assert(runCompletedIdx < doneIdx);
350+
351+
// Verify messages persisted to thread (consistent with syncRun/asyncRun tests)
352+
const runCreatedEvent = writer.events.find((e) => e.event === AgentSSEEvent.ThreadRunCreated);
353+
const threadId = (runCreatedEvent!.data as RunObject).threadId;
354+
const thread = await runtime.getThread(threadId);
355+
assert.equal(thread.messages.length, 2);
356+
assert.equal(thread.messages[0]['role'], MessageRole.User);
357+
assert.equal(thread.messages[1]['role'], MessageRole.Assistant);
355358
});
356359

357360
it('should emit cancelled event on client disconnect', async () => {
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
import assert from 'node:assert';
2+
import { EventEmitter } from 'node:events';
3+
4+
import { describe, it, beforeEach } from 'vitest';
5+
6+
import { HttpSSEWriter } from '../src/HttpSSEWriter.ts';
7+
8+
/**
9+
* Minimal mock of Node.js ServerResponse for testing HttpSSEWriter.
10+
* Captures writeHead/write/end calls and emits 'close' on demand.
11+
*/
12+
class MockServerResponse extends EventEmitter {
13+
writtenHead: { statusCode: number; headers: Record<string, string> } | null = null;
14+
chunks: string[] = [];
15+
ended = false;
16+
17+
writeHead(statusCode: number, headers: Record<string, string>): void {
18+
this.writtenHead = { statusCode, headers };
19+
}
20+
21+
write(chunk: string): boolean {
22+
this.chunks.push(chunk);
23+
return true;
24+
}
25+
26+
end(): void {
27+
this.ended = true;
28+
}
29+
}
30+
31+
describe('test/HttpSSEWriter.test.ts', () => {
32+
let res: MockServerResponse;
33+
34+
beforeEach(() => {
35+
res = new MockServerResponse();
36+
});
37+
38+
it('should delay headers until first writeEvent', () => {
39+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
40+
const writer = new HttpSSEWriter(res as any);
41+
42+
// Headers not sent yet after construction
43+
assert.equal(res.writtenHead, null);
44+
assert.equal(res.chunks.length, 0);
45+
46+
writer.writeEvent('test', { foo: 'bar' });
47+
48+
// Now headers should be sent
49+
assert.ok(res.writtenHead);
50+
assert.equal(res.writtenHead.statusCode, 200);
51+
});
52+
53+
it('should use lowercase header keys', () => {
54+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
55+
const writer = new HttpSSEWriter(res as any);
56+
writer.writeEvent('ping', {});
57+
58+
assert.ok(res.writtenHead);
59+
assert.equal(res.writtenHead.headers['content-type'], 'text/event-stream');
60+
assert.equal(res.writtenHead.headers['cache-control'], 'no-cache');
61+
assert.equal(res.writtenHead.headers['connection'], 'keep-alive');
62+
});
63+
64+
it('should format SSE events correctly', () => {
65+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
66+
const writer = new HttpSSEWriter(res as any);
67+
writer.writeEvent('message', { text: 'hello' });
68+
69+
assert.equal(res.chunks.length, 1);
70+
assert.equal(res.chunks[0], 'event: message\ndata: {"text":"hello"}\n\n');
71+
});
72+
73+
it('should not write after connection closes', () => {
74+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
75+
const writer = new HttpSSEWriter(res as any);
76+
77+
// Simulate client disconnect
78+
res.emit('close');
79+
80+
assert.equal(writer.closed, true);
81+
writer.writeEvent('late', { data: 'ignored' });
82+
83+
// No headers sent, no chunks written
84+
assert.equal(res.writtenHead, null);
85+
assert.equal(res.chunks.length, 0);
86+
});
87+
88+
it('should trigger onClose callbacks when connection closes', () => {
89+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
90+
const writer = new HttpSSEWriter(res as any);
91+
const calls: number[] = [];
92+
93+
writer.onClose(() => calls.push(1));
94+
writer.onClose(() => calls.push(2));
95+
96+
res.emit('close');
97+
98+
assert.deepStrictEqual(calls, [1, 2]);
99+
});
100+
101+
it('should handle end() idempotently', () => {
102+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
103+
const writer = new HttpSSEWriter(res as any);
104+
105+
assert.equal(writer.closed, false);
106+
107+
writer.end();
108+
assert.equal(writer.closed, true);
109+
assert.equal(res.ended, true);
110+
111+
// Reset flag to verify second end() doesn't call res.end() again
112+
res.ended = false;
113+
writer.end();
114+
assert.equal(res.ended, false); // Not called again
115+
});
116+
117+
it('should write multiple events sequentially', () => {
118+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
119+
const writer = new HttpSSEWriter(res as any);
120+
121+
writer.writeEvent('event1', { n: 1 });
122+
writer.writeEvent('event2', { n: 2 });
123+
writer.writeEvent('event3', { n: 3 });
124+
125+
assert.equal(res.chunks.length, 3);
126+
assert.equal(res.chunks[0], 'event: event1\ndata: {"n":1}\n\n');
127+
assert.equal(res.chunks[1], 'event: event2\ndata: {"n":2}\n\n');
128+
assert.equal(res.chunks[2], 'event: event3\ndata: {"n":3}\n\n');
129+
130+
// Headers sent only once
131+
assert.ok(res.writtenHead);
132+
});
133+
134+
it('should start with closed=false', () => {
135+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
136+
const writer = new HttpSSEWriter(res as any);
137+
assert.equal(writer.closed, false);
138+
});
139+
});

0 commit comments

Comments
 (0)