Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions packages/cli/src/ui/hooks/useGeminiStream.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,135 @@ describe('useGeminiStream', () => {
});
});

it('drops a late tool result whose callId is already paired in chat.history (Race A dedup)', async () => {
// Race A repro: the chat-internal repair pass already synthesized a
// functionResponse for this callId on the Retry push (because the
// partial-tool_use turn was orphan when Ctrl+Y landed). The live
// scheduler's late real result must NOT also be submitted, otherwise
// the wire payload would carry two functionResponse parts for the
// same callId and the second one would land as an orphan tool_result.
// The dedup MUST run regardless of `isResponding`, because the
// scheduler's `onAllToolCallsComplete` is single-shot and would
// otherwise leave the tool stuck in `completed-but-not-submitted`.
const lateRealResult = {
request: {
callId: 'call_race_A',
name: 'read_file',
args: { path: '/tmp/x.txt' },
isClientInitiated: false,
prompt_id: 'prompt-race-a',
},
status: 'success',
responseSubmittedToGemini: false,
response: {
callId: 'call_race_A',
responseParts: [
{
functionResponse: {
id: 'call_race_A',
name: 'read_file',
response: { output: 'real file contents' },
},
},
],
resultDisplay: undefined,
error: undefined,
errorType: undefined,
},
tool: {
name: 'read_file',
displayName: 'ReadFile',
description: 'Read a file',
build: vi.fn(),
} as any,
invocation: {
getDescription: () => 'read /tmp/x.txt',
} as unknown as AnyToolInvocation,
} as unknown as TrackedCompletedToolCall;

const client = new MockedGeminiClientClass(mockConfig);
// Simulate the chat-internal repair pass having already planted a
// synthetic functionResponse for the same callId on the previous
// (Retry) push.
client.getHistory = vi.fn().mockReturnValue([
{ role: 'user', parts: [{ text: 'open /tmp/x.txt' }] },
{
role: 'model',
parts: [
{
functionCall: {
id: 'call_race_A',
name: 'read_file',
args: { path: '/tmp/x.txt' },
},
},
],
},
{
role: 'user',
parts: [
{ text: 'retry' },
{
functionResponse: {
id: 'call_race_A',
name: 'read_file',
response: {
error: 'Tool execution result was not recorded',
},
},
},
],
},
]);

let capturedOnComplete:
| ((completedTools: TrackedToolCall[]) => Promise<void>)
| null = null;
mockUseReactToolScheduler.mockImplementation((onComplete) => {
capturedOnComplete = onComplete;
return [[], mockScheduleToolCalls, mockMarkToolsAsSubmitted];
});

renderHook(() =>
useGeminiStream(
client,
[],
mockAddItem,
mockConfig,
mockLoadedSettings,
mockOnDebugMessage,
mockHandleSlashCommand,
false,
() => 'vscode' as EditorType,
() => {},
() => Promise.resolve(),
false,
() => {},
() => {},
() => {},
() => {},
80,
24,
),
);

await act(async () => {
if (capturedOnComplete) {
await capturedOnComplete([lateRealResult]);
}
});

await waitFor(() => {
// The dedup hit must `markToolsAsSubmitted` so the UI/scheduler is
// unblocked even though we drop the real result on the wire.
expect(mockMarkToolsAsSubmitted).toHaveBeenCalledWith(['call_race_A']);
});

// No follow-up submission: the synthetic in history already closes
// the tool_use ↔ tool_result pair.
expect(mockSendMessageStream).not.toHaveBeenCalled();
});

