Skip to content

Commit 510196c

Browse files
committed
feat(agent): add wait() and stream cancel on break
1 parent e722d63 commit 510196c

2 files changed

Lines changed: 68 additions & 8 deletions

File tree

packages/agent/__tests__/run-handle.test.ts

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,23 @@ describe('AgentRunHandle', () => {
304304
expect(getSignal()?.aborted).toBe(true);
305305
expect(agent.state.isStreaming).toBe(false);
306306
});
307+
308+
it('aborts streamFn when events() iteration breaks early', async () => {
309+
const { streamFn, getSignal } = makeAbortableStreamFn();
310+
const agent = new Agent({
311+
initialState: { model: makeFakeModel() },
312+
streamFn,
313+
});
314+
315+
const handle = agent.prompt('go');
316+
for await (const _event of handle.events()) {
317+
break;
318+
}
319+
await agent.waitForIdle();
320+
321+
expect(getSignal()?.aborted).toBe(true);
322+
expect(agent.state.isStreaming).toBe(false);
323+
});
307324
});
308325

309326
describe('single-use enforcement', () => {
@@ -328,6 +345,29 @@ describe('AgentRunHandle', () => {
328345
expect(() => handle.events()).toThrow(/already consumed/);
329346
});
330347

348+
it('throws on a second prompt() while a handle from the first is still unconsumed', () => {
349+
const provider = createScriptedProvider({
350+
responses: [
351+
makeFakeAssistantMessage({
352+
stopReason: 'stop',
353+
content: [{ type: 'text', text: 'x' }],
354+
}),
355+
],
356+
});
357+
const agent = new Agent({
358+
initialState: { model: makeFakeModel() },
359+
streamFn: provider.stream,
360+
});
361+
362+
const first = agent.prompt('hi');
363+
expect(() => agent.prompt('hi again')).toThrow(/unconsumed run handle/);
364+
365+
// abort frees the agent state; first remains a dangling handle reference
366+
agent.abort();
367+
expect(() => agent.prompt('after abort')).not.toThrow();
368+
void first;
369+
});
370+
331371
it('throws when toResponse() is called twice', () => {
332372
const provider = createScriptedProvider({
333373
responses: [
@@ -349,8 +389,8 @@ describe('AgentRunHandle', () => {
349389
});
350390
});
351391

352-
describe('PromiseLike auto-sink', () => {
353-
it('await on the handle drives the run to completion without an explicit consumer', async () => {
392+
describe('wait()', () => {
393+
it('drives the run to completion without an explicit event consumer', async () => {
354394
const provider = createScriptedProvider({
355395
responses: [
356396
makeFakeAssistantMessage({
@@ -373,15 +413,15 @@ describe('AgentRunHandle', () => {
373413
expect(agent.state.isStreaming).toBe(false);
374414
});
375415

376-
it('rejects the awaited handle when the binder rejects (e.g. streamFn throws)', async () => {
416+
it('rejects when the binder rejects (e.g. streamFn throws)', async () => {
377417
const agent = new Agent({
378418
initialState: { model: makeFakeModel() },
379419
streamFn: () => {
380420
throw new Error('binder failure');
381421
},
382422
});
383423

384-
await expect(agent.prompt('hi')).rejects.toThrow(/binder failure/);
424+
await expect(agent.prompt('hi').wait()).rejects.toThrow(/binder failure/);
385425
expect(agent.state.isStreaming).toBe(false);
386426
});
387427

packages/agent/src/run-handle.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ export interface AgentRunHandle extends PromiseLike<void> {
1111
events(): AsyncIterable<AgentEvent>;
1212
toReadableStream(): ReadableStream<AgentEvent>;
1313
toResponse(init?: ResponseInit): Response;
14+
/**
15+
* Run to completion without observing events. Equivalent to `await handle`
16+
* (the handle is `PromiseLike<void>`), but explicit. Use this if you want
17+
* to avoid accidental thenable assimilation in code paths where the handle
18+
* might be passed through generic wrappers.
19+
*/
20+
wait(): Promise<void>;
1421
}
1522

1623
const DEFAULT_HIGH_WATER_MARK = 8;
@@ -59,14 +66,18 @@ export class DefaultAgentRunHandle implements AgentRunHandle {
5966
return new Response(sse, responseInit);
6067
}
6168

69+
wait(): Promise<void> {
70+
if (!this.startedAs) {
71+
this.startSink();
72+
}
73+
return this.completion!;
74+
}
75+
6276
then<TResult1 = void, TResult2 = never>(
6377
onfulfilled?: ((value: void) => TResult1 | PromiseLike<TResult1>) | null,
6478
onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | null
6579
): Promise<TResult1 | TResult2> {
66-
if (!this.startedAs) {
67-
this.startSink();
68-
}
69-
return this.completion!.then(onfulfilled, onrejected);
80+
return this.wait().then(onfulfilled, onrejected);
7081
}
7182

7283
private ensureNotStarted(via: NonNullable<DefaultAgentRunHandle['startedAs']>): void {
@@ -175,15 +186,24 @@ async function* readableStreamToAsyncIterable<T>(
175186
stream: ReadableStream<T>
176187
): AsyncIterableIterator<T> {
177188
const reader = stream.getReader();
189+
let drained = false;
178190
try {
179191
while (true) {
180192
const { done, value } = await reader.read();
181193
if (done) {
194+
drained = true;
182195
return;
183196
}
184197
yield value;
185198
}
186199
} finally {
200+
if (!drained) {
201+
try {
202+
await reader.cancel();
203+
} catch {
204+
// cancel can reject if the stream already errored — safe to ignore
205+
}
206+
}
187207
reader.releaseLock();
188208
}
189209
}

0 commit comments

Comments
 (0)