Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions strands-ts/src/a2a/__tests__/a2a-agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,102 @@ describe('A2AAgent', () => {
})
})

describe('task lifecycle state mapping', () => {
it.each([
{ state: 'completed', expectedStopReason: 'endTurn', expectedTaskState: 'completed' },
{ state: 'failed', expectedStopReason: 'endTurn', expectedTaskState: 'failed' },
{ state: 'canceled', expectedStopReason: 'cancelled', expectedTaskState: 'canceled' },
{ state: 'rejected', expectedStopReason: 'endTurn', expectedTaskState: 'rejected' },
{ state: 'input-required', expectedStopReason: 'interrupt', expectedTaskState: 'input-required' },
{ state: 'auth-required', expectedStopReason: 'interrupt', expectedTaskState: 'auth-required' },
{ state: 'unknown', expectedStopReason: 'endTurn', expectedTaskState: 'unknown' },
])('maps $state to stopReason=$expectedStopReason', async ({ state, expectedStopReason, expectedTaskState }) => {
const statusUpdate: TaskStatusUpdateEvent = {
kind: 'status-update',
taskId: 'task-1',
contextId: 'ctx-1',
status: {
state: state as TaskStatusUpdateEvent['status']['state'],
message: {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: `Task ${state}` }],
},
},
final: true,
}
mockSendMessageStream.mockReturnValue(mockStream(statusUpdate))

const agent = new A2AAgent({ url: 'http://localhost:9000' })
const result = await agent.invoke('Hello')

expect(result.stopReason).toBe(expectedStopReason)
expect(result.invocationState.a2aTaskState).toBe(expectedTaskState)
})

it('includes a2aTaskState in invocationState for completed Task response', async () => {
const agent = new A2AAgent({ url: 'http://localhost:9000' })
const result = await agent.invoke('Hello')

expect(result.invocationState.a2aTaskState).toBe('completed')
})

it('recognizes all terminal states as complete events (stops streaming)', async () => {
// Simulate: working → failed (terminal) — should not loop forever
const workingUpdate: TaskStatusUpdateEvent = {
kind: 'status-update',
taskId: 'task-1',
contextId: 'ctx-1',
status: { state: 'working' },
final: false,
}
const failedUpdate: TaskStatusUpdateEvent = {
kind: 'status-update',
taskId: 'task-1',
contextId: 'ctx-1',
status: {
state: 'failed',
message: { kind: 'message', messageId: 'msg-1', role: 'agent', parts: [{ kind: 'text', text: 'Error' }] },
},
final: true,
}
mockSendMessageStream.mockReturnValue(mockStream(workingUpdate, failedUpdate))

const agent = new A2AAgent({ url: 'http://localhost:9000' })
const result = await agent.invoke('Hello')

expect(result.stopReason).toBe('endTurn')
expect(result.invocationState.a2aTaskState).toBe('failed')
})

it('recognizes input-required as complete event', async () => {
const inputRequired: TaskStatusUpdateEvent = {
kind: 'status-update',
taskId: 'task-1',
contextId: 'ctx-1',
status: {
state: 'input-required',
message: {
kind: 'message',
messageId: 'msg-1',
role: 'agent',
parts: [{ kind: 'text', text: 'Need more info' }],
},
},
final: true,
}
mockSendMessageStream.mockReturnValue(mockStream(inputRequired))

const agent = new A2AAgent({ url: 'http://localhost:9000' })
const result = await agent.invoke('Hello')

expect(result.stopReason).toBe('interrupt')
expect(result.invocationState.a2aTaskState).toBe('input-required')
expect((result.lastMessage.content[0] as TextBlock).text).toBe('Need more info')
})
})