it('should not flicker streaming state to Idle between tool completion and submission', async () => {
const toolCallResponseParts: PartListUnion = [
{ text: 'tool 1 final response' },
Expand Down
51 changes: 46 additions & 5 deletions packages/cli/src/ui/hooks/useGeminiStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1968,10 +1968,6 @@ export const useGeminiStream = (

const handleCompletedTools = useCallback(
async (completedToolCallsFromScheduler: TrackedToolCall[]) => {
if (isResponding) {
return;
}

const completedAndReadyToSubmitTools =
completedToolCallsFromScheduler.filter(
(
Expand All @@ -1994,6 +1990,49 @@ export const useGeminiStream = (
},
);

// History-based dedup MUST run before the `isResponding` early-return.
// If a synthetic `functionResponse` for this callId is already in
// chat.history (planted on session-load by
// `client.repairOrphanedToolUseTurnsInHistory` or on every
// `chat.sendMessageStream` push by the inline repair pass), the
// in-flight scheduler result must be marked submitted NOW —
// `useReactToolScheduler.allToolCallsCompleteHandler` is single-shot
// per batch, so a later isResponding=true early-return would leave
// the tool stuck in `completed-but-not-submitted` forever (Race A
// surfaced in PR #4176 review). The real result is dropped on the
// wire — same trade-off upstream Claude Code makes when its
// `StreamingToolExecutor.discard()` follows a
// `yieldMissingToolResultBlocks` synthesis (`query.ts:733` + `:984`).
const historyCallIdsWithResponse = new Set<string>();
// Guard the call: some test harnesses build a partial GeminiClient
// mock without `getHistory`. Skipping dedup in that case is safe —
// it just means tests that never set up the repair pre-condition
// run with the original (pre-dedup) submission shape.
if (geminiClient && typeof geminiClient.getHistory === 'function') {
for (const entry of geminiClient.getHistory()) {
if (entry.role !== 'user') continue;
for (const part of entry.parts ?? []) {
const id = part.functionResponse?.id;
if (id) historyCallIdsWithResponse.add(id);
}
}
}
const dedupedCallIds = completedAndReadyToSubmitTools
.filter((tc) => historyCallIdsWithResponse.has(tc.request.callId))
.map((tc) => tc.request.callId);
if (dedupedCallIds.length > 0) {
debugLogger.warn(
`[REPAIR] Dropping ${dedupedCallIds.length} late tool result(s) ` +
`whose callId already has a functionResponse in history: ` +
`${dedupedCallIds.join(', ')}`,
);
markToolsAsSubmitted(dedupedCallIds);
}

if (isResponding) {
return;
}
Comment on lines +1993 to +2034

// Finalize any client-initiated tools as soon as they are done.
const clientTools = completedAndReadyToSubmitTools.filter(
(t) => t.request.isClientInitiated,
Expand All @@ -2020,7 +2059,9 @@ export const useGeminiStream = (
}

const geminiTools = completedAndReadyToSubmitTools.filter(
(t) => !t.request.isClientInitiated,
(t) =>
!t.request.isClientInitiated &&
!historyCallIdsWithResponse.has(t.request.callId),
);

for (const toolCall of geminiTools) {
Expand Down
77 changes: 77 additions & 0 deletions packages/core/src/core/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,80 @@ describe('Gemini Client (client.ts)', () => {
});
});

describe('startChat — repair orphan tool_use on resume', () => {
  it('synthesizes a functionResponse for a transcript ending in a dangling model[functionCall]', async () => {
    // Models a `--resume` of a session killed (OOM / SIGKILL / exit)
    // between the partial-tool_use push in `processStreamResponse` and
    // the React scheduler's `submitQuery(ToolResult)`: the persisted
    // JSONL ends on `model[functionCall]` with no matching user
    // `functionResponse`. If the repair pass did not run at session
    // load, the first API call after `--resume` would 400 with
    // "tool_use_id ... must have a corresponding tool_use block in the
    // previous message". This test pins the only resume-time
    // integration point of the repair, so removing or reordering the
    // call inside `startChat()` fails here even when the helper's own
    // unit tests still pass.
    const resumedTranscript = [
      {
        role: 'user',
        parts: [{ text: 'open /tmp/crash.txt' }],
      },
      {
        role: 'model',
        parts: [
          {
            functionCall: {
              id: 'call_crash_resume',
              name: 'read_file',
              args: { path: '/tmp/crash.txt' },
            },
          } as never,
        ],
      },
    ];
    await client.startChat(resumedTranscript);

    // startChat prepends a mocked env-context user/model pair before the
    // supplied extraHistory; the repair pass must then splice a
    // synthetic user[functionResponse] immediately AFTER the dangling
    // model[fc]. Find the dangling model turn by callId and inspect the
    // entry right after it.
    const loadedHistory = client.getHistory();
    const danglingModelIdx = loadedHistory.findIndex(
      (entry) =>
        entry.role === 'model' &&
        entry.parts?.some((p) => p.functionCall?.id === 'call_crash_resume'),
    );
    expect(danglingModelIdx).toBeGreaterThanOrEqual(0);

    const spliced = loadedHistory[danglingModelIdx + 1];
    expect(spliced?.role).toBe('user');
    const syntheticPart = spliced?.parts!.find((p) => p.functionResponse);
    expect(syntheticPart?.functionResponse?.id).toBe('call_crash_resume');
    expect(syntheticPart?.functionResponse?.name).toBe('read_file');
    const responsePayload = syntheticPart?.functionResponse?.response as {
      error?: string;
    };
    expect(responsePayload?.error).toMatch(/interrupted/i);
  });

  it('is a no-op when the resumed transcript has no dangling tool_use', async () => {
    // Happy resume path: a transcript with valid pairing (here, no
    // tool_use at all) must not receive an injected synthetic
    // functionResponse. Guards against the repair pass ever firing on
    // well-formed history.
    await client.startChat([
      { role: 'user', parts: [{ text: 'q' }] },
      { role: 'model', parts: [{ text: 'plain text reply' }] },
    ]);

    const loadedHistory = client.getHistory();
    const anySynthetic = loadedHistory.some((entry) =>
      entry.parts?.some((p) => p.functionResponse),
    );
    // No functionResponse anywhere — repair did nothing.
    expect(anySynthetic).toBe(false);
  });
});

describe('setTools — system instruction refresh', () => {
// Regression coverage for the progressive-MCP wiring bug: when MCP
// discovery completes AFTER startChat() (the new default), `setTools()`
Expand Down Expand Up @@ -1453,6 +1527,7 @@ describe('Gemini Client (client.ts)', () => {
getHistory: vi.fn().mockReturnValue([]),
getHistoryLength,
stripOrphanedUserEntriesFromHistory,
repairOrphanedToolUseTurns: vi.fn().mockReturnValue({ injected: [] }),
} as unknown as GeminiChat;
mockTurnRunFn.mockReturnValue(
(async function* () {
Expand Down Expand Up @@ -4205,6 +4280,7 @@ Other open files:
getHistoryLength: vi.fn().mockReturnValueOnce(3).mockReturnValue(2),
setHistory: vi.fn(),
stripOrphanedUserEntriesFromHistory: vi.fn(),
repairOrphanedToolUseTurns: vi.fn().mockReturnValue({ injected: [] }),
};
client['chat'] = mockChat as GeminiChat;

Expand Down Expand Up @@ -4237,6 +4313,7 @@ Other open files:
getHistoryLength: vi.fn().mockReturnValue(0),
setHistory: vi.fn(),
stripOrphanedUserEntriesFromHistory: vi.fn(),
repairOrphanedToolUseTurns: vi.fn().mockReturnValue({ injected: [] }),
};
client['chat'] = mockChat as GeminiChat;

Expand Down
60 changes: 60 additions & 0 deletions packages/core/src/core/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,44 @@ export class GeminiClient {
this.forceFullIdeContext = true;
}

/**
 * Synthesizes a `functionResponse` for every dangling `model[functionCall]`
 * in chat history whose matching tool_result never arrived. Complements
 * {@link stripOrphanedUserEntriesFromHistory}, which only covers trailing
 * `user` entries.
 *
 * Integration points:
 * 1. After {@link startChat} loads a transcript — repairs `--resume` of a
 *    session that died between the partial-tool_use push and tool
 *    completion.
 * 2. After `stripOrphanedUserEntriesFromHistory` on the Retry submit path —
 *    covers the Ctrl+Y race where a retry lands while an in-flight tool's
 *    `tool_result` is still pending, leaving a trailing
 *    `model[functionCall]` with no `functionResponse`.
 * 3. Defensively at the start of UserQuery / Cron sends, as a backstop for
 *    anything that slipped past 1 and 2.
 *
 * The synthesized `functionResponse` carries an `error` payload. The React
 * tool scheduler (`useGeminiStream.handleCompletedTools`) MUST dedupe by
 * `callId` against live history before submitting its own `tool_result`;
 * otherwise a late real result becomes a second, orphaned
 * `user[tool_result]` block (the synthetic already consumed the matching
 * `tool_use`).
 *
 * @param reason Optional label forwarded to the chat-level repair helper.
 * @returns The repair summary: one entry per injected synthetic response.
 */
repairOrphanedToolUseTurnsInHistory(reason?: string): {
  injected: Array<{ callId: string; name: string }>;
} {
  const repair = this.getChat().repairOrphanedToolUseTurns(reason);
  const { injected } = repair;
  if (injected.length > 0) {
    // Surface the repaired callIds so a wedged transcript is diagnosable
    // from the debug log alone.
    const details = injected.map((e) => `${e.name}(${e.callId})`).join(', ');
    debugLogger.warn(
      `[REPAIR] Synthesized ${injected.length} functionResponse(s) ` +
        `for dangling tool_use(s): ${details}`,
    );
  }
  return repair;
}

setHistory(history: Content[]) {
this.getChat().setHistory(history);
// Replacing history wholesale drops any prior read_file tool
Expand Down Expand Up @@ -656,6 +694,21 @@ export class GeminiClient {
uiTelemetryService,
);

// Repair any dangling `model[functionCall]` whose `functionResponse`
// never made it back into the transcript before we wrote the JSONL.
// The common cause is a process crash / OOM / SIGKILL between the
// partial-tool_use push (see `processStreamResponse`) and the React
// scheduler's tool_result submission. Without this pass, the first
// API call on a resumed session would 400 with the same
// `tool_use_id ... corresponding tool_use` error this whole
// subsystem is trying to escape. (Belt-and-suspenders: the same
// helper runs again inside `chat.sendMessageStream` after the user
// content is pushed, so a dangling left here by setHistory /
// compaction reordering is also caught — but doing it here keeps
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] This new startChat() wiring is the only resume-time integration point that repairs a dangling model[functionCall] before the first resumed send, but the tests only cover the helper directly and mock this method in client.test.ts. A future refactor could remove or reorder this call while the helper tests still pass, regressing the --resume recovery path this PR is meant to protect.

Please add a client-level test that starts a chat with extraHistory ending in a model turn containing a functionCall, then asserts the initialized chat history contains the synthetic user[functionResponse] before any send occurs.

— gpt-5.5 via Qwen Code /review

// any pre-send code reading `chat.history` from seeing a malformed
// shape.)
this.repairOrphanedToolUseTurnsInHistory();

const sessionStartAdditionalContext =
await this.fireSessionStartHook(sessionStartSource);
this.lastSessionStartContext = sessionStartAdditionalContext;
Expand Down Expand Up @@ -1043,6 +1096,13 @@ export class GeminiClient {

if (messageType === SendMessageType.Retry) {
this.stripOrphanedUserEntriesFromHistory();
// The matching dangling-`functionCall` repair runs inside
// `chat.sendMessageStream` AFTER the user content is pushed, so any
// tool_result the user is supplying (Retry of a ToolResult
// submission, lastPrompt === fr parts) closes the pair via the real
// `functionResponse` before we synthesize an error one. Doing the
// repair here would happen pre-push and race against the user
// content's own pairing — see PR #4176 review for the corner.
}

// Fire UserPromptSubmit hook through MessageBus (only if hooks are enabled)
Expand Down
Loading
Loading