Skip to content

Commit 571a7d5

Browse files
committed
Resend all messages when WebSocket's previous response id does not match (#298236)
1 parent c00fafe commit 571a7d5

5 files changed

Lines changed: 105 additions & 12 deletions

File tree

extensions/copilot/src/platform/endpoint/common/statefulMarkerContainer.tsx

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,17 @@ export function getStatefulMarkerAndIndex(modelId: string, messages: readonly Ra
8282
}
8383
}
8484
return undefined;
85-
}
85+
}
86+
87+
/**
88+
* Finds the message index of a specific stateful marker value in the message history.
89+
* Returns the index if found, undefined otherwise.
90+
*/
91+
export function getIndexOfStatefulMarker(markerValue: string, messages: readonly Raw.ChatMessage[]): number | undefined {
92+
for (const entry of getAllStatefulMarkersAndIndicies(messages)) {
93+
if (entry.statefulMarker.marker === markerValue) {
94+
return entry.index;
95+
}
96+
}
97+
return undefined;
98+
}

extensions/copilot/src/platform/endpoint/node/responsesApi.ts

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ import { TelemetryData } from '../../telemetry/common/telemetryData';
2626
import { getVerbosityForModelSync } from '../common/chatModelCapabilities';
2727
import { rawPartAsCompactionData } from '../common/compactionDataContainer';
2828
import { rawPartAsPhaseData } from '../common/phaseDataContainer';
29-
import { getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
29+
import { getIndexOfStatefulMarker, getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
3030
import { rawPartAsThinkingData } from '../common/thinkingDataContainer';
31+
import { IChatWebSocketManager } from '../../networking/node/chatWebSocketManager';
3132

3233
export function getResponsesApiCompactionThreshold(configService: IConfigurationService, expService: IExperimentationService, endpoint: IChatEndpoint): number | undefined {
3334
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
@@ -49,7 +50,7 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
4950

5051
const body: IEndpointBody = {
5152
model,
52-
...rawMessagesToResponseAPI(model, options.messages, !!options.ignoreStatefulMarker),
53+
...rawMessagesToResponseAPI(model, options.messages, !!options.ignoreStatefulMarker, resolveWebSocketStatefulMarker(accessor, options)),
5354
stream: true,
5455
tools: options.requestOptions?.tools?.map((tool): OpenAI.Responses.FunctionTool & OpenAiResponsesFunctionTool => ({
5556
...tool.function,
@@ -131,22 +132,43 @@ interface LatestCompactionOutput {
131132
readonly outputIndex: number;
132133
}
133134

134-
function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
135+
function resolveWebSocketStatefulMarker(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions): string | undefined {
136+
if (options.ignoreStatefulMarker || !options.useWebSocket || !options.conversationId) {
137+
return undefined;
138+
}
139+
return accessor.get(IChatWebSocketManager).getStatefulMarker(options.conversationId);
140+
}
141+
142+
function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean, webSocketStatefulMarker: string | undefined): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
135143
const latestCompactionMessageIndex = getLatestCompactionMessageIndex(messages);
136144
const latestCompactionMessage = latestCompactionMessageIndex !== undefined ? createCompactionRoundTripMessage(messages[latestCompactionMessageIndex]) : undefined;
137-
const statefulMarkerAndIndex = !ignoreStatefulMarker && getStatefulMarkerAndIndex(modelId, messages);
138145

139146
let previousResponseId: string | undefined;
140-
if (statefulMarkerAndIndex) {
141-
previousResponseId = statefulMarkerAndIndex.statefulMarker;
147+
let markerIndex: number | undefined;
148+
149+
if (webSocketStatefulMarker) {
150+
// WebSocket path: use the connection's current stateful marker if present in messages
151+
markerIndex = getIndexOfStatefulMarker(webSocketStatefulMarker, messages);
152+
if (markerIndex !== undefined) {
153+
previousResponseId = webSocketStatefulMarker;
154+
}
155+
} else if (!ignoreStatefulMarker) {
156+
// HTTP path: look up the latest marker for this model from messages
157+
const statefulMarkerAndIndex = getStatefulMarkerAndIndex(modelId, messages);
158+
if (statefulMarkerAndIndex) {
159+
previousResponseId = statefulMarkerAndIndex.statefulMarker;
160+
markerIndex = statefulMarkerAndIndex.index;
161+
}
162+
}
142163

164+
if (markerIndex !== undefined) {
143165
// Requests that resume from previous_response_id send only post-marker history,
144166
// but they still need the latest compaction item even when that item predates
145167
// the marker. This keeps both websocket and non-websocket traffic aligned.
146-
messages = messages.slice(statefulMarkerAndIndex.index + 1);
168+
messages = messages.slice(markerIndex + 1);
147169
if (latestCompactionMessageIndex !== undefined) {
148-
if (latestCompactionMessageIndex > statefulMarkerAndIndex.index) {
149-
messages = messages.slice(latestCompactionMessageIndex - (statefulMarkerAndIndex.index + 1));
170+
if (latestCompactionMessageIndex > markerIndex) {
171+
messages = messages.slice(latestCompactionMessageIndex - (markerIndex + 1));
150172
} else if (latestCompactionMessage) {
151173
messages = [latestCompactionMessage, ...messages];
152174
}

extensions/copilot/src/platform/endpoint/node/test/responsesApi.spec.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { ILogService } from '../../../log/common/logService';
1212
import { isOpenAIContextManagementResponse } from '../../../networking/common/fetch';
1313
import { IChatEndpoint, ICreateEndpointBodyOptions } from '../../../networking/common/networking';
1414
import { openAIContextManagementCompactionType, OpenAIContextManagementResponse } from '../../../networking/common/openai';
15+
import { IChatWebSocketManager, NullChatWebSocketManager } from '../../../networking/node/chatWebSocketManager';
1516
import { TelemetryData } from '../../../telemetry/common/telemetryData';
1617
import { SpyingTelemetryService } from '../../../telemetry/node/spyingTelemetryService';
1718
import { createFakeStreamResponse } from '../../../test/node/fetcher';
@@ -307,6 +308,9 @@ describe('createResponsesRequestBody', () => {
307308

308309
it('still slices websocket requests by stateful marker index when compaction is disabled', () => {
309310
const services = createPlatformServices();
311+
const wsManager = new NullChatWebSocketManager();
312+
wsManager.getStatefulMarker = () => 'resp-prev';
313+
services.set(IChatWebSocketManager, wsManager);
310314
const accessor = services.createTestingAccessor();
311315
const instantiationService = accessor.get(IInstantiationService);
312316
const endpointWithoutCompaction = { ...testEndpoint, family: 'gpt-5' as const };
@@ -322,7 +326,7 @@ describe('createResponsesRequestBody', () => {
322326
},
323327
];
324328

325-
const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, createRequestOptions(messages, true), endpointWithoutCompaction.model, endpointWithoutCompaction));
329+
const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, { ...createRequestOptions(messages, true), conversationId: 'conv-1' }, endpointWithoutCompaction.model, endpointWithoutCompaction));
326330

327331
expect(webSocketBody.previous_response_id).toBe('resp-prev');
328332
expect(webSocketBody.input).toHaveLength(1);
@@ -337,6 +341,9 @@ describe('createResponsesRequestBody', () => {
337341

338342
it('includes the newest compaction item in websocket requests when it predates the stateful marker', () => {
339343
const services = createPlatformServices();
344+
const wsManager = new NullChatWebSocketManager();
345+
wsManager.getStatefulMarker = () => 'resp-prev';
346+
services.set(IChatWebSocketManager, wsManager);
340347
const accessor = services.createTestingAccessor();
341348
const instantiationService = accessor.get(IInstantiationService);
342349
const latestCompaction = createCompactionResponse('cmp_ws', 'enc_ws');
@@ -353,7 +360,7 @@ describe('createResponsesRequestBody', () => {
353360
},
354361
];
355362

356-
const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, createRequestOptions(messages, true), testEndpoint.model, testEndpoint));
363+
const webSocketBody = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, { ...createRequestOptions(messages, true), conversationId: 'conv-1' }, testEndpoint.model, testEndpoint));
357364

358365
expect(webSocketBody.previous_response_id).toBe('resp-prev');
359366
expect(webSocketBody.input).toContainEqual({
@@ -370,6 +377,42 @@ describe('createResponsesRequestBody', () => {
370377
services.dispose();
371378
});
372379

380+
it('sends all messages when the websocket stateful marker is not in the current messages', () => {
381+
const services = createPlatformServices();
382+
const wsManager = new NullChatWebSocketManager();
383+
wsManager.getStatefulMarker = () => 'resp-stale';
384+
services.set(IChatWebSocketManager, wsManager);
385+
const accessor = services.createTestingAccessor();
386+
const instantiationService = accessor.get(IInstantiationService);
387+
const messages: Raw.ChatMessage[] = [
388+
{
389+
role: Raw.ChatRole.User,
390+
content: [{ type: Raw.ChatCompletionContentPartKind.Text, text: 'first message' }],
391+
},
392+
createStatefulMarkerMessage(testEndpoint.model, 'resp-different'),
393+
{
394+
role: Raw.ChatRole.User,
395+
content: [{ type: Raw.ChatCompletionContentPartKind.Text, text: 'second message' }],
396+
},
397+
];
398+
399+
const body = instantiationService.invokeFunction(servicesAccessor => createResponsesRequestBody(servicesAccessor, { ...createRequestOptions(messages, true), conversationId: 'conv-1' }, testEndpoint.model, testEndpoint));
400+
401+
expect(body.previous_response_id).toBeUndefined();
402+
expect(body.input).toHaveLength(2);
403+
expect(body.input?.[0]).toMatchObject({
404+
role: 'user',
405+
content: [{ type: 'input_text', text: 'first message' }],
406+
});
407+
expect(body.input?.[1]).toMatchObject({
408+
role: 'user',
409+
content: [{ type: 'input_text', text: 'second message' }],
410+
});
411+
412+
accessor.dispose();
413+
services.dispose();
414+
});
415+
373416
it('includes the newest compaction item in non-websocket requests when it predates the stateful marker', () => {
374417
const services = createPlatformServices();
375418
const accessor = services.createTestingAccessor();

extensions/copilot/src/platform/networking/node/chatWebSocketManager.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ export interface IChatWebSocketManager {
3838
*/
3939
hasActiveConnection(conversationId: string): boolean;
4040

41+
/**
42+
* Returns the stateful marker (last completed response ID) for the given
43+
* conversation's active WebSocket connection, or undefined if there is
44+
* no active connection or no marker yet.
45+
*/
46+
getStatefulMarker(conversationId: string): string | undefined;
47+
4148
/**
4249
* Closes and removes the connection for a specific conversation.
4350
*/
@@ -58,6 +65,7 @@ export class NullChatWebSocketManager implements IChatWebSocketManager {
5865
throw new Error('WebSocket not available');
5966
}
6067
hasActiveConnection(_conversationId: string): boolean { return false; }
68+
getStatefulMarker(_conversationId: string): string | undefined { return undefined; }
6169
closeConnection(_conversationId: string): void { }
6270
closeAll(): void { }
6371
}
@@ -201,6 +209,11 @@ export class ChatWebSocketManager extends Disposable implements IChatWebSocketMa
201209
return !!connection?.isOpen;
202210
}
203211

212+
getStatefulMarker(conversationId: string): string | undefined {
213+
const connection = this._connections.get(conversationId);
214+
return connection?.isOpen ? connection.statefulMarker : undefined;
215+
}
216+
204217
closeConnection(conversationId: string): void {
205218
const connection = this._connections.get(conversationId);
206219
if (connection) {

extensions/copilot/src/platform/test/node/services.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ import { IScopeSelector } from '../../scopeSelection/common/scopeSelection';
8181
import { ISearchService } from '../../search/common/searchService';
8282
import { ISimulationTestContext, NulSimulationTestContext } from '../../simulationTestContext/common/simulationTestContext';
8383
import { ISnippyService, NullSnippyService } from '../../snippy/common/snippyService';
84+
import { IChatWebSocketManager, NullChatWebSocketManager } from '../../networking/node/chatWebSocketManager';
8485
import { ISurveyService, NullSurveyService } from '../../survey/common/surveyService';
8586
import { ITabsAndEditorsService } from '../../tabs/common/tabsAndEditorsService';
8687
import { ITasksService } from '../../tasks/common/tasksService';
@@ -278,6 +279,7 @@ export function createPlatformServices(disposables: Pick<DisposableStore, 'add'>
278279
}
279280
}));
280281
testingServiceCollection.define(ISnippyService, new SyncDescriptor(NullSnippyService));
282+
testingServiceCollection.define(IChatWebSocketManager, new SyncDescriptor(NullChatWebSocketManager));
281283
testingServiceCollection.define(IInteractiveSessionService, new SyncDescriptor(class implements IInteractiveSessionService {
282284
_serviceBrand: undefined;
283285
async transferActiveChat(workspaceUri: Uri): Promise<void> {

0 commit comments

Comments
 (0)