Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions src/extension/xtab/node/xtabProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*--------------------------------------------------------------------------------------------*/

import { Raw } from '@vscode/prompt-tsx';
import { FetchStreamSource } from '../../../platform/chat/common/chatMLFetcher';
import { createInitialFetchErrorDetector, FetchStreamSource } from '../../../platform/chat/common/chatMLFetcher';
import { ChatFetchError, ChatFetchResponseType, ChatLocation, RESPONSE_CONTAINED_NO_CHOICES } from '../../../platform/chat/common/commonTypes';
import { ConfigKey, IConfigurationService, XTabProviderId } from '../../../platform/configuration/common/configurationService';
import { IDiffService } from '../../../platform/diff/common/diffService';
Expand Down Expand Up @@ -704,7 +704,7 @@ export class XtabProvider implements IStatelessNextEditProvider {

let ttft: number | undefined;

const firstTokenReceived = new DeferredPromise<void>();
const { wrapFinishedCb, getInitialFetchError } = createInitialFetchErrorDetector();

logContext.setHeaderRequestId(request.headerRequestId);

Expand All @@ -716,10 +716,7 @@ export class XtabProvider implements IStatelessNextEditProvider {
{
debugName: XtabProvider.ID,
messages,
finishedCb: async (text, _, delta) => {
if (!firstTokenReceived.isSettled) {
firstTokenReceived.complete();
}
finishedCb: wrapFinishedCb(async (text, _, delta) => {
if (ttft === undefined && text !== '') {
ttft = fetchRequestStopWatch.elapsed();
logContext.addLog(`TTFT ${ttft} ms`);
Expand All @@ -729,7 +726,7 @@ export class XtabProvider implements IStatelessNextEditProvider {
responseSoFar = text;
logContext.setResponse(responseSoFar);
return undefined;
},
}),
location: ChatLocation.Other,
source: undefined,
requestOptions: {
Expand All @@ -753,19 +750,19 @@ export class XtabProvider implements IStatelessNextEditProvider {
telemetryBuilder.setResponse(fetchResultPromise.then((response) => ({ response, ttft })));
logContext.setFullResponse(fetchResultPromise.then((response) => response.type === ChatFetchResponseType.Success ? response.value : undefined));

const fetchRes = await Promise.race([firstTokenReceived.p, fetchResultPromise]);
if (fetchRes && fetchRes.type !== ChatFetchResponseType.Success) {
if (fetchRes.type === ChatFetchResponseType.NotFound &&
const initialFetchError = await getInitialFetchError(fetchResultPromise);
if (initialFetchError) {
if (initialFetchError.type === ChatFetchResponseType.NotFound &&
!this.forceUseDefaultModel // if we haven't already forced using the default model; otherwise, this could cause an infinite loop
) {
this.forceUseDefaultModel = true;
return yield* this.doGetNextEdit(request, delaySession, tracer, logContext, cancellationToken, telemetryBuilder, opts.retryState); // use the same retry state
}
// diff-patch based model returns no choices if it has no edits to suggest
if (fetchRes.type === ChatFetchResponseType.Unknown && fetchRes.reason === RESPONSE_CONTAINED_NO_CHOICES) {
if (initialFetchError.type === ChatFetchResponseType.Unknown && initialFetchError.reason === RESPONSE_CONTAINED_NO_CHOICES) {
return new NoNextEditReason.NoSuggestions(request.documentBeforeEdits, editWindow);
}
return mapChatFetcherErrorToNoNextEditReason(fetchRes);
return mapChatFetcherErrorToNoNextEditReason(initialFetchError);
}

fetchResultPromise
Expand All @@ -781,10 +778,6 @@ export class XtabProvider implements IStatelessNextEditProvider {
}).finally(() => {
logContext.setFetchEndTime();

if (!firstTokenReceived.isSettled) {
firstTokenReceived.complete();
}

fetchStreamSource.resolve();

logContext.setResponse(responseSoFar);
Expand Down
61 changes: 59 additions & 2 deletions src/platform/chat/common/chatMLFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

import type { CancellationToken } from 'vscode';
import { createServiceIdentifier } from '../../../util/common/services';
import { AsyncIterableObject, AsyncIterableSource } from '../../../util/vs/base/common/async';
import { AsyncIterableObject, AsyncIterableSource, DeferredPromise } from '../../../util/vs/base/common/async';
import { Event } from '../../../util/vs/base/common/event';
import { FinishedCallback, IResponseDelta, OptionalChatRequestParams } from '../../networking/common/fetch';
import { IChatEndpoint, IMakeChatRequestOptions } from '../../networking/common/networking';
import { ChatResponse, ChatResponses } from './commonTypes';
import { ChatFetchError, ChatFetchResponseType, ChatResponse, ChatResponses } from './commonTypes';

export interface Source {
readonly extensionId?: string;
Expand Down Expand Up @@ -135,3 +135,60 @@ export class FetchStreamRecorder {
};
}
}

/**
 * Builds a helper for spotting streaming chat requests that fail before any token arrives.
 *
 * Errors such as HTTP 404 or rate limiting surface on the fetch-result promise rather than
 * through the streaming callback. Racing that promise against a signal fired on the first
 * streamed token lets the caller surface such failures early.
 *
 * Usage:
 * ```typescript
 * const { wrapFinishedCb, getInitialFetchError } = createInitialFetchErrorDetector();
 *
 * const fetchResultPromise = endpoint.makeChatRequest2({
 *     ...,
 *     finishedCb: wrapFinishedCb(myFinishedCb),
 * }, token);
 *
 * const initialError = await getInitialFetchError(fetchResultPromise);
 * if (initialError) {
 *     // handle early fetch failure (e.g., 404, rate limit)
 * }
 * ```
 */
export function createInitialFetchErrorDetector(): {
	readonly wrapFinishedCb: (cb: FinishedCallback | undefined) => FinishedCallback;
	readonly getInitialFetchError: (fetchResultPromise: Promise<ChatResponse>) => Promise<ChatFetchError | undefined>;
} {
	// Settled the moment the wrapped callback observes its first delta.
	const streamStarted = new DeferredPromise<void>();

	const wrapFinishedCb = (cb: FinishedCallback | undefined): FinishedCallback => {
		return async (text: string, index: number, delta: IResponseDelta): Promise<number | undefined> => {
			if (!streamStarted.isSettled) {
				streamStarted.complete();
			}
			if (cb) {
				return cb(text, index, delta);
			}
			return undefined;
		};
	};

	const getInitialFetchError = async (fetchResultPromise: Promise<ChatResponse>): Promise<ChatFetchError | undefined> => {
		// Whichever settles first decides the outcome: the token signal (void) means streaming
		// started fine, while a fetch result that settles first with a non-success is an early error.
		const winner = await Promise.race([streamStarted.p, fetchResultPromise]);

		// When the fetch result wins the race with an early error, `streamStarted` is left
		// unsettled on purpose — nothing consumes it after this point, and it is collected
		// together with the rest of the closure.
		if (winner === undefined || winner.type === ChatFetchResponseType.Success) {
			return undefined;
		}
		return winner;
	};

	return { wrapFinishedCb, getInitialFetchError };
}
132 changes: 132 additions & 0 deletions src/platform/chat/test/common/chatMLFetcher.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { describe, expect, it } from 'vitest';
import { ChatFetchResponseType, ChatResponse } from '../../common/commonTypes';
import { createInitialFetchErrorDetector } from '../../common/chatMLFetcher';

/** Builds a minimal successful ChatResponse stub with fixed ids and token usage for these tests. */
function makeSuccessResponse(value = 'hello'): ChatResponse {
	const usage = { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2, prompt_tokens_details: { cached_tokens: 0 } };
	const response: ChatResponse = {
		type: ChatFetchResponseType.Success,
		value,
		requestId: 'req-1',
		serverRequestId: 'srv-1',
		usage,
		resolvedModel: 'test-model',
	};
	return response;
}

/**
 * Builds a minimal error-shaped ChatResponse stub for the given error type.
 * The cast papers over response fields the tests under review never read.
 */
function makeErrorResponse(type = ChatFetchResponseType.NotFound): ChatResponse {
	const stub = {
		type,
		reason: 'Not found',
		requestId: 'req-1',
		serverRequestId: undefined,
	};
	return stub as ChatResponse;
}

describe('createInitialFetchErrorDetector', () => {

	it('returns undefined when first token arrives before fetch completes', async () => {
		const { wrapFinishedCb, getInitialFetchError } = createInitialFetchErrorDetector();

		let resolveResult!: (r: ChatResponse) => void;
		const fetchResultPromise = new Promise<ChatResponse>(resolve => { resolveResult = resolve; });

		const wrappedCb = wrapFinishedCb(async () => undefined);

		// Start the race first, then trigger the first token
		const errorPromise = getInitialFetchError(fetchResultPromise);

		// Simulate first token arriving
		await wrappedCb('hello', 0, { text: 'hello' });

		// Resolve the fetch result after the first token
		resolveResult(makeSuccessResponse());

		const result = await errorPromise;
		expect(result).toBeUndefined();
	});

	it('returns the error when fetch fails before any tokens', async () => {
		const { getInitialFetchError } = createInitialFetchErrorDetector();

		const errorResponse = makeErrorResponse(ChatFetchResponseType.NotFound);
		const fetchResultPromise = Promise.resolve(errorResponse);

		const result = await getInitialFetchError(fetchResultPromise);
		expect(result).toBe(errorResponse);
	});

	it('returns undefined when fetch succeeds before any tokens (unusual but not an error)', async () => {
		const { getInitialFetchError } = createInitialFetchErrorDetector();

		const successResponse = makeSuccessResponse();
		const fetchResultPromise = Promise.resolve(successResponse);

		const result = await getInitialFetchError(fetchResultPromise);
		expect(result).toBeUndefined();
	});

	it('invokes the original finishedCb and returns its value', async () => {
		const { wrapFinishedCb, getInitialFetchError } = createInitialFetchErrorDetector();

		const calls: string[] = [];
		const wrappedCb = wrapFinishedCb(async (text) => {
			calls.push(text);
			return undefined;
		});

		const fetchResultPromise = new Promise<ChatResponse>(() => { /* never resolves in this test */ });
		const errorPromise = getInitialFetchError(fetchResultPromise);

		const first = await wrappedCb('chunk1', 0, { text: 'chunk1' });
		const second = await wrappedCb('chunk2', 0, { text: 'chunk2' });

		// The wrapper must delegate to the original callback and pass its return value through.
		expect(calls).toEqual(['chunk1', 'chunk2']);
		expect(first).toBeUndefined();
		expect(second).toBeUndefined();

		// The first token settled the internal deferred, so the detector resolves with "no early
		// error" even though the fetch promise itself never settles. Awaiting it directly (rather
		// than racing it against an already-resolved promise, which would make the assertion
		// vacuous) actually verifies that behavior.
		const result = await errorPromise;
		expect(result).toBeUndefined();
	});

	it('works correctly with undefined finishedCb', async () => {
		const { wrapFinishedCb, getInitialFetchError } = createInitialFetchErrorDetector();

		const wrappedCb = wrapFinishedCb(undefined);

		const fetchResultPromise = new Promise<ChatResponse>(resolve => {
			setTimeout(() => resolve(makeSuccessResponse()), 0);
		});

		const errorPromise = getInitialFetchError(fetchResultPromise);

		// Simulate first token arriving; with no inner callback the wrapper yields undefined.
		const cbResult = await wrappedCb('hello', 0, { text: 'hello' });
		expect(cbResult).toBeUndefined();

		const result = await errorPromise;
		expect(result).toBeUndefined();
	});

	it('handles different error types correctly', async () => {
		const errorTypes = [
			ChatFetchResponseType.Failed,
			ChatFetchResponseType.RateLimited,
			ChatFetchResponseType.NetworkError,
			ChatFetchResponseType.Unknown,
		];

		// A fresh detector per error type: each deferred is single-use.
		for (const errorType of errorTypes) {
			const { getInitialFetchError } = createInitialFetchErrorDetector();

			const errorResponse = makeErrorResponse(errorType);
			const fetchResultPromise = Promise.resolve(errorResponse);

			const result = await getInitialFetchError(fetchResultPromise);
			expect(result).toBe(errorResponse);
		}
	});
});
Loading