Skip to content

Commit 97f8422

Browse files
committed
fix(a2a-server): prevent stale awaiting_approval events from prematurely closing executor loop
1 parent 33dd56f commit 97f8422

1 file changed

Lines changed: 51 additions & 8 deletions

File tree

  • packages/a2a-server/src/agent

packages/a2a-server/src/agent/task.ts

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ export class Task {
115115

116116
// For tool waiting logic
117117
private pendingToolCalls: Map<string, string> = new Map(); //toolCallId --> status
118+
private toolsAlreadyConfirmed: Set<string> = new Set();
118119
private toolCompletionPromise?: Promise<void>;
119120
private toolCompletionNotifier?: {
120121
resolve: () => void;
@@ -555,6 +556,14 @@ export class Task {
555556

556557
private handleEventDrivenToolCall(tc: ToolCall): void {
557558
const callId = tc.request.callId;
559+
560+
// Do not process events for tools that have already been finalized.
561+
// This prevents duplicate completions if the state manager emits a snapshot containing
562+
// already resolved tools whose IDs were removed from pendingToolCalls.
563+
if (this.completedToolCalls.some((c) => c.request.callId === callId)) {
564+
return;
565+
}
566+
558567
const previousStatus = this.pendingToolCalls.get(callId);
559568
const hasChanged = previousStatus !== tc.status;
560569

@@ -569,6 +578,7 @@ export class Task {
569578
tc.status === 'error' ||
570579
tc.status === 'cancelled'
571580
) {
581+
this.toolsAlreadyConfirmed.delete(callId);
572582
if (hasChanged) {
573583
logger.info(
574584
`[Task] Tool call ${callId} completed with status: ${tc.status}`,
@@ -631,27 +641,41 @@ export class Task {
631641

632642
private checkInputRequiredState(): void {
633643
// 6. Handle Input Required State
634-
const allPendingStatuses = Array.from(this.pendingToolCalls.values());
635-
const isAwaitingApproval = allPendingStatuses.some(
636-
(status) => status === 'awaiting_approval',
637-
);
638-
const isExecuting = allPendingStatuses.some(
639-
(status) => status === 'executing',
640-
);
644+
let isAwaitingApproval = false;
645+
let isExecuting = false;
646+
647+
for (const [callId, status] of this.pendingToolCalls.entries()) {
648+
if (status === 'executing' || status === 'scheduled') {
649+
isExecuting = true;
650+
} else if (
651+
status === 'awaiting_approval' &&
652+
!this.toolsAlreadyConfirmed.has(callId)
653+
) {
654+
isAwaitingApproval = true;
655+
}
656+
}
641657

642658
if (
643659
isAwaitingApproval &&
644660
!isExecuting &&
645661
!this.skipFinalTrueAfterInlineEdit
646662
) {
647663
this.skipFinalTrueAfterInlineEdit = false;
664+
const wasAlreadyInputRequired = this.taskState === 'input-required';
665+
648666
this.setTaskStateAndPublishUpdate(
649667
'input-required',
650668
{ kind: CoderAgentEvent.StateChangeEvent },
651669
undefined,
652670
undefined,
653671
/*final*/ true,
654672
);
673+
674+
// Unblock waitForPendingTools to correctly end the executor loop and release the HTTP response stream.
675+
// The IDE client will open a new stream with the confirmation reply.
676+
if (!wasAlreadyInputRequired && this.toolCompletionNotifier) {
677+
this.toolCompletionNotifier.resolve();
678+
}
655679
}
656680
}
657681

@@ -865,7 +889,16 @@ export class Task {
865889
};
866890
this.setTaskStateAndPublishUpdate('working', stateChange);
867891

868-
await this.scheduler.schedule(updatedRequests, abortSignal);
892+
// Pre-register tools to ensure waitForPendingTools sees them as pending
893+
// before the async scheduler enqueues them and fires the event bus update.
894+
for (const req of updatedRequests) {
895+
if (!this.pendingToolCalls.has(req.callId)) {
896+
this._registerToolCall(req.callId, 'scheduled');
897+
}
898+
}
899+
900+
// Fire and forget so we don't block the executor loop before waitForPendingTools can be called
901+
void this.scheduler.schedule(updatedRequests, abortSignal);
869902
}
870903

871904
async acceptAgentMessage(event: ServerGeminiStreamEvent): Promise<void> {
@@ -991,9 +1024,15 @@ export class Task {
9911024
) {
9921025
return false;
9931026
}
1027+
if (!part.data['outcome']) {
1028+
return false;
1029+
}
9941030

9951031
const callId = part.data['callId'];
9961032
const outcomeString = part.data['outcome'];
1033+
1034+
this.toolsAlreadyConfirmed.add(callId);
1035+
9971036
let confirmationOutcome: ToolConfirmationOutcome | undefined;
9981037

9991038
if (outcomeString === 'proceed_once') {
@@ -1199,6 +1238,10 @@ export class Task {
11991238
if (confirmationHandled) {
12001239
anyConfirmationHandled = true;
12011240
// If a confirmation was handled, the scheduler will now run the tool (or cancel it).
1241+
// We resolve the toolCompletionPromise manually in checkInputRequiredState
1242+
// to break the original execution loop, so we must reset it here so the
1243+
// new loop correctly awaits the tool's final execution.
1244+
this._resetToolCompletionPromise();
12021245
// We don't send anything to the LLM for this part.
12031246
// The subsequent tool execution will eventually lead to resolveToolCall.
12041247
continue;

0 commit comments

Comments
 (0)