Skip to content

Commit e722d63

Browse files
committed
feat(agent): harden run lifecycle and abort handling
1 parent 5d3a73b commit e722d63

3 files changed

Lines changed: 118 additions & 31 deletions

File tree

packages/agent/__tests__/agent.test.ts

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,9 @@ describe('@agentic-kit/agent — pausable tools', () => {
347347
expect(() => agent.continue()).toThrow(/no tool calls awaiting a decision/);
348348
});
349349

350-
it('continue() resumes from a non-trailing assistant when a later message was appended after the pause', async () => {
351-
const provider = createScriptedProvider({ responses: [pauseResponse(), finalResponse()] });
352-
const execute = jest.fn(
353-
async (_id: string, _params: Record<string, unknown>, decision: unknown) => ({
354-
content: [{ type: 'text' as const, text: `decision=${JSON.stringify(decision)}` }],
355-
})
356-
);
350+
it('continue() rejects when non-toolResult messages have been appended after the pending assistant', async () => {
351+
const provider = createScriptedProvider({ responses: [pauseResponse()] });
352+
const execute = jest.fn();
357353

358354
const agent = new Agent({
359355
initialState: { model: makeFakeModel() },
@@ -372,14 +368,8 @@ describe('@agentic-kit/agent — pausable tools', () => {
372368
};
373369
agent.replaceMessages([...agent.state.messages, trailingNote]);
374370

375-
await agent.continue();
376-
377-
expect(execute).toHaveBeenCalledTimes(1);
378-
expect(execute.mock.calls[0]?.[2]).toEqual({ approved: true });
379-
expect(agent.state.messages.at(-1)).toMatchObject({
380-
role: 'assistant',
381-
content: [{ type: 'text', text: 'finalized' }],
382-
});
371+
expect(() => agent.continue()).toThrow(/non-toolResult messages have been appended/);
372+
expect(execute).not.toHaveBeenCalled();
383373
});
384374

385375
it('abort() while paused stops further work without throwing', async () => {
@@ -469,6 +459,67 @@ describe('@agentic-kit/agent — pausable tools', () => {
469459
});
470460
});
471461

462+
it('abort() during tool execution stops the loop and does not invoke the model again', async () => {
463+
const provider = createScriptedProvider({
464+
responses: [
465+
makeFakeAssistantMessage({
466+
stopReason: 'toolUse',
467+
content: [
468+
{ type: 'toolCall', id: 'tool_slow', name: 'slow', arguments: {} },
469+
],
470+
}),
471+
makeFakeAssistantMessage({
472+
stopReason: 'stop',
473+
content: [{ type: 'text', text: 'should never reach here' }],
474+
}),
475+
],
476+
});
477+
478+
const streamCalls = jest.fn(provider.stream);
479+
let abortAgent: (() => void) | undefined;
480+
const slowExecute = jest.fn<
481+
ReturnType<AgentTool['execute']>,
482+
Parameters<AgentTool['execute']>
483+
>((_id, _params, _decision, signal) => {
484+
return new Promise((_resolve, reject) => {
485+
signal?.addEventListener(
486+
'abort',
487+
() => reject(new Error('aborted')),
488+
{ once: true }
489+
);
490+
queueMicrotask(() => abortAgent?.());
491+
});
492+
});
493+
494+
const agent = new Agent({
495+
initialState: { model: makeFakeModel() },
496+
streamFn: streamCalls,
497+
});
498+
abortAgent = () => agent.abort();
499+
agent.setTools([
500+
{
501+
name: 'slow',
502+
label: 'Slow',
503+
description: 'Slow tool',
504+
parameters: { type: 'object', properties: {} },
505+
execute: slowExecute,
506+
},
507+
]);
508+
509+
const events: AgentEvent[] = [];
510+
agent.subscribe((e) => events.push(e));
511+
512+
await agent.prompt('go');
513+
514+
expect(streamCalls).toHaveBeenCalledTimes(1);
515+
expect(slowExecute).toHaveBeenCalledTimes(1);
516+
const end = events.find(
517+
(e): e is Extract<AgentEvent, { type: 'agent_end' }> => e.type === 'agent_end'
518+
);
519+
expect(end?.stopReason).toBe('aborted');
520+
expect(agent.state.isStreaming).toBe(false);
521+
});
522+
472523
it('regression: a tool without a decision schema runs without pausing', async () => {
473524
const provider = createScriptedProvider({
474525
responses: [

packages/agent/src/agent.ts

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class Agent {
3636
private abortController?: AbortController;
3737
private running?: Promise<void>;
3838
private runChannel?: { push: RunChannelPush };
39+
private outstandingHandle?: AgentRunHandle;
3940

4041
private _state: AgentState;
4142

@@ -103,41 +104,51 @@ export class Agent {
103104

104105
abort(): void {
105106
this.abortController?.abort();
107+
this.outstandingHandle = undefined;
106108
}
107109

108110
waitForIdle(): Promise<void> {
109111
return this.running ?? Promise.resolve();
110112
}
111113

112114
prompt(input: string | Message, opts?: { maxSteps?: number }): AgentRunHandle {
113-
if (this._state.isStreaming) {
114-
throw new Error('Agent is already processing a prompt');
115-
}
115+
this.assertIdle('prompt');
116116

117117
const message = typeof input === 'string' ? createUserMessage(input) : input;
118118
this._state.stepCount = 0;
119119

120-
return new DefaultAgentRunHandle(async (push, signal) =>
121-
this.runLoop({
120+
const handle: AgentRunHandle = new DefaultAgentRunHandle(async (push, signal) => {
121+
if (this.outstandingHandle === handle) {
122+
this.outstandingHandle = undefined;
123+
}
124+
return this.runLoop({
122125
initialMessages: [message],
123126
externalPush: push ?? undefined,
124127
externalAbortSignal: signal,
125128
maxSteps: opts?.maxSteps ?? this.defaultMaxSteps,
126-
})
127-
);
129+
});
130+
});
131+
this.outstandingHandle = handle;
132+
return handle;
128133
}
129134

130135
continue(opts?: { maxSteps?: number }): AgentRunHandle {
131-
if (this._state.isStreaming) {
132-
throw new Error('Agent is already processing');
133-
}
136+
this.assertIdle('continue');
134137

135138
if (this._state.messages.length === 0) {
136139
throw new Error('No messages to continue from');
137140
}
138141

139142
const pendingMessage = this.findMostRecentPendingAssistant();
140143
if (pendingMessage) {
144+
const pendingIndex = this._state.messages.indexOf(pendingMessage);
145+
const trailing = this._state.messages.slice(pendingIndex + 1);
146+
const hasNonToolResultTrailing = trailing.some((m) => m.role !== 'toolResult');
147+
if (hasNonToolResultTrailing) {
148+
throw new Error(
149+
'Cannot continue() with a pending decision when non-toolResult messages have been appended after the pending assistant. Use injectDeferralResults() + prompt() instead — see the agentic-kit deferral docs.'
150+
);
151+
}
141152
const pendingDecisions = this.findPendingDecisions(pendingMessage);
142153
for (const { tool, decision } of pendingDecisions) {
143154
const errors = validateSchema(tool.decision!, decision, 'root');
@@ -154,13 +165,29 @@ export class Agent {
154165
}
155166
}
156167

157-
return new DefaultAgentRunHandle(async (push, signal) =>
158-
this.runLoop({
168+
const handle: AgentRunHandle = new DefaultAgentRunHandle(async (push, signal) => {
169+
if (this.outstandingHandle === handle) {
170+
this.outstandingHandle = undefined;
171+
}
172+
return this.runLoop({
159173
externalPush: push ?? undefined,
160174
externalAbortSignal: signal,
161175
maxSteps: opts?.maxSteps ?? this.defaultMaxSteps,
162-
})
163-
);
176+
});
177+
});
178+
this.outstandingHandle = handle;
179+
return handle;
180+
}
181+
182+
private assertIdle(method: 'prompt' | 'continue'): void {
183+
if (this._state.isStreaming) {
184+
throw new Error(`Agent is already processing; cannot call ${method}() while a run is active`);
185+
}
186+
if (this.outstandingHandle) {
187+
throw new Error(
188+
`Agent has an unconsumed run handle from a previous ${method}()/prompt()/continue() call; consume it (events / toReadableStream / toResponse / wait) or abort the agent before issuing another`
189+
);
190+
}
164191
}
165192

166193
private findMostRecentPendingAssistant(): AssistantMessage | undefined {
@@ -227,7 +254,7 @@ export class Agent {
227254
}
228255
}
229256

230-
let stopReason: 'completed' | 'max_steps' = 'completed';
257+
let stopReason: 'completed' | 'max_steps' | 'aborted' = 'completed';
231258

232259
try {
233260
await this.emit({ type: 'agent_start' });
@@ -286,6 +313,11 @@ export class Agent {
286313
}
287314

288315
await this.emit({ type: 'turn_end', message: assistantMessage, toolResults: outcome.results });
316+
317+
if (localAbortController.signal.aborted) {
318+
stopReason = 'aborted';
319+
break;
320+
}
289321
}
290322

291323
await this.emit({ type: 'agent_end', messages: [...this._state.messages], stopReason });
@@ -372,6 +404,10 @@ export class Agent {
372404
continue;
373405
}
374406

407+
if (signal.aborted) {
408+
break;
409+
}
410+
375411
const tool = this._state.tools.find((candidate) => candidate.name === toolCall.name);
376412
const args = toolCall.arguments as Record<string, unknown>;
377413
const decisionAttached = 'decision' in toolCall && toolCall.decision !== undefined;

packages/agent/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export interface AgentEventBase {
4949

5050
export type AgentEvent =
5151
| { type: 'agent_start' }
52-
| { type: 'agent_end'; messages: Message[]; stopReason?: 'completed' | 'max_steps' }
52+
| { type: 'agent_end'; messages: Message[]; stopReason?: 'completed' | 'max_steps' | 'aborted' }
5353
| { type: 'turn_start' }
5454
| { type: 'turn_end'; message: AssistantMessage; toolResults: ToolResultMessage[] }
5555
| { type: 'message_start'; message: Message }

0 commit comments

Comments
 (0)