describe('stream', () => {
it('yields A2AStreamUpdateEvent for each A2A event and A2AResultEvent at the end', async () => {
const task = createMockTaskResponse()
Expand Down
236 changes: 226 additions & 10 deletions strands-ts/src/a2a/__tests__/executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { ImageBlock, encodeBase64 } from '../../types/media.js'
import { ContentBlockEvent, ModelStreamUpdateEvent } from '../../hooks/events.js'
import { AgentResult } from '../../types/agent.js'
import { Message } from '../../types/messages.js'
import { CancelledError } from '../../errors.js'
import { Interrupt } from '../../interrupt.js'

function createMockEventBus(): ExecutionEventBus & { events: AgentExecutionEvent[] } {
const events: AgentExecutionEvent[] = []
Expand Down Expand Up @@ -135,7 +137,7 @@ describe('A2AExecutor', () => {

expect(agent.stream).toHaveBeenCalledWith(
[new TextBlock('Line 1'), new TextBlock('[File: file (file://test.txt)]'), new TextBlock('Line 2')],
{ invocationState: { a2aRequestContext: context } }
{ invocationState: { a2aRequestContext: context }, cancelSignal: expect.any(AbortSignal) }
)
})

Expand All @@ -154,18 +156,185 @@ describe('A2AExecutor', () => {
expect(options?.invocationState).toEqual({ a2aRequestContext: context })
})

it('re-throws when agent throws, publishing only the initial task event', async () => {
it('transitions to failed state when agent throws', async () => {
const model = new MockMessageModel().addTurn(new Error('Agent failed'))
const agent = new Agent({ model, printer: false })
const executor = new A2AExecutor(agent)
const eventBus = createMockEventBus()

await expect(executor.execute(createRequestContext('Hello'), eventBus)).rejects.toThrow('Agent failed')
await executor.execute(createRequestContext('Hello'), eventBus)

// Only the initial task registration event is published before the error
expect(eventBus.events).toStrictEqual([
{ kind: 'task', id: 'task-1', contextId: 'ctx-1', status: { state: 'working' } },
])
const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')

// Should have a failed status event (not throw)
expect(statusEvents).toHaveLength(1)
expect(statusEvents[0]).toStrictEqual({
kind: 'status-update',
taskId: 'task-1',
contextId: 'ctx-1',
status: {
state: 'failed',
message: {
kind: 'message',
messageId: expect.any(String),
role: 'agent',
parts: [{ kind: 'text', text: 'Agent execution failed' }],
},
},
final: true,
})
})

it('does not leak error details in failed status message', async () => {
const model = new MockMessageModel().addTurn(new Error('Secret internal path /home/user/.env'))
const agent = new Agent({ model, printer: false })
const executor = new A2AExecutor(agent)
const eventBus = createMockEventBus()

await executor.execute(createRequestContext('Hello'), eventBus)

const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
const failedMessage = statusEvents[0]!.status.message!.parts[0]
expect(failedMessage).toStrictEqual({ kind: 'text', text: 'Agent execution failed' })
expect(JSON.stringify(statusEvents[0])).not.toContain('Secret internal path')
})

it('transitions to canceled state when CancelledError is thrown', async () => {
const mockAgent: InvokableAgent = {
id: 'test-agent',
name: 'Test Agent',
invoke: vi.fn(),
// eslint-disable-next-line require-yield
async *stream() {
throw new CancelledError()
},
}

const executor = new A2AExecutor(mockAgent)
const eventBus = createMockEventBus()

await executor.execute(createRequestContext('Hello'), eventBus)

const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
expect(statusEvents).toHaveLength(1)
expect(statusEvents[0]!.status.state).toBe('canceled')
expect(statusEvents[0]!.final).toBe(true)
})

it('transitions to canceled state when agent returns stopReason cancelled', async () => {
const mockAgent: InvokableAgent = {
id: 'test-agent',
name: 'Test Agent',
invoke: vi.fn(),
// eslint-disable-next-line require-yield
async *stream() {
return new AgentResult({
stopReason: 'cancelled',
lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Cancelled')] }),
invocationState: {},
})
},
}

const executor = new A2AExecutor(mockAgent)
const eventBus = createMockEventBus()

await executor.execute(createRequestContext('Hello'), eventBus)

const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
expect(statusEvents).toHaveLength(1)
expect(statusEvents[0]!.status.state).toBe('canceled')
expect(statusEvents[0]!.final).toBe(true)
})

it('transitions to input-required state when agent returns stopReason interrupt', async () => {
const mockAgent: InvokableAgent = {
id: 'test-agent',
name: 'Test Agent',
invoke: vi.fn(),
// eslint-disable-next-line require-yield
async *stream() {
return new AgentResult({
stopReason: 'interrupt',
lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Need input')] }),
invocationState: {},
interrupts: [new Interrupt({ id: 'int-1', name: 'confirm', reason: 'Please confirm the action' })],
})
},
}

const executor = new A2AExecutor(mockAgent)
const eventBus = createMockEventBus()

await executor.execute(createRequestContext('Do something'), eventBus)

const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
expect(statusEvents).toHaveLength(1)
expect(statusEvents[0]!.status.state).toBe('input-required')
expect(statusEvents[0]!.final).toBe(true)
expect(statusEvents[0]!.status.message!.parts[0]).toStrictEqual({
kind: 'text',
text: '[confirm]: Please confirm the action',
})
})

it('transitions to input-required with generic message when no interrupts provided', async () => {
const mockAgent: InvokableAgent = {
id: 'test-agent',
name: 'Test Agent',
invoke: vi.fn(),
// eslint-disable-next-line require-yield
async *stream() {
return new AgentResult({
stopReason: 'interrupt',
lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Need input')] }),
invocationState: {},
})
},
}

const executor = new A2AExecutor(mockAgent)
const eventBus = createMockEventBus()

await executor.execute(createRequestContext('Do something'), eventBus)

const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
expect(statusEvents[0]!.status.state).toBe('input-required')
expect(statusEvents[0]!.status.message!.parts[0]).toStrictEqual({
kind: 'text',
text: 'Agent requires additional input',
})
})

it('transitions to input-required with multiple interrupts', async () => {
const mockAgent: InvokableAgent = {
id: 'test-agent',
name: 'Test Agent',
invoke: vi.fn(),
// eslint-disable-next-line require-yield
async *stream() {
return new AgentResult({
stopReason: 'interrupt',
lastMessage: new Message({ role: 'assistant', content: [new TextBlock('')] }),
invocationState: {},
interrupts: [
new Interrupt({ id: 'int-1', name: 'approve', reason: 'Approve deployment' }),
new Interrupt({ id: 'int-2', name: 'select_env', reason: 'Choose environment' }),
],
})
},
}

const executor = new A2AExecutor(mockAgent)
const eventBus = createMockEventBus()

await executor.execute(createRequestContext('Deploy'), eventBus)

const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
expect(statusEvents[0]!.status.message!.parts[0]).toStrictEqual({
kind: 'text',
text: '[approve]: Approve deployment\n[select_env]: Choose environment',
})
})

it('publishes image content blocks as separate file artifacts', async () => {
Expand Down Expand Up @@ -239,14 +408,61 @@ describe('A2AExecutor', () => {
})

describe('cancelTask', () => {
it('throws A2AError.unsupportedOperation', async () => {
it('throws taskNotCancelable when task is not running', async () => {
const model = new MockMessageModel().addTurn({ type: 'textBlock', text: '' })
const agent = new Agent({ model, printer: false })
const executor = new A2AExecutor(agent)
const eventBus = createMockEventBus()

await expect(executor.cancelTask('task-1', eventBus)).rejects.toThrow('Task cancellation is not supported')
expect(eventBus.events).toStrictEqual([])
await expect(executor.cancelTask('nonexistent-task', eventBus)).rejects.toThrow()
})

it('signals cancellation to a running task via AbortController', async () => {
let cancelSignalReceived: AbortSignal | undefined
const mockAgent: InvokableAgent = {
id: 'test-agent',
name: 'Test Agent',
invoke: vi.fn(),
// eslint-disable-next-line require-yield
async *stream(_args, options) {
cancelSignalReceived = options?.cancelSignal
// Simulate some work
await new Promise((resolve) => setTimeout(resolve, 50))
// Check if cancelled
if (cancelSignalReceived?.aborted) {
throw new CancelledError()
}
return new AgentResult({
stopReason: 'endTurn',
lastMessage: new Message({ role: 'assistant', content: [new TextBlock('Done')] }),
invocationState: {},
})
},
}

const executor = new A2AExecutor(mockAgent)
const eventBus = createMockEventBus()
const context = createRequestContext('Hello', 'cancel-task-1')

// Start execution in background
const execPromise = executor.execute(context, eventBus)

// Give it time to start
await new Promise((resolve) => setTimeout(resolve, 10))

// Cancel the task
await executor.cancelTask('cancel-task-1', eventBus)

// Wait for execution to complete
await execPromise

// Verify the cancel signal was passed to the agent
expect(cancelSignalReceived).toBeDefined()
expect(cancelSignalReceived!.aborted).toBe(true)

// Verify the task was transitioned to canceled
const statusEvents = eventBus.events.filter((e): e is TaskStatusUpdateEvent => e.kind === 'status-update')
expect(statusEvents.some((e) => e.status.state === 'canceled')).toBe(true)
})
})
})
Loading
Loading