Skip to content
This repository was archived by the owner on May 20, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/extension/externalAgents/node/oaiLanguageModelServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type OpenAI from 'openai';
import { IChatMLFetcher, Source } from '../../../platform/chat/common/chatMLFetcher';
import { ChatLocation, ChatResponse } from '../../../platform/chat/common/commonTypes';
import { CustomModel, EndpointEditToolName, IEndpointProvider } from '../../../platform/endpoint/common/endpointProvider';
import { OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging } from '../../../platform/endpoint/node/responsesApi';
import { getResponsesApiCompactionThresholdFromBody, OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging } from '../../../platform/endpoint/node/responsesApi';
import { ILogService } from '../../../platform/log/common/logService';
import { FinishedCallback, OptionalChatRequestParams } from '../../../platform/networking/common/fetch';
import { Response } from '../../../platform/networking/common/fetcherService';
Expand Down Expand Up @@ -455,7 +455,7 @@ class StreamingPassThroughEndpoint implements IChatEndpoint {
// We parse the stream just to return a correct ChatCompletion for logging the response and token usage details.
const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
const ghRequestId = response.headers.get('x-github-request-id') ?? '';
const processor = this.instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, requestId, ghRequestId);
const processor = this.instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, telemetryService, requestId, ghRequestId, getResponsesApiCompactionThresholdFromBody(this.requestBody));
const parser = new SSEParser((ev) => {
try {
logService.trace(`[StreamingPassThroughEndpoint] SSE: ${ev.data}`);
Expand Down
4 changes: 2 additions & 2 deletions src/extension/prompt/node/chatMLFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { IInteractionService } from '../../../platform/chat/common/interactionSe
import { ConfigKey, HARD_TOOL_LIMIT, IConfigurationService } from '../../../platform/configuration/common/configurationService';
import { ICAPIClientService } from '../../../platform/endpoint/common/capiClient';
import { isAutoModel } from '../../../platform/endpoint/node/autoChatEndpoint';
import { OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging, sendCompletionOutputTelemetry } from '../../../platform/endpoint/node/responsesApi';
import { getResponsesApiCompactionThresholdFromBody, OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging, sendCompletionOutputTelemetry } from '../../../platform/endpoint/node/responsesApi';
import { collectSingleLineErrorMessage, ILogService } from '../../../platform/log/common/logService';
import { isAnthropicToolSearchEnabled } from '../../../platform/networking/common/anthropic';
import { FinishedCallback, getRequestId, IResponseDelta, OptionalChatRequestParams, RequestId } from '../../../platform/networking/common/fetch';
Expand Down Expand Up @@ -1102,7 +1102,7 @@ export class ChatMLFetcherImpl extends AbstractChatMLFetcher {
const handle = connection.sendRequest(request, { userInitiated: !!userInitiatedRequest, turnId }, cancellationToken);

const extendedBaseTelemetryData = baseTelemetryData.extendedBy({ modelCallId });
const processor = this._instantiationService.createInstance(OpenAIResponsesProcessor, extendedBaseTelemetryData, modelRequestId.headerRequestId, modelRequestId.gitHubRequestId);
const processor = this._instantiationService.createInstance(OpenAIResponsesProcessor, extendedBaseTelemetryData, this._telemetryService, modelRequestId.headerRequestId, modelRequestId.gitHubRequestId, getResponsesApiCompactionThresholdFromBody(request));

// Set up streaming first so event listeners are registered before we
// await the first event — AsyncIterableObject runs its executor eagerly.
Expand Down
5 changes: 3 additions & 2 deletions src/platform/endpoint/node/chatEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { isGeminiFamily } from '../common/chatModelCapabilities';
import { IDomainService } from '../common/domainService';
import { CustomModel, IChatModelInformation, ModelSupportedEndpoint } from '../common/endpointProvider';
import { createMessagesRequestBody, processResponseFromMessagesEndpoint } from './messagesApi';
import { createResponsesRequestBody, processResponseFromChatEndpoint } from './responsesApi';
import { createResponsesRequestBody, getResponsesApiCompactionThreshold, processResponseFromChatEndpoint } from './responsesApi';

/**
* The default processor for the stream format from CAPI
Expand Down Expand Up @@ -368,7 +368,8 @@ export class ChatEndpoint implements IChatEndpoint {
cancellationToken?: CancellationToken | undefined
): Promise<AsyncIterableObject<ChatCompletion>> {
if (this.useResponsesApi) {
return processResponseFromChatEndpoint(this._instantiationService, telemetryService, logService, response, expectedNumChoices, finishCallback, telemetryData);
const compactionThreshold = getResponsesApiCompactionThreshold(this._configurationService, this._expService, this);
return processResponseFromChatEndpoint(this._instantiationService, telemetryService, logService, response, expectedNumChoices, finishCallback, telemetryData, compactionThreshold);
} else if (this.useMessagesApi) {
return processResponseFromMessagesEndpoint(this._instantiationService, telemetryService, logService, response, finishCallback, telemetryData);
} else if (!this._supportsStreaming) {
Expand Down
183 changes: 167 additions & 16 deletions src/platform/endpoint/node/responsesApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { ILogService } from '../../log/common/logService';
import { FinishedCallback, IResponseDelta, OpenAiResponsesFunctionTool } from '../../networking/common/fetch';
import { IChatEndpoint, ICreateEndpointBodyOptions, IEndpointBody } from '../../networking/common/networking';
import { ChatCompletion, FinishedCompletionReason, modelsWithoutResponsesContextManagement, openAIContextManagementCompactionType, OpenAIContextManagementResponse, rawMessageToCAPI, TokenLogProb } from '../../networking/common/openai';
import { sendEngineMessagesTelemetry } from '../../networking/node/chatStream';
import { sendEngineMessagesTelemetry, sendResponsesApiCompactionTelemetry } from '../../networking/node/chatStream';
import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';
import { ITelemetryService } from '../../telemetry/common/telemetry';
import { TelemetryData } from '../../telemetry/common/telemetryData';
Expand All @@ -29,10 +29,22 @@ import { rawPartAsPhaseData } from '../common/phaseDataContainer';
import { getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
import { rawPartAsThinkingData } from '../common/thinkingDataContainer';

export function getResponsesApiCompactionThreshold(configService: IConfigurationService, expService: IExperimentationService, endpoint: IChatEndpoint): number | undefined {
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
if (!contextManagementEnabled) {
return undefined;
}

return endpoint.modelMaxPromptTokens > 0
? Math.floor(endpoint.modelMaxPromptTokens * 0.9)
: 50000;
}

export function createResponsesRequestBody(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions, model: string, endpoint: IChatEndpoint): IEndpointBody {
const configService = accessor.get(IConfigurationService);
const expService = accessor.get(IExperimentationService);
const verbosity = getVerbosityForModelSync(endpoint);
const compactThreshold = getResponsesApiCompactionThreshold(configService, expService, endpoint);
// compaction supported for all the models but works well for codex models and any future models after 5.3

const body: IEndpointBody = {
Expand All @@ -56,11 +68,7 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
text: verbosity ? { verbosity } : undefined,
};

const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
if (contextManagementEnabled) {
const compactThreshold = endpoint.modelMaxPromptTokens > 0
? Math.floor(endpoint.modelMaxPromptTokens * 0.9)
: 50000;
if (compactThreshold !== undefined) {
body.context_management = [{
'type': openAIContextManagementCompactionType,
// Trigger compaction at 90% of the model max prompt context to keep headroom for active turns.
Expand Down Expand Up @@ -93,6 +101,21 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
return body;
}

export function getResponsesApiCompactionThresholdFromBody(body: Pick<IEndpointBody, 'context_management'>): number | undefined {
const contextManagement = body.context_management;
if (!Array.isArray(contextManagement)) {
return undefined;
}

for (const item of contextManagement) {
if (item.type === openAIContextManagementCompactionType && typeof item.compact_threshold === 'number') {
return item.compact_threshold;
}
}

return undefined;
}

type ResponseOutputMessageWithPhase = OpenAI.Responses.ResponseOutputMessage & {
phase?: string;
};
Expand All @@ -101,17 +124,33 @@ interface ResponseOutputItemWithPhase {
phase?: string;
}

interface LatestCompactionOutput {
readonly item: OpenAIContextManagementResponse;
readonly outputIndex: number;
}

function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
const latestCompactionMessageIndex = getLatestCompactionMessageIndex(messages);
if (latestCompactionMessageIndex !== undefined) {
messages = messages.slice(latestCompactionMessageIndex);
}

const latestCompactionMessage = latestCompactionMessageIndex !== undefined ? createCompactionRoundTripMessage(messages[latestCompactionMessageIndex]) : undefined;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks like this could be moved into the if block below

const statefulMarkerAndIndex = !ignoreStatefulMarker && getStatefulMarkerAndIndex(modelId, messages);

let previousResponseId: string | undefined;
if (latestCompactionMessageIndex === undefined && statefulMarkerAndIndex) {
if (statefulMarkerAndIndex) {
previousResponseId = statefulMarkerAndIndex.statefulMarker;

// Requests that resume from previous_response_id send only post-marker history,
// but they still need the latest compaction item even when that item predates
// the marker. This keeps both websocket and non-websocket traffic aligned.
messages = messages.slice(statefulMarkerAndIndex.index + 1);
if (latestCompactionMessageIndex !== undefined) {
if (latestCompactionMessageIndex > statefulMarkerAndIndex.index) {
messages = messages.slice(latestCompactionMessageIndex - (statefulMarkerAndIndex.index + 1));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still set previousResponseId in this case?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is no longer valid. I had to leave this, thought I closed it and made some changes in this commit: microsoft/vscode@5207a1a

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validated the changes and for websockets compaction is being sent.

} else if (latestCompactionMessage) {
messages = [latestCompactionMessage, ...messages];
}
}
} else if (latestCompactionMessageIndex !== undefined) {
messages = messages.slice(latestCompactionMessageIndex);
}

const input: OpenAI.Responses.ResponseInputItem[] = [];
Expand Down Expand Up @@ -174,6 +213,22 @@ function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMe
return { input, previous_response_id: previousResponseId };
}

function createCompactionRoundTripMessage(message: Raw.ChatMessage): Raw.ChatMessage | undefined {
if (message.role !== Raw.ChatRole.Assistant) {
return undefined;
}

const content = message.content.filter(part => part.type === Raw.ChatCompletionContentPartKind.Opaque && rawPartAsCompactionData(part));
if (!content.length) {
return undefined;
}

return {
role: Raw.ChatRole.Assistant,
content,
};
}

function getLatestCompactionMessageIndex(messages: readonly Raw.ChatMessage[]): number | undefined {
for (let idx = messages.length - 1; idx >= 0; idx--) {
const message = messages[idx];
Expand Down Expand Up @@ -426,11 +481,44 @@ function responseFunctionOutputToRawContents(output: string | OpenAI.Responses.R
return coalesce(output.map(responseContentToRawContent));
}

export async function processResponseFromChatEndpoint(instantiationService: IInstantiationService, telemetryService: ITelemetryService, logService: ILogService, response: Response, expectedNumChoices: number, finishCallback: FinishedCallback, telemetryData: TelemetryData): Promise<AsyncIterableObject<ChatCompletion>> {
function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): boolean {
return item.type.toString() === openAIContextManagementCompactionType;
}

function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): LatestCompactionOutput | undefined {
let latestCompactionOutput: LatestCompactionOutput | undefined;
for (let idx = output.length - 1; idx >= 0; idx--) {
const item = output[idx];
if (isCompactionOutputItem(item)) {
latestCompactionOutput = { item: item as unknown as OpenAIContextManagementResponse, outputIndex: idx };
break;
}
}

if (preferredOutputIndex !== undefined) {
const preferredItem = output[preferredOutputIndex];
if (preferredItem && isCompactionOutputItem(preferredItem) && (!latestCompactionOutput || preferredOutputIndex >= latestCompactionOutput.outputIndex)) {
return { item: preferredItem as unknown as OpenAIContextManagementResponse, outputIndex: preferredOutputIndex };
}
}

return latestCompactionOutput;
}

function keepLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): OpenAI.Responses.ResponseOutputItem[] {
const latestCompactionOutput = getLatestCompactionOutput(output, preferredOutputIndex);
if (!latestCompactionOutput) {
return output;
}

return output.filter((item, idx) => !isCompactionOutputItem(item) || idx === latestCompactionOutput.outputIndex);
}

export async function processResponseFromChatEndpoint(instantiationService: IInstantiationService, telemetryService: ITelemetryService, logService: ILogService, response: Response, expectedNumChoices: number, finishCallback: FinishedCallback, telemetryData: TelemetryData, compactionThreshold?: number): Promise<AsyncIterableObject<ChatCompletion>> {
return new AsyncIterableObject<ChatCompletion>(async feed => {
const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
const ghRequestId = response.headers.get('x-github-request-id') ?? '';
const processor = instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, requestId, ghRequestId);
const processor = instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, telemetryService, requestId, ghRequestId, compactionThreshold);
const parser = new SSEParser((ev) => {
try {
logService.trace(`SSE: ${ev.data}`);
Expand Down Expand Up @@ -472,13 +560,19 @@ interface CapiResponsesTextDeltaEvent extends Omit<OpenAI.Responses.ResponseText
export class OpenAIResponsesProcessor {
private textAccumulator: string = '';
private hasReceivedReasoningSummary = false;
private sawCompactionMessage = false;
private latestCompactionOutputIndex: number | undefined;
private latestCompactionItem: OpenAIContextManagementResponse | undefined;
/** Maps output_index to { name, callId, arguments } for streaming tool call updates */
private readonly toolCallInfo = new Map<number, { name: string; callId: string; arguments: string }>();

constructor(
private readonly telemetryData: TelemetryData,
private readonly telemetryService: ITelemetryService,
private readonly requestId: string,
private readonly ghRequestId: string,
private readonly compactionThreshold: number | undefined,
@ILogService private readonly logService: ILogService,
) { }

public push(chunk: OpenAI.Responses.ResponseStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined {
Expand Down Expand Up @@ -530,6 +624,12 @@ export class OpenAIResponsesProcessor {
case 'response.output_item.done':
if (chunk.item.type.toString() === openAIContextManagementCompactionType) {
const compactionItem = chunk.item as unknown as OpenAIContextManagementResponse;
if (this.latestCompactionOutputIndex !== undefined && chunk.output_index < this.latestCompactionOutputIndex) {
return;
}
this.latestCompactionOutputIndex = chunk.output_index;
this.latestCompactionItem = compactionItem;
this.sawCompactionMessage = true;
return onProgress({
text: '',
contextManagement: {
Expand Down Expand Up @@ -586,8 +686,58 @@ export class OpenAIResponsesProcessor {
id: chunk.item_id
}
});
case 'response.completed':
onProgress({ text: '', statefulMarker: chunk.response.id });
case 'response.completed': {
const normalizedOutput = keepLatestCompactionOutput(chunk.response.output, this.latestCompactionOutputIndex);
const latestCompactionOutput = getLatestCompactionOutput(normalizedOutput, this.latestCompactionOutputIndex);
const latestCompactionItem = latestCompactionOutput?.item;
const previousCompactionItem = this.latestCompactionItem;
if (latestCompactionItem) {
this.sawCompactionMessage = true;
this.latestCompactionOutputIndex = latestCompactionOutput.outputIndex;
}

const shouldEmitResolvedCompaction = latestCompactionItem && (
!previousCompactionItem ||
previousCompactionItem.id !== latestCompactionItem.id ||
previousCompactionItem.encrypted_content !== latestCompactionItem.encrypted_content
);
if (latestCompactionItem) {
this.latestCompactionItem = latestCompactionItem;
}
if (this.compactionThreshold !== undefined && this.sawCompactionMessage) {
const promptTokens = chunk.response.usage?.input_tokens ?? 0;
const totalTokens = chunk.response.usage?.total_tokens ?? 0;
sendResponsesApiCompactionTelemetry(this.telemetryService, {
outcome: 'compaction_returned',
headerRequestId: this.requestId,
gitHubRequestId: this.ghRequestId,
model: chunk.response.model,
}, {
compactThreshold: this.compactionThreshold,
promptTokens,
totalTokens,
});
this.logService.debug(`[responsesAPI_compaction] Compaction enabled. headerRequestId=${this.requestId}`);
} else if (this.compactionThreshold !== undefined && (chunk.response.usage?.input_tokens ?? 0) >= this.compactionThreshold) {
Comment thread
dileepyavan marked this conversation as resolved.
const promptTokens = chunk.response.usage?.input_tokens ?? 0;
const totalTokens = chunk.response.usage?.total_tokens ?? 0;
sendResponsesApiCompactionTelemetry(this.telemetryService, {
outcome: 'threshold_met_no_compaction',
headerRequestId: this.requestId,
gitHubRequestId: this.ghRequestId,
model: chunk.response.model,
}, {
compactThreshold: this.compactionThreshold,
promptTokens,
totalTokens,
});
this.logService.debug(`[responsesAPI_compaction] Compaction enabled but context not compacted after threshold was met. headerRequestId=${this.requestId}, gitHubRequestId=${this.ghRequestId}, promptTokens=${promptTokens}, totalTokens=${totalTokens}`);
}
onProgress({
text: '',
statefulMarker: chunk.response.id,
contextManagement: shouldEmitResolvedCompaction ? latestCompactionItem : undefined,
});
return {
blockFinished: true,
choiceIndex: 0,
Expand All @@ -611,7 +761,7 @@ export class OpenAIResponsesProcessor {
finishReason: FinishedCompletionReason.Stop,
message: {
role: Raw.ChatRole.Assistant,
content: chunk.response.output.map((item): Raw.ChatCompletionContentPart | undefined => {
content: normalizedOutput.map((item): Raw.ChatCompletionContentPart | undefined => {
if (item.type === 'message') {
return { type: Raw.ChatCompletionContentPartKind.Text, text: item.content.map(c => c.type === 'output_text' ? c.text : c.refusal).join('') };
} else if (item.type === 'image_generation_call' && item.result) {
Expand All @@ -620,6 +770,7 @@ export class OpenAIResponsesProcessor {
}).filter(isDefined),
}
};
}
}
}
}
Expand Down
Loading
Loading