Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions packages/a2a-server/src/agent/race-condition.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/**
* @license
* Copyright 2026 Google LLC
* SPDX-License-Identifier: Apache-2.0
*/

import { describe, it, expect, vi, beforeEach, type Mock } from 'vitest';
import { Task } from './task.js';
import {
MessageBusType,
CoreToolCallStatus,
type Config,
type MessageBus,
} from '@google/gemini-cli-core';
import { createMockConfig } from '../utils/testing_utils.js';
import type { RequestContext } from '@a2a-js/sdk/server';

describe('Task Race Condition', () => {
let mockConfig: Config;
let messageBus: MessageBus;

beforeEach(() => {
messageBus = {
subscribe: vi.fn(),
unsubscribe: vi.fn(),
publish: vi.fn(),
} as unknown as MessageBus;
mockConfig = createMockConfig({
messageBus,
}) as Config;
});

it('should not hang when multiple tool confirmations are processed while waiting', async () => {
// @ts-expect-error - private constructor
const task = new Task('task-id', 'context-id', mockConfig);

// 1. Register two tools as scheduled
task['_registerToolCall']('tool-1', 'scheduled');
task['_registerToolCall']('tool-2', 'scheduled');

// 2. Both transition to awaiting_approval
const updateHandler = (messageBus.subscribe as Mock).mock.calls.find(
(c: unknown[]) => c[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];

updateHandler({
type: MessageBusType.TOOL_CALLS_UPDATE,
schedulerId: 'task-id',
toolCalls: [
{
request: { callId: 'tool-1', name: 't1' },
status: CoreToolCallStatus.AwaitingApproval,
correlationId: 'corr-1',
confirmationDetails: { type: 'info' },
},
{
request: { callId: 'tool-2', name: 't2' },
status: CoreToolCallStatus.AwaitingApproval,
correlationId: 'corr-2',
confirmationDetails: { type: 'info' },
},
],
});

// 3. Confirm Tool 1. This makes isAwaitingApprovalOnly() return false.
for await (const _ of task.acceptUserMessage(
{
userMessage: {
parts: [
{
kind: 'data',
data: { callId: 'tool-1', outcome: 'proceed_once' },
},
],
},
} as unknown as RequestContext,
new AbortController().signal,
)) {
// consume generator
}

// 4. Start waiting. This should now block because Tool 1 is confirmed (so we are waiting for its execution).
const waitPromise = task.waitForPendingTools();

// 5. Confirm Tool 2 while waiting.
for await (const _ of task.acceptUserMessage(
{
userMessage: {
parts: [
{
kind: 'data',
data: { callId: 'tool-2', outcome: 'proceed_once' },
},
],
},
} as unknown as RequestContext,
new AbortController().signal,
)) {
// consume generator
}

// 6. Both tools complete successfully
updateHandler({
type: MessageBusType.TOOL_CALLS_UPDATE,
schedulerId: 'task-id',
toolCalls: [
{
request: { callId: 'tool-1', name: 't1' },
status: CoreToolCallStatus.Success,
response: { responseParts: [] },
},
{
request: { callId: 'tool-2', name: 't2' },
status: CoreToolCallStatus.Success,
response: { responseParts: [] },
},
],
});

// 7. Verify that the original waitPromise resolves.
await expect(waitPromise).resolves.toBeUndefined();
});

it('should reject waitForPendingTools when tools are cancelled', async () => {
// @ts-expect-error - private constructor
const task = new Task('task-id', 'context-id', mockConfig);

// 1. Register a tool
task['_registerToolCall']('tool-1', 'scheduled');

// 2. Start waiting
const waitPromise = task.waitForPendingTools();

// 3. Cancel pending tools
task.cancelPendingTools('User requested cancellation');

// 4. Verify waitPromise rejects with the reason
await expect(waitPromise).rejects.toThrow('User requested cancellation');
});

it('should handle concurrent tool scheduling correctly', async () => {
// @ts-expect-error - private constructor
const task = new Task('task-id', 'context-id', mockConfig);

// 1. Register a tool and start waiting
task['_registerToolCall']('tool-1', 'scheduled');
const waitPromise = task.waitForPendingTools();

// 2. Schedule another tool concurrently (e.g. from a secondary user message)
// This should NOT resolve the current waitPromise until both are done
await task.scheduleToolCalls(
[{ callId: 'tool-2', name: 't2', args: {} }],
new AbortController().signal,
);

expect(task['pendingToolCalls'].size).toBe(2);

// 3. Resolve tool 1
task['_resolveToolCall']('tool-1');

// 4. Verify waitPromise is still pending
let resolved = false;
waitPromise.then(() => (resolved = true));
await new Promise((resolve) => setTimeout(resolve, 10));
expect(resolved).toBe(false);

// 5. Resolve tool 2
task['_resolveToolCall']('tool-2');

// 6. Now it should resolve
await expect(waitPromise).resolves.toBeUndefined();
});
});
69 changes: 69 additions & 0 deletions packages/a2a-server/src/agent/task-event-driven.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ApprovalMode,
Scheduler,
type MessageBus,
type ToolLiveOutput,
} from '@google/gemini-cli-core';
import { createMockConfig } from '../utils/testing_utils.js';
import type { ExecutionEventBus } from '@a2a-js/sdk/server';
Expand Down Expand Up @@ -608,6 +609,74 @@ describe('Task Event-Driven Scheduler', () => {
);
});

it('should handle multi-turn tool resolution correctly', async () => {
// @ts-expect-error - Calling private constructor
const task = new Task('task-id', 'context-id', mockConfig);

task['_registerToolCall']('1', 'scheduled');
task['_registerToolCall']('2', 'scheduled');

const handler = (messageBus.subscribe as Mock).mock.calls.find(
(call: unknown[]) => call[0] === MessageBusType.TOOL_CALLS_UPDATE,
)?.[1];

// Turn 1: Resolve tool 1
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [
{
request: { callId: '1', name: 't1' },
status: 'success',
response: { responseParts: [] },
},
],
schedulerId: 'task-id',
});

expect(task['pendingToolCalls'].size).toBe(1);
expect(task['pendingToolCalls'].has('2')).toBe(true);

// Turn 2: Resolve tool 2
handler({
type: MessageBusType.TOOL_CALLS_UPDATE,
toolCalls: [
{
request: { callId: '2', name: 't2' },
status: 'success',
response: { responseParts: [] },
},
],
schedulerId: 'task-id',
});

expect(task['pendingToolCalls'].size).toBe(0);
});

it('should handle subagent progress events from the scheduler', async () => {
// @ts-expect-error - Calling private constructor
const task = new Task('task-id', 'context-id', mockConfig, mockEventBus);

// Trigger _schedulerOutputUpdate with subagent progress
task['_schedulerOutputUpdate']('tool-1', {
isSubagentProgress: true,
agentName: 'researcher',
recentActivity: [],
} as ToolLiveOutput);

expect(mockEventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
kind: 'artifact-update',
artifact: expect.objectContaining({
parts: [
expect.objectContaining({
text: expect.stringContaining('researcher'),
}),
],
}),
}),
);
});

it('should wait for executing tools before transitioning to input-required state', async () => {
// @ts-expect-error - Calling private constructor
const task = new Task('task-id', 'context-id', mockConfig, mockEventBus);
Expand Down
Loading
Loading