Skip to content
This repository was archived by the owner on May 20, 2026. It is now read-only.

Commit c918766

Browse files
committed
fixing compaction
1 parent 8f25ff1 commit c918766

6 files changed

Lines changed: 710 additions & 23 deletions

File tree

src/extension/externalAgents/node/oaiLanguageModelServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type OpenAI from 'openai';
1010
import { IChatMLFetcher, Source } from '../../../platform/chat/common/chatMLFetcher';
1111
import { ChatLocation, ChatResponse } from '../../../platform/chat/common/commonTypes';
1212
import { CustomModel, EndpointEditToolName, IEndpointProvider } from '../../../platform/endpoint/common/endpointProvider';
13-
import { OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging } from '../../../platform/endpoint/node/responsesApi';
13+
import { getResponsesApiCompactionThresholdFromBody, OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging } from '../../../platform/endpoint/node/responsesApi';
1414
import { ILogService } from '../../../platform/log/common/logService';
1515
import { FinishedCallback, OptionalChatRequestParams } from '../../../platform/networking/common/fetch';
1616
import { Response } from '../../../platform/networking/common/fetcherService';
@@ -455,7 +455,7 @@ class StreamingPassThroughEndpoint implements IChatEndpoint {
455455
// We parse the stream just to return a correct ChatCompletion for logging the response and token usage details.
456456
const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
457457
const ghRequestId = response.headers.get('x-github-request-id') ?? '';
458-
const processor = this.instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, requestId, ghRequestId);
458+
const processor = this.instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, telemetryService, requestId, ghRequestId, getResponsesApiCompactionThresholdFromBody(this.requestBody));
459459
const parser = new SSEParser((ev) => {
460460
try {
461461
logService.trace(`[StreamingPassThroughEndpoint] SSE: ${ev.data}`);

src/extension/prompt/node/chatMLFetcher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import { IInteractionService } from '../../../platform/chat/common/interactionSe
1717
import { ConfigKey, HARD_TOOL_LIMIT, IConfigurationService } from '../../../platform/configuration/common/configurationService';
1818
import { ICAPIClientService } from '../../../platform/endpoint/common/capiClient';
1919
import { isAutoModel } from '../../../platform/endpoint/node/autoChatEndpoint';
20-
import { OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging, sendCompletionOutputTelemetry } from '../../../platform/endpoint/node/responsesApi';
20+
import { getResponsesApiCompactionThresholdFromBody, OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging, sendCompletionOutputTelemetry } from '../../../platform/endpoint/node/responsesApi';
2121
import { collectSingleLineErrorMessage, ILogService } from '../../../platform/log/common/logService';
2222
import { isAnthropicToolSearchEnabled } from '../../../platform/networking/common/anthropic';
2323
import { FinishedCallback, getRequestId, IResponseDelta, OptionalChatRequestParams, RequestId } from '../../../platform/networking/common/fetch';
@@ -1102,7 +1102,7 @@ export class ChatMLFetcherImpl extends AbstractChatMLFetcher {
11021102
const handle = connection.sendRequest(request, { userInitiated: !!userInitiatedRequest, turnId }, cancellationToken);
11031103

11041104
const extendedBaseTelemetryData = baseTelemetryData.extendedBy({ modelCallId });
1105-
const processor = this._instantiationService.createInstance(OpenAIResponsesProcessor, extendedBaseTelemetryData, modelRequestId.headerRequestId, modelRequestId.gitHubRequestId);
1105+
const processor = this._instantiationService.createInstance(OpenAIResponsesProcessor, extendedBaseTelemetryData, this._telemetryService, modelRequestId.headerRequestId, modelRequestId.gitHubRequestId, getResponsesApiCompactionThresholdFromBody(request));
11061106

11071107
// Set up streaming first so event listeners are registered before we
11081108
// await the first event — AsyncIterableObject runs its executor eagerly.

src/platform/endpoint/node/chatEndpoint.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import { isGeminiFamily } from '../common/chatModelCapabilities';
3434
import { IDomainService } from '../common/domainService';
3535
import { CustomModel, IChatModelInformation, ModelSupportedEndpoint } from '../common/endpointProvider';
3636
import { createMessagesRequestBody, processResponseFromMessagesEndpoint } from './messagesApi';
37-
import { createResponsesRequestBody, processResponseFromChatEndpoint } from './responsesApi';
37+
import { createResponsesRequestBody, getResponsesApiCompactionThreshold, processResponseFromChatEndpoint } from './responsesApi';
3838

3939
/**
4040
* The default processor for the stream format from CAPI
@@ -368,7 +368,8 @@ export class ChatEndpoint implements IChatEndpoint {
368368
cancellationToken?: CancellationToken | undefined
369369
): Promise<AsyncIterableObject<ChatCompletion>> {
370370
if (this.useResponsesApi) {
371-
return processResponseFromChatEndpoint(this._instantiationService, telemetryService, logService, response, expectedNumChoices, finishCallback, telemetryData);
371+
const compactionThreshold = getResponsesApiCompactionThreshold(this._configurationService, this._expService, this);
372+
return processResponseFromChatEndpoint(this._instantiationService, telemetryService, logService, response, expectedNumChoices, finishCallback, telemetryData, compactionThreshold);
372373
} else if (this.useMessagesApi) {
373374
return processResponseFromMessagesEndpoint(this._instantiationService, telemetryService, logService, response, finishCallback, telemetryData);
374375
} else if (!this._supportsStreaming) {

src/platform/endpoint/node/responsesApi.ts

Lines changed: 167 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { ILogService } from '../../log/common/logService';
1919
import { FinishedCallback, IResponseDelta, OpenAiResponsesFunctionTool } from '../../networking/common/fetch';
2020
import { IChatEndpoint, ICreateEndpointBodyOptions, IEndpointBody } from '../../networking/common/networking';
2121
import { ChatCompletion, FinishedCompletionReason, modelsWithoutResponsesContextManagement, openAIContextManagementCompactionType, OpenAIContextManagementResponse, rawMessageToCAPI, TokenLogProb } from '../../networking/common/openai';
22-
import { sendEngineMessagesTelemetry } from '../../networking/node/chatStream';
22+
import { sendEngineMessagesTelemetry, sendResponsesApiCompactionTelemetry } from '../../networking/node/chatStream';
2323
import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';
2424
import { ITelemetryService } from '../../telemetry/common/telemetry';
2525
import { TelemetryData } from '../../telemetry/common/telemetryData';
@@ -29,10 +29,22 @@ import { rawPartAsPhaseData } from '../common/phaseDataContainer';
2929
import { getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
3030
import { rawPartAsThinkingData } from '../common/thinkingDataContainer';
3131

32+
export function getResponsesApiCompactionThreshold(configService: IConfigurationService, expService: IExperimentationService, endpoint: IChatEndpoint): number | undefined {
33+
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
34+
if (!contextManagementEnabled) {
35+
return undefined;
36+
}
37+
38+
return endpoint.modelMaxPromptTokens > 0
39+
? Math.floor(endpoint.modelMaxPromptTokens * 0.9)
40+
: 50000;
41+
}
42+
3243
export function createResponsesRequestBody(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions, model: string, endpoint: IChatEndpoint): IEndpointBody {
3344
const configService = accessor.get(IConfigurationService);
3445
const expService = accessor.get(IExperimentationService);
3546
const verbosity = getVerbosityForModelSync(endpoint);
47+
const compactThreshold = getResponsesApiCompactionThreshold(configService, expService, endpoint);
3648
// compaction supported for all the models but works well for codex models and any future models after 5.3
3749

3850
const body: IEndpointBody = {
@@ -56,11 +68,7 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
5668
text: verbosity ? { verbosity } : undefined,
5769
};
5870

59-
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
60-
if (contextManagementEnabled) {
61-
const compactThreshold = endpoint.modelMaxPromptTokens > 0
62-
? Math.floor(endpoint.modelMaxPromptTokens * 0.9)
63-
: 50000;
71+
if (compactThreshold !== undefined) {
6472
body.context_management = [{
6573
'type': openAIContextManagementCompactionType,
6674
// Trigger compaction at 90% of the model max prompt context to keep headroom for active turns.
@@ -93,6 +101,21 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
93101
return body;
94102
}
95103

104+
export function getResponsesApiCompactionThresholdFromBody(body: Pick<IEndpointBody, 'context_management'>): number | undefined {
105+
const contextManagement = body.context_management;
106+
if (!Array.isArray(contextManagement)) {
107+
return undefined;
108+
}
109+
110+
for (const item of contextManagement) {
111+
if (item.type === openAIContextManagementCompactionType && typeof item.compact_threshold === 'number') {
112+
return item.compact_threshold;
113+
}
114+
}
115+
116+
return undefined;
117+
}
118+
96119
type ResponseOutputMessageWithPhase = OpenAI.Responses.ResponseOutputMessage & {
97120
phase?: string;
98121
};
@@ -101,17 +124,33 @@ interface ResponseOutputItemWithPhase {
101124
phase?: string;
102125
}
103126

127+
interface LatestCompactionOutput {
128+
readonly item: OpenAIContextManagementResponse;
129+
readonly outputIndex: number;
130+
}
131+
104132
function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
105133
const latestCompactionMessageIndex = getLatestCompactionMessageIndex(messages);
106-
if (latestCompactionMessageIndex !== undefined) {
107-
messages = messages.slice(latestCompactionMessageIndex);
108-
}
109-
134+
const latestCompactionMessage = latestCompactionMessageIndex !== undefined ? createCompactionRoundTripMessage(messages[latestCompactionMessageIndex]) : undefined;
110135
const statefulMarkerAndIndex = !ignoreStatefulMarker && getStatefulMarkerAndIndex(modelId, messages);
136+
111137
let previousResponseId: string | undefined;
112-
if (latestCompactionMessageIndex === undefined && statefulMarkerAndIndex) {
138+
if (statefulMarkerAndIndex) {
113139
previousResponseId = statefulMarkerAndIndex.statefulMarker;
140+
141+
// Requests that resume from previous_response_id send only post-marker history,
142+
// but they still need the latest compaction item even when that item predates
143+
// the marker. This keeps both websocket and non-websocket traffic aligned.
114144
messages = messages.slice(statefulMarkerAndIndex.index + 1);
145+
if (latestCompactionMessageIndex !== undefined) {
146+
if (latestCompactionMessageIndex > statefulMarkerAndIndex.index) {
147+
messages = messages.slice(latestCompactionMessageIndex - (statefulMarkerAndIndex.index + 1));
148+
} else if (latestCompactionMessage) {
149+
messages = [latestCompactionMessage, ...messages];
150+
}
151+
}
152+
} else if (latestCompactionMessageIndex !== undefined) {
153+
messages = messages.slice(latestCompactionMessageIndex);
115154
}
116155

117156
const input: OpenAI.Responses.ResponseInputItem[] = [];
@@ -174,6 +213,22 @@ function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMe
174213
return { input, previous_response_id: previousResponseId };
175214
}
176215

216+
function createCompactionRoundTripMessage(message: Raw.ChatMessage): Raw.ChatMessage | undefined {
217+
if (message.role !== Raw.ChatRole.Assistant) {
218+
return undefined;
219+
}
220+
221+
const content = message.content.filter(part => part.type === Raw.ChatCompletionContentPartKind.Opaque && rawPartAsCompactionData(part));
222+
if (!content.length) {
223+
return undefined;
224+
}
225+
226+
return {
227+
role: Raw.ChatRole.Assistant,
228+
content,
229+
};
230+
}
231+
177232
function getLatestCompactionMessageIndex(messages: readonly Raw.ChatMessage[]): number | undefined {
178233
for (let idx = messages.length - 1; idx >= 0; idx--) {
179234
const message = messages[idx];
@@ -426,11 +481,44 @@ function responseFunctionOutputToRawContents(output: string | OpenAI.Responses.R
426481
return coalesce(output.map(responseContentToRawContent));
427482
}
428483

429-
export async function processResponseFromChatEndpoint(instantiationService: IInstantiationService, telemetryService: ITelemetryService, logService: ILogService, response: Response, expectedNumChoices: number, finishCallback: FinishedCallback, telemetryData: TelemetryData): Promise<AsyncIterableObject<ChatCompletion>> {
484+
function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): boolean {
485+
return item.type.toString() === openAIContextManagementCompactionType;
486+
}
487+
488+
function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): LatestCompactionOutput | undefined {
489+
let latestCompactionOutput: LatestCompactionOutput | undefined;
490+
for (let idx = output.length - 1; idx >= 0; idx--) {
491+
const item = output[idx];
492+
if (isCompactionOutputItem(item)) {
493+
latestCompactionOutput = { item: item as unknown as OpenAIContextManagementResponse, outputIndex: idx };
494+
break;
495+
}
496+
}
497+
498+
if (preferredOutputIndex !== undefined) {
499+
const preferredItem = output[preferredOutputIndex];
500+
if (preferredItem && isCompactionOutputItem(preferredItem) && (!latestCompactionOutput || preferredOutputIndex >= latestCompactionOutput.outputIndex)) {
501+
return { item: preferredItem as unknown as OpenAIContextManagementResponse, outputIndex: preferredOutputIndex };
502+
}
503+
}
504+
505+
return latestCompactionOutput;
506+
}
507+
508+
function keepLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): OpenAI.Responses.ResponseOutputItem[] {
509+
const latestCompactionOutput = getLatestCompactionOutput(output, preferredOutputIndex);
510+
if (!latestCompactionOutput) {
511+
return output;
512+
}
513+
514+
return output.filter((item, idx) => !isCompactionOutputItem(item) || idx === latestCompactionOutput.outputIndex);
515+
}
516+
517+
export async function processResponseFromChatEndpoint(instantiationService: IInstantiationService, telemetryService: ITelemetryService, logService: ILogService, response: Response, expectedNumChoices: number, finishCallback: FinishedCallback, telemetryData: TelemetryData, compactionThreshold?: number): Promise<AsyncIterableObject<ChatCompletion>> {
430518
return new AsyncIterableObject<ChatCompletion>(async feed => {
431519
const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
432520
const ghRequestId = response.headers.get('x-github-request-id') ?? '';
433-
const processor = instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, requestId, ghRequestId);
521+
const processor = instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, telemetryService, requestId, ghRequestId, compactionThreshold);
434522
const parser = new SSEParser((ev) => {
435523
try {
436524
logService.trace(`SSE: ${ev.data}`);
@@ -472,13 +560,19 @@ interface CapiResponsesTextDeltaEvent extends Omit<OpenAI.Responses.ResponseText
472560
export class OpenAIResponsesProcessor {
473561
private textAccumulator: string = '';
474562
private hasReceivedReasoningSummary = false;
563+
private sawCompactionMessage = false;
564+
private latestCompactionOutputIndex: number | undefined;
565+
private latestCompactionItem: OpenAIContextManagementResponse | undefined;
475566
/** Maps output_index to { name, callId, arguments } for streaming tool call updates */
476567
private readonly toolCallInfo = new Map<number, { name: string; callId: string; arguments: string }>();
477568

478569
constructor(
479570
private readonly telemetryData: TelemetryData,
571+
private readonly telemetryService: ITelemetryService,
480572
private readonly requestId: string,
481573
private readonly ghRequestId: string,
574+
private readonly compactionThreshold: number | undefined,
575+
@ILogService private readonly logService: ILogService,
482576
) { }
483577

484578
public push(chunk: OpenAI.Responses.ResponseStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined {
@@ -530,6 +624,12 @@ export class OpenAIResponsesProcessor {
530624
case 'response.output_item.done':
531625
if (chunk.item.type.toString() === openAIContextManagementCompactionType) {
532626
const compactionItem = chunk.item as unknown as OpenAIContextManagementResponse;
627+
if (this.latestCompactionOutputIndex !== undefined && chunk.output_index < this.latestCompactionOutputIndex) {
628+
return;
629+
}
630+
this.latestCompactionOutputIndex = chunk.output_index;
631+
this.latestCompactionItem = compactionItem;
632+
this.sawCompactionMessage = true;
533633
return onProgress({
534634
text: '',
535635
contextManagement: {
@@ -586,8 +686,58 @@ export class OpenAIResponsesProcessor {
586686
id: chunk.item_id
587687
}
588688
});
589-
case 'response.completed':
590-
onProgress({ text: '', statefulMarker: chunk.response.id });
689+
case 'response.completed': {
690+
const normalizedOutput = keepLatestCompactionOutput(chunk.response.output, this.latestCompactionOutputIndex);
691+
const latestCompactionOutput = getLatestCompactionOutput(normalizedOutput, this.latestCompactionOutputIndex);
692+
const latestCompactionItem = latestCompactionOutput?.item;
693+
const previousCompactionItem = this.latestCompactionItem;
694+
if (latestCompactionItem) {
695+
this.sawCompactionMessage = true;
696+
this.latestCompactionOutputIndex = latestCompactionOutput.outputIndex;
697+
}
698+
699+
const shouldEmitResolvedCompaction = latestCompactionItem && (
700+
!previousCompactionItem ||
701+
previousCompactionItem.id !== latestCompactionItem.id ||
702+
previousCompactionItem.encrypted_content !== latestCompactionItem.encrypted_content
703+
);
704+
if (latestCompactionItem) {
705+
this.latestCompactionItem = latestCompactionItem;
706+
}
707+
if (this.compactionThreshold !== undefined && this.sawCompactionMessage) {
708+
const promptTokens = chunk.response.usage?.input_tokens ?? 0;
709+
const totalTokens = chunk.response.usage?.total_tokens ?? 0;
710+
sendResponsesApiCompactionTelemetry(this.telemetryService, {
711+
outcome: 'compaction_returned',
712+
headerRequestId: this.requestId,
713+
gitHubRequestId: this.ghRequestId,
714+
model: chunk.response.model,
715+
}, {
716+
compactThreshold: this.compactionThreshold,
717+
promptTokens,
718+
totalTokens,
719+
});
720+
this.logService.debug(`[responsesAPI_compaction] Compaction enabled. headerRequestId=${this.requestId}`);
721+
} else if (this.compactionThreshold !== undefined && (chunk.response.usage?.input_tokens ?? 0) >= this.compactionThreshold) {
722+
const promptTokens = chunk.response.usage?.input_tokens ?? 0;
723+
const totalTokens = chunk.response.usage?.total_tokens ?? 0;
724+
sendResponsesApiCompactionTelemetry(this.telemetryService, {
725+
outcome: 'threshold_met_no_compaction',
726+
headerRequestId: this.requestId,
727+
gitHubRequestId: this.ghRequestId,
728+
model: chunk.response.model,
729+
}, {
730+
compactThreshold: this.compactionThreshold,
731+
promptTokens,
732+
totalTokens,
733+
});
734+
this.logService.debug(`[responsesAPI_compaction] Compaction enabled but context not compacted after threshold was met. headerRequestId=${this.requestId}, gitHubRequestId=${this.ghRequestId}, promptTokens=${promptTokens}, totalTokens=${totalTokens}`);
735+
}
736+
onProgress({
737+
text: '',
738+
statefulMarker: chunk.response.id,
739+
contextManagement: shouldEmitResolvedCompaction ? latestCompactionItem : undefined,
740+
});
591741
return {
592742
blockFinished: true,
593743
choiceIndex: 0,
@@ -611,7 +761,7 @@ export class OpenAIResponsesProcessor {
611761
finishReason: FinishedCompletionReason.Stop,
612762
message: {
613763
role: Raw.ChatRole.Assistant,
614-
content: chunk.response.output.map((item): Raw.ChatCompletionContentPart | undefined => {
764+
content: normalizedOutput.map((item): Raw.ChatCompletionContentPart | undefined => {
615765
if (item.type === 'message') {
616766
return { type: Raw.ChatCompletionContentPartKind.Text, text: item.content.map(c => c.type === 'output_text' ? c.text : c.refusal).join('') };
617767
} else if (item.type === 'image_generation_call' && item.result) {
@@ -620,6 +770,7 @@ export class OpenAIResponsesProcessor {
620770
}).filter(isDefined),
621771
}
622772
};
773+
}
623774
}
624775
}
625776
}

0 commit comments

Comments
 (0)