diff --git a/strands-ts/src/a2a/__tests__/a2a-agent.test.ts b/strands-ts/src/a2a/__tests__/a2a-agent.test.ts index 4414ff688..f89f35930 100644 --- a/strands-ts/src/a2a/__tests__/a2a-agent.test.ts +++ b/strands-ts/src/a2a/__tests__/a2a-agent.test.ts @@ -215,6 +215,102 @@ describe('A2AAgent', () => { }) }) + describe('task lifecycle state mapping', () => { + it.each([ + { state: 'completed', expectedStopReason: 'endTurn', expectedTaskState: 'completed' }, + { state: 'failed', expectedStopReason: 'endTurn', expectedTaskState: 'failed' }, + { state: 'canceled', expectedStopReason: 'cancelled', expectedTaskState: 'canceled' }, + { state: 'rejected', expectedStopReason: 'endTurn', expectedTaskState: 'rejected' }, + { state: 'input-required', expectedStopReason: 'interrupt', expectedTaskState: 'input-required' }, + { state: 'auth-required', expectedStopReason: 'interrupt', expectedTaskState: 'auth-required' }, + { state: 'unknown', expectedStopReason: 'endTurn', expectedTaskState: 'unknown' }, + ])('maps $state to stopReason=$expectedStopReason', async ({ state, expectedStopReason, expectedTaskState }) => { + const statusUpdate: TaskStatusUpdateEvent = { + kind: 'status-update', + taskId: 'task-1', + contextId: 'ctx-1', + status: { + state: state as TaskStatusUpdateEvent['status']['state'], + message: { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: `Task ${state}` }], + }, + }, + final: true, + } + mockSendMessageStream.mockReturnValue(mockStream(statusUpdate)) + + const agent = new A2AAgent({ url: 'http://localhost:9000' }) + const result = await agent.invoke('Hello') + + expect(result.stopReason).toBe(expectedStopReason) + expect(result.invocationState.a2aTaskState).toBe(expectedTaskState) + }) + + it('includes a2aTaskState in invocationState for completed Task response', async () => { + const agent = new A2AAgent({ url: 'http://localhost:9000' }) + const result = await agent.invoke('Hello') + + expect(result.invocationState.a2aTaskState).toBe('completed') + }) + + it('recognizes all terminal states as complete events (stops streaming)', async () => { + // Simulate: working → failed (terminal) — should not loop forever + const workingUpdate: TaskStatusUpdateEvent = { + kind: 'status-update', + taskId: 'task-1', + contextId: 'ctx-1', + status: { state: 'working' }, + final: false, + } + const failedUpdate: TaskStatusUpdateEvent = { + kind: 'status-update', + taskId: 'task-1', + contextId: 'ctx-1', + status: { + state: 'failed', + message: { kind: 'message', messageId: 'msg-1', role: 'agent', parts: [{ kind: 'text', text: 'Error' }] }, + }, + final: true, + } + mockSendMessageStream.mockReturnValue(mockStream(workingUpdate, failedUpdate)) + + const agent = new A2AAgent({ url: 'http://localhost:9000' }) + const result = await agent.invoke('Hello') + + expect(result.stopReason).toBe('endTurn') + expect(result.invocationState.a2aTaskState).toBe('failed') + }) + + it('recognizes input-required as complete event', async () => { + const inputRequired: TaskStatusUpdateEvent = { + kind: 'status-update', + taskId: 'task-1', + contextId: 'ctx-1', + status: { + state: 'input-required', + message: { + kind: 'message', + messageId: 'msg-1', + role: 'agent', + parts: [{ kind: 'text', text: 'Need more info' }], + }, + }, + final: true, + } + mockSendMessageStream.mockReturnValue(mockStream(inputRequired)) + + const agent = new A2AAgent({ url: 'http://localhost:9000' }) + const result = await agent.invoke('Hello') + + expect(result.stopReason).toBe('interrupt') + expect(result.invocationState.a2aTaskState).toBe('input-required') + expect((result.lastMessage.content[0] as TextBlock).text).toBe('Need more info') + }) + }) + describe('stream', () => { it('yields A2AStreamUpdateEvent for each A2A event and A2AResultEvent at the end', async () => { const task = createMockTaskResponse() diff --git a/strands-ts/src/a2a/__tests__/executor.test.ts b/strands-ts/src/a2a/__tests__/executor.test.ts index a20aba87b..fe370f83a 100644 --- a/strands-ts/src/a2a/__tests__/executor.test.ts +++ b/strands-ts/src/a2a/__tests__/executor.test.ts @@ -11,6 +11,8 @@ import { ImageBlock, encodeBase64 } from '../../types/media.js' import { ContentBlockEvent, ModelStreamUpdateEvent } from '../../hooks/events.js' import { AgentResult } from '../../types/agent.js' import { Message } from '../../types/messages.js' +import { CancelledError } from '../../errors.js' +import { Interrupt } from '../../interrupt.js' function createMockEventBus(): ExecutionEventBus & { events: AgentExecutionEvent[] } { const events: AgentExecutionEvent[] = [] @@ -135,7 +137,7 @@ describe('A2AExecutor', () => { expect(agent.stream).toHaveBeenCalledWith( [new TextBlock('Line 1'), new TextBlock('[File: file (file://test.txt)]'), new TextBlock('Line 2')], - { invocationState: { a2aRequestContext: context } } + { invocationState: { a2aRequestContext: context }, cancelSignal: expect.any(AbortSignal) } ) }) @@ -154,18 +156,185 @@ describe('A2AExecutor', () => { expect(options?.invocationState).toEqual({ a2aRequestContext: context }) }) - it('re-throws when agent throws, publishing only the initial task event', async () => { + it('transitions to failed state when agent throws', async () => { const model = new MockMessageModel().addTurn(new Error('Agent failed')) const agent = new Agent({ model, printer: false }) const executor = new A2AExecutor(agent) const eventBus = createMockEventBus() - await expect(executor.execute(createRequestContext('Hello'), eventBus)).rejects.toThrow('Agent failed') + await executor.execute(createRequestContext('Hello'), eventBus) - // Only the initial task registration event is published before the error - expect(eventBus.events).toStrictEqual([ - { kind: 'task', id: 'task-1', contextId: 'ctx-1', status: { state: 'working' } }, - ]) + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + + // Should have a failed status event (not throw) + expect(statusEvents).toHaveLength(1) + expect(statusEvents[0]).toStrictEqual({ + kind: 'status-update', + taskId: 'task-1', + contextId: 'ctx-1', + status: { + state: 'failed', + message: { + kind: 'message', + messageId: expect.any(String), + role: 'agent', + parts: [{ kind: 'text', text: 'Agent execution failed' }], + }, + }, + final: true, + }) + }) + + it('does not leak error details in failed status message', async () => { + const model = new MockMessageModel().addTurn(new Error('Secret internal path /home/user/.env')) + const agent = new Agent({ model, printer: false }) + const executor = new A2AExecutor(agent) + const eventBus = createMockEventBus() + + await executor.execute(createRequestContext('Hello'), eventBus) + + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + const failedMessage = statusEvents[0]!.status.message!.parts[0] + expect(failedMessage).toStrictEqual({ kind: 'text', text: 'Agent execution failed' }) + expect(JSON.stringify(statusEvents[0])).not.toContain('Secret internal path') + }) + + it('transitions to canceled state when CancelledError is thrown', async () => { + const mockAgent: InvokableAgent = { + id: 'test-agent', + name: 'Test Agent', + invoke: vi.fn(), + // eslint-disable-next-line require-yield + async *stream() { + throw new CancelledError() + }, + } + + const executor = new A2AExecutor(mockAgent) + const eventBus = createMockEventBus() + + await executor.execute(createRequestContext('Hello'), eventBus) + + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + expect(statusEvents).toHaveLength(1) + expect(statusEvents[0]!.status.state).toBe('canceled') + expect(statusEvents[0]!.final).toBe(true) + }) + + it('transitions to canceled state when agent returns stopReason cancelled', async () => { + const mockAgent: InvokableAgent = { + id: 'test-agent', + name: 'Test Agent', + invoke: vi.fn(), + // eslint-disable-next-line require-yield + async *stream() { + return new AgentResult({ + stopReason: 'cancelled', + lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Cancelled')] }), + invocationState: {}, + }) + }, + } + + const executor = new A2AExecutor(mockAgent) + const eventBus = createMockEventBus() + + await executor.execute(createRequestContext('Hello'), eventBus) + + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + expect(statusEvents).toHaveLength(1) + expect(statusEvents[0]!.status.state).toBe('canceled') + expect(statusEvents[0]!.final).toBe(true) + }) + + it('transitions to input-required state when agent returns stopReason interrupt', async () => { + const mockAgent: InvokableAgent = { + id: 'test-agent', + name: 'Test Agent', + invoke: vi.fn(), + // eslint-disable-next-line require-yield + async *stream() { + return new AgentResult({ + stopReason: 'interrupt', + lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Need input')] }), + invocationState: {}, + interrupts: [new Interrupt({ id: 'int-1', name: 'confirm', reason: 'Please confirm the action' })], + }) + }, + } + + const executor = new A2AExecutor(mockAgent) + const eventBus = createMockEventBus() + + await executor.execute(createRequestContext('Do something'), eventBus) + + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + expect(statusEvents).toHaveLength(1) + expect(statusEvents[0]!.status.state).toBe('input-required') + expect(statusEvents[0]!.final).toBe(true) + expect(statusEvents[0]!.status.message!.parts[0]).toStrictEqual({ + kind: 'text', + text: '[confirm]: Please confirm the action', + }) + }) + + it('transitions to input-required with generic message when no interrupts provided', async () => { + const mockAgent: InvokableAgent = { + id: 'test-agent', + name: 'Test Agent', + invoke: vi.fn(), + // eslint-disable-next-line require-yield + async *stream() { + return new AgentResult({ + stopReason: 'interrupt', + lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Need input')] }), + invocationState: {}, + }) + }, + } + + const executor = new A2AExecutor(mockAgent) + const eventBus = createMockEventBus() + + await executor.execute(createRequestContext('Do something'), eventBus) + + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + expect(statusEvents[0]!.status.state).toBe('input-required') + expect(statusEvents[0]!.status.message!.parts[0]).toStrictEqual({ + kind: 'text', + text: 'Agent requires additional input', + }) + }) + + it('transitions to input-required with multiple interrupts', async () => { + const mockAgent: InvokableAgent = { + id: 'test-agent', + name: 'Test Agent', + invoke: vi.fn(), + // eslint-disable-next-line require-yield + async *stream() { + return new AgentResult({ + stopReason: 'interrupt', + lastMessage: new Message({ role: 'assistant', content: [new TextBlock('')] }), + invocationState: {}, + interrupts: [ + new Interrupt({ id: 'int-1', name: 'approve', reason: 'Approve deployment' }), + new Interrupt({ id: 'int-2', name: 'select_env', reason: 'Choose environment' }), + ], + }) + }, + } + + const executor = new A2AExecutor(mockAgent) + const eventBus = createMockEventBus() + + await executor.execute(createRequestContext('Deploy'), eventBus) + + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + expect(statusEvents[0]!.status.message!.parts[0]).toStrictEqual({ + kind: 'text', + text: '[approve]: Approve deployment\n[select_env]: Choose environment', + }) }) it('publishes image content blocks as separate file artifacts', async () => { @@ -239,14 +408,61 @@ describe('A2AExecutor', () => { }) describe('cancelTask', () => { - it('throws A2AError.unsupportedOperation', async () => { + it('throws taskNotCancelable when task is not running', async () => { const model = new MockMessageModel().addTurn({ type: 'textBlock', text: '' }) const agent = new Agent({ model, printer: false }) const executor = new A2AExecutor(agent) const eventBus = createMockEventBus() - await expect(executor.cancelTask('task-1', eventBus)).rejects.toThrow('Task cancellation is not supported') - expect(eventBus.events).toStrictEqual([]) + await expect(executor.cancelTask('nonexistent-task', eventBus)).rejects.toThrow() + }) + + it('signals cancellation to a running task via AbortController', async () => { + let cancelSignalReceived: AbortSignal | undefined + const mockAgent: InvokableAgent = { + id: 'test-agent', + name: 'Test Agent', + invoke: vi.fn(), + // eslint-disable-next-line require-yield + async *stream(_args, options) { + cancelSignalReceived = options?.cancelSignal + // Simulate some work + await new Promise((resolve) => setTimeout(resolve, 50)) + // Check if cancelled + if (cancelSignalReceived?.aborted) { + throw new CancelledError() + } + return new AgentResult({ + stopReason: 'endTurn', + lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Done')] }), + invocationState: {}, + }) + }, + } + + const executor = new A2AExecutor(mockAgent) + const eventBus = createMockEventBus() + const context = createRequestContext('Hello', 'cancel-task-1') + + // Start execution in background + const execPromise = executor.execute(context, eventBus) + + // Give it time to start + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Cancel the task + await executor.cancelTask('cancel-task-1', eventBus) + + // Wait for execution to complete + await execPromise + + // Verify the cancel signal was passed to the agent + expect(cancelSignalReceived).toBeDefined() + expect(cancelSignalReceived!.aborted).toBe(true) + + // Verify the task was transitioned to canceled + const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update') + expect(statusEvents.some((e) => e.status.state === 'canceled')).toBe(true) }) }) }) diff --git a/strands-ts/src/a2a/a2a-agent.ts b/strands-ts/src/a2a/a2a-agent.ts index 160ceb38a..a929cf61c 100644 --- a/strands-ts/src/a2a/a2a-agent.ts +++ b/strands-ts/src/a2a/a2a-agent.ts @@ -13,10 +13,52 @@ import { ClientFactory } from '@a2a-js/sdk/client' import type { InvocationState, InvokableAgent, InvokeArgs, InvokeOptions } from '../types/agent.js' import { AgentResult } from '../types/agent.js' import { Message, TextBlock, type ContentBlock, type ContentBlockData, type MessageData } from '../types/messages.js' +import type { StopReason } from '../types/messages.js' import { A2AStreamUpdateEvent, A2AResultEvent, type A2AEventData, type A2AStreamEvent } from './events.js' import { logger } from '../logging/logger.js' import { logExperimentalWarning } from './logging.js' +/** + * Maps A2A task states to Strands StopReason values. + * + * This is the single source of truth for state-to-stopReason mapping. + * Terminal states map to 'endTurn', input states map to 'interrupt'. + */ +const STATE_TO_STOP_REASON: Record = { + completed: 'endTurn', + failed: 'endTurn', + canceled: 'cancelled', + rejected: 'endTurn', + 'input-required': 'interrupt', + 'auth-required': 'interrupt', + unknown: 'endTurn', +} + +/** + * Terminal states that indicate the task is finished. + * Derived from STATE_TO_STOP_REASON for single source of truth. + */ +const TERMINAL_STATES = new Set( + Object.entries(STATE_TO_STOP_REASON) + .filter(([, reason]) => reason === 'endTurn') + .map(([state]) => state) +) + +/** + * Input-required states that indicate the task needs additional input. + * Derived from STATE_TO_STOP_REASON for single source of truth. + */ +const INPUT_STATES = new Set( + Object.entries(STATE_TO_STOP_REASON) + .filter(([, reason]) => reason === 'interrupt') + .map(([state]) => state) +) + +/** + * All complete states (terminal + input). + */ +const COMPLETE_STATES = new Set([...TERMINAL_STATES, ...INPUT_STATES]) + /** * Configuration options for creating an A2AAgent. */ @@ -42,6 +84,17 @@ export interface A2AAgentConfig { * On invocation, the agent lazily connects to the remote endpoint via the A2A protocol * and returns the response as an `AgentResult`. * + * ## Task Lifecycle State Support + * + * The agent recognizes all A2A task lifecycle states and maps them to appropriate + * `stopReason` values: + * - `completed`, `failed`, `rejected`, `unknown` → `stopReason: 'endTurn'` + * - `canceled` → `stopReason: 'cancelled'` + * - `input-required`, `auth-required` → `stopReason: 'interrupt'` + * + * The task state is also included in the result's `invocationState` under the + * key `a2aTaskState` for downstream consumers. + * * @example * ```typescript * import { A2AAgent } from '@strands-agents/sdk/a2a' @@ -49,6 +102,7 @@ export interface A2AAgentConfig { * const remoteAgent = new A2AAgent({ url: 'http://localhost:9000' }) * const result = await remoteAgent.invoke('Hello, remote agent!') * console.log(result.toString()) + * console.log(result.invocationState.a2aTaskState) // 'completed' * ``` */ export class A2AAgent implements InvokableAgent { @@ -228,6 +282,10 @@ export class A2AAgent implements InvokableAgent { /** * Checks whether an A2A streaming event represents a complete response. * + * Recognizes all terminal and input-required states from the A2A task lifecycle: + * - Terminal: completed, failed, canceled, rejected + * - Input: input-required, auth-required + * * @param event - The A2A streaming event * @returns True if the event is a terminal/complete event */ @@ -235,13 +293,33 @@ export class A2AAgent implements InvokableAgent { if (event.kind === 'message') return true if (event.kind === 'task') return true if (event.kind === 'artifact-update') return event.lastChunk === true - if (event.kind === 'status-update') return event.status.state === 'completed' + if (event.kind === 'status-update') { + return COMPLETE_STATES.has(event.status.state) + } return false } + /** + * Extracts the A2A task state from the final event. + * + * @param event - The final A2A event + * @returns The task state string, or undefined if not available + */ + private _extractTaskState(event: A2AEventData | undefined): string | undefined { + if (!event) return undefined + if (event.kind === 'task') return event.status?.state + if (event.kind === 'status-update') return event.status.state + return undefined + } + /** * Builds an AgentResult from the final A2A streaming event. * + * Maps the A2A task state to the appropriate StopReason: + * - completed/failed/rejected/unknown → 'endTurn' + * - canceled → 'cancelled' + * - input-required/auth-required → 'interrupt' + * * @param event - The final A2A event, or undefined if no events were received * @param invocationState - Caller-provided invocation state, threaded through to the result * @param accumulatedText - Optional accumulated text from streaming artifacts @@ -257,7 +335,18 @@ export class A2AAgent implements InvokableAgent { role: 'assistant', content: [new TextBlock(text)], }) - return new AgentResult({ stopReason: 'endTurn', lastMessage, invocationState }) + + // Determine stopReason from task state + const taskState = this._extractTaskState(event) + const stopReason: StopReason = taskState ? (STATE_TO_STOP_REASON[taskState] ?? 'endTurn') : 'endTurn' + + // Include task state in invocationState for downstream consumers + const enrichedState: InvocationState = { + ...invocationState, + ...(taskState ? { a2aTaskState: taskState } : {}), + } + + return new AgentResult({ stopReason, lastMessage, invocationState: enrichedState }) } /** diff --git a/strands-ts/src/a2a/executor.ts b/strands-ts/src/a2a/executor.ts index 0ba75f388..8f32665e4 100644 --- a/strands-ts/src/a2a/executor.ts +++ b/strands-ts/src/a2a/executor.ts @@ -2,16 +2,19 @@ * A2A executor that bridges a Strands Agent into the A2A protocol. * * Implements the AgentExecutor interface from `@a2a-js/sdk/server` to allow - * a Strands Agent to handle A2A JSON-RPC requests. + * a Strands Agent to handle A2A JSON-RPC requests. Supports the full A2A + * task lifecycle including `completed`, `failed`, `input-required`, and + * `canceled` states. */ import type { ExecutionEventBus, RequestContext } from '@a2a-js/sdk/server' import type { AgentExecutor } from '@a2a-js/sdk/server' import { A2AError } from '@a2a-js/sdk/server' +import type { Part } from '@a2a-js/sdk' import type { InvokableAgent } from '../types/agent.js' import { ModelStreamUpdateEvent, ContentBlockEvent } from '../hooks/events.js' import { contentBlocksToParts, partsToContentBlocks } from './adapters.js' -import { normalizeError } from '../errors.js' +import { CancelledError, normalizeError } from '../errors.js' import { logger } from '../logging/logger.js' /** @@ -22,6 +25,14 @@ import { logger } from '../logging/logger.js' * event bus. Text chunks are appended to a single artifact as they arrive, * implementing A2A-compliant streaming behavior. * + * ## Task Lifecycle States + * + * The executor maps agent execution outcomes to A2A task states: + * - **completed** — Agent finished successfully with `stopReason: 'endTurn'` + * - **failed** — Agent threw an error during execution + * - **input-required** — Agent returned with `stopReason: 'interrupt'` + * - **canceled** — Task was canceled via `cancelTask()` or agent was cancelled + * * ## Invocation state * * The executor populates the agent's `invocationState` with the incoming A2A @@ -46,6 +57,7 @@ import { logger } from '../logging/logger.js' */ export class A2AExecutor implements AgentExecutor { private _agent: InvokableAgent + private _runningTasks: Map = new Map() /** * Creates a new A2AExecutor. @@ -63,8 +75,13 @@ export class A2AExecutor implements AgentExecutor { * agent execution. Text deltas are streamed incrementally into a single * artifact; non-text content blocks (images, videos, documents) are each * published as separate complete artifacts. A final artifact with - * `lastChunk: true` signals the end of the text artifact, followed by a - * completed status update. + * `lastChunk: true` signals the end of the text artifact. + * + * The final task state depends on the agent's execution outcome: + * - Normal completion → `completed` + * - Agent interrupts (needs human input) → `input-required` + * - Agent throws an error → `failed` + * - Agent was cancelled → `canceled` * * @param context - The A2A request context containing the user message * @param eventBus - The event bus for publishing A2A artifact and status events @@ -83,12 +100,17 @@ export class A2AExecutor implements AgentExecutor { const artifactId = globalThis.crypto.randomUUID() let isFirstChunk = true + // Create an AbortController for this task so cancelTask() can signal cancellation + const abortController = new AbortController() + this._runningTasks.set(taskId, abortController) + try { // Forward the A2A RequestContext to the agent under a reserved key so // hooks and tools can correlate with the A2A request (taskId, contextId, // user message metadata). const stream = this._agent.stream(contentBlocks, { invocationState: { a2aRequestContext: context }, + cancelSignal: abortController.signal, }) let next = await stream.next() @@ -132,6 +154,9 @@ export class A2AExecutor implements AgentExecutor { next = await stream.next() } + // The stream is done — next.value is the AgentResult + const result = next.value + // Publish final artifact chunk to signal end of artifact eventBus.publish({ kind: 'artifact-update', @@ -140,26 +165,106 @@ export class A2AExecutor implements AgentExecutor { artifact: { artifactId, // If no deltas were streamed, publish the full result; otherwise empty to close the artifact - parts: [{ kind: 'text', text: isFirstChunk && next.value ? next.value.toString() : '' }], + parts: [{ kind: 'text', text: isFirstChunk && result ? result.toString() : '' }], }, append: !isFirstChunk, // false for new artifact, true to append to streamed chunks lastChunk: true, // Always true — this runs after the stream loop ends }) - eventBus.publish({ kind: 'status-update', taskId, contextId, status: { state: 'completed' }, final: true }) + // Determine final task state based on agent result + if (result.stopReason === 'interrupt') { + // Agent needs human input — transition to input-required + const interruptParts: Part[] = [] + if (result.interrupts && result.interrupts.length > 0) { + const interruptText = result.interrupts.map((i) => `[${i.name}]: ${i.reason ?? 'Input required'}`).join('\n') + interruptParts.push({ kind: 'text', text: interruptText }) + } else { + interruptParts.push({ kind: 'text', text: 'Agent requires additional input' }) + } + eventBus.publish({ + kind: 'status-update', + taskId, + contextId, + status: { + state: 'input-required', + message: { + kind: 'message', + messageId: globalThis.crypto.randomUUID(), + role: 'agent', + parts: interruptParts, + }, + }, + final: true, + }) + } else if (result.stopReason === 'cancelled') { + // Agent was cancelled cooperatively + eventBus.publish({ + kind: 'status-update', + taskId, + contextId, + status: { state: 'canceled' }, + final: true, + }) + } else { + // Normal completion (endTurn, maxTokens, etc.) + eventBus.publish({ kind: 'status-update', taskId, contextId, status: { state: 'completed' }, final: true }) + } } catch (error) { - logger.error(`task_id=<${taskId}> | error in streaming execution`, normalizeError(error)) - throw error + if (error instanceof CancelledError) { + // Agent cancellation via CancelledError — transition to canceled + logger.debug(`task_id=<${taskId}> | agent execution cancelled`) + eventBus.publish({ + kind: 'status-update', + taskId, + contextId, + status: { state: 'canceled' }, + final: true, + }) + } else { + // Agent execution failed — transition to failed state + logger.error(`task_id=<${taskId}> | agent execution failed`, normalizeError(error)) + eventBus.publish({ + kind: 'status-update', + taskId, + contextId, + status: { + state: 'failed', + message: { + kind: 'message', + messageId: globalThis.crypto.randomUUID(), + role: 'agent', + parts: [{ kind: 'text', text: 'Agent execution failed' }], + }, + }, + final: true, + }) + } + } finally { + this._runningTasks.delete(taskId) } } /** - * Cancels a running task. Not supported by this executor. + * Cancels a running task by signaling the agent to stop. + * + * Uses cooperative cancellation via AbortController. The agent will stop + * at the next cancellation checkpoint and the task transitions to `canceled`. + * If the task is not currently running, throws A2AError.taskNotCancelable. * * @param taskId - The ID of the task to cancel * @param eventBus - The event bus for publishing status events */ - async cancelTask(_taskId: string, _eventBus: ExecutionEventBus): Promise { - throw A2AError.unsupportedOperation('Task cancellation is not supported') + async cancelTask(taskId: string, _eventBus: ExecutionEventBus): Promise { + const abortController = this._runningTasks.get(taskId) + if (!abortController) { + throw A2AError.taskNotCancelable(taskId) + } + + // Signal cancellation — the agent will stop at the next checkpoint + abortController.abort() + logger.debug(`task_id=<${taskId}> | cancel signal sent`) + + // Note: The execute() method handles publishing the 'canceled' status + // when it detects the CancelledError or cancelled stopReason. } }