Skip to content

Commit 5207a1a

Browse files
dileepyavanCopilot
andauthored
Update Responses API compaction and telemetry (#308436)
* fixing compaction * Update extensions/copilot/src/platform/networking/node/chatStream.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e62187b commit 5207a1a

6 files changed

Lines changed: 710 additions & 23 deletions

File tree

extensions/copilot/src/extension/externalAgents/node/oaiLanguageModelServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type OpenAI from 'openai';
1010
import { IChatMLFetcher, Source } from '../../../platform/chat/common/chatMLFetcher';
1111
import { ChatLocation, ChatResponse } from '../../../platform/chat/common/commonTypes';
1212
import { CustomModel, EndpointEditToolName, IEndpointProvider } from '../../../platform/endpoint/common/endpointProvider';
13-
import { OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging } from '../../../platform/endpoint/node/responsesApi';
13+
import { getResponsesApiCompactionThresholdFromBody, OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging } from '../../../platform/endpoint/node/responsesApi';
1414
import { ILogService } from '../../../platform/log/common/logService';
1515
import { FinishedCallback, OptionalChatRequestParams } from '../../../platform/networking/common/fetch';
1616
import { Response } from '../../../platform/networking/common/fetcherService';
@@ -455,7 +455,7 @@ class StreamingPassThroughEndpoint implements IChatEndpoint {
455455
// We parse the stream just to return a correct ChatCompletion for logging the response and token usage details.
456456
const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
457457
const ghRequestId = response.headers.get('x-github-request-id') ?? '';
458-
const processor = this.instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, requestId, ghRequestId);
458+
const processor = this.instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, telemetryService, requestId, ghRequestId, getResponsesApiCompactionThresholdFromBody(this.requestBody));
459459
const parser = new SSEParser((ev) => {
460460
try {
461461
logService.trace(`[StreamingPassThroughEndpoint] SSE: ${ev.data}`);

extensions/copilot/src/extension/prompt/node/chatMLFetcher.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import { IInteractionService } from '../../../platform/chat/common/interactionSe
1717
import { ConfigKey, HARD_TOOL_LIMIT, IConfigurationService } from '../../../platform/configuration/common/configurationService';
1818
import { ICAPIClientService } from '../../../platform/endpoint/common/capiClient';
1919
import { isAutoModel } from '../../../platform/endpoint/node/autoChatEndpoint';
20-
import { OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging, sendCompletionOutputTelemetry } from '../../../platform/endpoint/node/responsesApi';
20+
import { getResponsesApiCompactionThresholdFromBody, OpenAIResponsesProcessor, responseApiInputToRawMessagesForLogging, sendCompletionOutputTelemetry } from '../../../platform/endpoint/node/responsesApi';
2121
import { collectSingleLineErrorMessage, ILogService } from '../../../platform/log/common/logService';
2222
import { isAnthropicToolSearchEnabled } from '../../../platform/networking/common/anthropic';
2323
import { FinishedCallback, getRequestId, IResponseDelta, OptionalChatRequestParams, RequestId } from '../../../platform/networking/common/fetch';
@@ -1103,7 +1103,7 @@ export class ChatMLFetcherImpl extends AbstractChatMLFetcher {
11031103
const handle = connection.sendRequest(request, { userInitiated: !!userInitiatedRequest, turnId, requestId: ourRequestId }, cancellationToken);
11041104

11051105
const extendedBaseTelemetryData = baseTelemetryData.extendedBy({ modelCallId });
1106-
const processor = this._instantiationService.createInstance(OpenAIResponsesProcessor, extendedBaseTelemetryData, modelRequestId.headerRequestId, modelRequestId.gitHubRequestId);
1106+
const processor = this._instantiationService.createInstance(OpenAIResponsesProcessor, extendedBaseTelemetryData, this._telemetryService, modelRequestId.headerRequestId, modelRequestId.gitHubRequestId, getResponsesApiCompactionThresholdFromBody(request));
11071107

11081108
// Set up streaming first so event listeners are registered before we
11091109
// await the first event — AsyncIterableObject runs its executor eagerly.

extensions/copilot/src/platform/endpoint/node/chatEndpoint.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import { isGeminiFamily, modelSupportsContextEditing, modelSupportsToolSearch }
3434
import { IDomainService } from '../common/domainService';
3535
import { CustomModel, IChatModelInformation, ModelSupportedEndpoint } from '../common/endpointProvider';
3636
import { createMessagesRequestBody, processResponseFromMessagesEndpoint } from './messagesApi';
37-
import { createResponsesRequestBody, processResponseFromChatEndpoint } from './responsesApi';
37+
import { createResponsesRequestBody, getResponsesApiCompactionThreshold, processResponseFromChatEndpoint } from './responsesApi';
3838

3939
/**
4040
* The default processor for the stream format from CAPI
@@ -366,7 +366,8 @@ export class ChatEndpoint implements IChatEndpoint {
366366
cancellationToken?: CancellationToken | undefined
367367
): Promise<AsyncIterableObject<ChatCompletion>> {
368368
if (this.useResponsesApi) {
369-
return processResponseFromChatEndpoint(this._instantiationService, telemetryService, logService, response, expectedNumChoices, finishCallback, telemetryData);
369+
const compactionThreshold = getResponsesApiCompactionThreshold(this._configurationService, this._expService, this);
370+
return processResponseFromChatEndpoint(this._instantiationService, telemetryService, logService, response, expectedNumChoices, finishCallback, telemetryData, compactionThreshold);
370371
} else if (this.useMessagesApi) {
371372
return processResponseFromMessagesEndpoint(this._instantiationService, telemetryService, logService, response, finishCallback, telemetryData);
372373
} else if (!this._supportsStreaming) {

extensions/copilot/src/platform/endpoint/node/responsesApi.ts

Lines changed: 167 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import { ILogService } from '../../log/common/logService';
1919
import { FinishedCallback, IResponseDelta, OpenAiResponsesFunctionTool } from '../../networking/common/fetch';
2020
import { IChatEndpoint, ICreateEndpointBodyOptions, IEndpointBody } from '../../networking/common/networking';
2121
import { ChatCompletion, FinishedCompletionReason, modelsWithoutResponsesContextManagement, openAIContextManagementCompactionType, OpenAIContextManagementResponse, rawMessageToCAPI, TokenLogProb } from '../../networking/common/openai';
22-
import { sendEngineMessagesTelemetry } from '../../networking/node/chatStream';
22+
import { sendEngineMessagesTelemetry, sendResponsesApiCompactionTelemetry } from '../../networking/node/chatStream';
2323
import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';
2424
import { ITelemetryService } from '../../telemetry/common/telemetry';
2525
import { TelemetryData } from '../../telemetry/common/telemetryData';
@@ -29,10 +29,22 @@ import { rawPartAsPhaseData } from '../common/phaseDataContainer';
2929
import { getStatefulMarkerAndIndex } from '../common/statefulMarkerContainer';
3030
import { rawPartAsThinkingData } from '../common/thinkingDataContainer';
3131

32+
export function getResponsesApiCompactionThreshold(configService: IConfigurationService, expService: IExperimentationService, endpoint: IChatEndpoint): number | undefined {
33+
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
34+
if (!contextManagementEnabled) {
35+
return undefined;
36+
}
37+
38+
return endpoint.modelMaxPromptTokens > 0
39+
? Math.floor(endpoint.modelMaxPromptTokens * 0.9)
40+
: 50000;
41+
}
42+
3243
export function createResponsesRequestBody(accessor: ServicesAccessor, options: ICreateEndpointBodyOptions, model: string, endpoint: IChatEndpoint): IEndpointBody {
3344
const configService = accessor.get(IConfigurationService);
3445
const expService = accessor.get(IExperimentationService);
3546
const verbosity = getVerbosityForModelSync(endpoint);
47+
const compactThreshold = getResponsesApiCompactionThreshold(configService, expService, endpoint);
3648
// compaction supported for all the models but works well for codex models and any future models after 5.3
3749

3850
const body: IEndpointBody = {
@@ -56,11 +68,7 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
5668
text: verbosity ? { verbosity } : undefined,
5769
};
5870

59-
const contextManagementEnabled = configService.getExperimentBasedConfig(ConfigKey.ResponsesApiContextManagementEnabled, expService) && !modelsWithoutResponsesContextManagement.has(endpoint.family);
60-
if (contextManagementEnabled) {
61-
const compactThreshold = endpoint.modelMaxPromptTokens > 0
62-
? Math.floor(endpoint.modelMaxPromptTokens * 0.9)
63-
: 50000;
71+
if (compactThreshold !== undefined) {
6472
body.context_management = [{
6573
'type': openAIContextManagementCompactionType,
6674
// Trigger compaction at 90% of the model max prompt context to keep headroom for active turns.
@@ -95,6 +103,21 @@ export function createResponsesRequestBody(accessor: ServicesAccessor, options:
95103
return body;
96104
}
97105

106+
export function getResponsesApiCompactionThresholdFromBody(body: Pick<IEndpointBody, 'context_management'>): number | undefined {
107+
const contextManagement = body.context_management;
108+
if (!Array.isArray(contextManagement)) {
109+
return undefined;
110+
}
111+
112+
for (const item of contextManagement) {
113+
if (item.type === openAIContextManagementCompactionType && typeof item.compact_threshold === 'number') {
114+
return item.compact_threshold;
115+
}
116+
}
117+
118+
return undefined;
119+
}
120+
98121
type ResponseOutputMessageWithPhase = OpenAI.Responses.ResponseOutputMessage & {
99122
phase?: string;
100123
};
@@ -103,17 +126,33 @@ interface ResponseOutputItemWithPhase {
103126
phase?: string;
104127
}
105128

129+
interface LatestCompactionOutput {
130+
readonly item: OpenAIContextManagementResponse;
131+
readonly outputIndex: number;
132+
}
133+
106134
function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMessage[], ignoreStatefulMarker: boolean): { input: OpenAI.Responses.ResponseInputItem[]; previous_response_id?: string } {
107135
const latestCompactionMessageIndex = getLatestCompactionMessageIndex(messages);
108-
if (latestCompactionMessageIndex !== undefined) {
109-
messages = messages.slice(latestCompactionMessageIndex);
110-
}
111-
136+
const latestCompactionMessage = latestCompactionMessageIndex !== undefined ? createCompactionRoundTripMessage(messages[latestCompactionMessageIndex]) : undefined;
112137
const statefulMarkerAndIndex = !ignoreStatefulMarker && getStatefulMarkerAndIndex(modelId, messages);
138+
113139
let previousResponseId: string | undefined;
114-
if (latestCompactionMessageIndex === undefined && statefulMarkerAndIndex) {
140+
if (statefulMarkerAndIndex) {
115141
previousResponseId = statefulMarkerAndIndex.statefulMarker;
142+
143+
// Requests that resume from previous_response_id send only post-marker history,
144+
// but they still need the latest compaction item even when that item predates
145+
// the marker. This keeps both websocket and non-websocket traffic aligned.
116146
messages = messages.slice(statefulMarkerAndIndex.index + 1);
147+
if (latestCompactionMessageIndex !== undefined) {
148+
if (latestCompactionMessageIndex > statefulMarkerAndIndex.index) {
149+
messages = messages.slice(latestCompactionMessageIndex - (statefulMarkerAndIndex.index + 1));
150+
} else if (latestCompactionMessage) {
151+
messages = [latestCompactionMessage, ...messages];
152+
}
153+
}
154+
} else if (latestCompactionMessageIndex !== undefined) {
155+
messages = messages.slice(latestCompactionMessageIndex);
117156
}
118157

119158
const input: OpenAI.Responses.ResponseInputItem[] = [];
@@ -176,6 +215,22 @@ function rawMessagesToResponseAPI(modelId: string, messages: readonly Raw.ChatMe
176215
return { input, previous_response_id: previousResponseId };
177216
}
178217

218+
function createCompactionRoundTripMessage(message: Raw.ChatMessage): Raw.ChatMessage | undefined {
219+
if (message.role !== Raw.ChatRole.Assistant) {
220+
return undefined;
221+
}
222+
223+
const content = message.content.filter(part => part.type === Raw.ChatCompletionContentPartKind.Opaque && rawPartAsCompactionData(part));
224+
if (!content.length) {
225+
return undefined;
226+
}
227+
228+
return {
229+
role: Raw.ChatRole.Assistant,
230+
content,
231+
};
232+
}
233+
179234
function getLatestCompactionMessageIndex(messages: readonly Raw.ChatMessage[]): number | undefined {
180235
for (let idx = messages.length - 1; idx >= 0; idx--) {
181236
const message = messages[idx];
@@ -428,11 +483,44 @@ function responseFunctionOutputToRawContents(output: string | OpenAI.Responses.R
428483
return coalesce(output.map(responseContentToRawContent));
429484
}
430485

431-
export async function processResponseFromChatEndpoint(instantiationService: IInstantiationService, telemetryService: ITelemetryService, logService: ILogService, response: Response, expectedNumChoices: number, finishCallback: FinishedCallback, telemetryData: TelemetryData): Promise<AsyncIterableObject<ChatCompletion>> {
486+
function isCompactionOutputItem(item: OpenAI.Responses.ResponseOutputItem): boolean {
487+
return item.type.toString() === openAIContextManagementCompactionType;
488+
}
489+
490+
function getLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): LatestCompactionOutput | undefined {
491+
let latestCompactionOutput: LatestCompactionOutput | undefined;
492+
for (let idx = output.length - 1; idx >= 0; idx--) {
493+
const item = output[idx];
494+
if (isCompactionOutputItem(item)) {
495+
latestCompactionOutput = { item: item as unknown as OpenAIContextManagementResponse, outputIndex: idx };
496+
break;
497+
}
498+
}
499+
500+
if (preferredOutputIndex !== undefined) {
501+
const preferredItem = output[preferredOutputIndex];
502+
if (preferredItem && isCompactionOutputItem(preferredItem) && (!latestCompactionOutput || preferredOutputIndex >= latestCompactionOutput.outputIndex)) {
503+
return { item: preferredItem as unknown as OpenAIContextManagementResponse, outputIndex: preferredOutputIndex };
504+
}
505+
}
506+
507+
return latestCompactionOutput;
508+
}
509+
510+
function keepLatestCompactionOutput(output: OpenAI.Responses.ResponseOutputItem[], preferredOutputIndex: number | undefined): OpenAI.Responses.ResponseOutputItem[] {
511+
const latestCompactionOutput = getLatestCompactionOutput(output, preferredOutputIndex);
512+
if (!latestCompactionOutput) {
513+
return output;
514+
}
515+
516+
return output.filter((item, idx) => !isCompactionOutputItem(item) || idx === latestCompactionOutput.outputIndex);
517+
}
518+
519+
export async function processResponseFromChatEndpoint(instantiationService: IInstantiationService, telemetryService: ITelemetryService, logService: ILogService, response: Response, expectedNumChoices: number, finishCallback: FinishedCallback, telemetryData: TelemetryData, compactionThreshold?: number): Promise<AsyncIterableObject<ChatCompletion>> {
432520
return new AsyncIterableObject<ChatCompletion>(async feed => {
433521
const requestId = response.headers.get('X-Request-ID') ?? generateUuid();
434522
const ghRequestId = response.headers.get('x-github-request-id') ?? '';
435-
const processor = instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, requestId, ghRequestId);
523+
const processor = instantiationService.createInstance(OpenAIResponsesProcessor, telemetryData, telemetryService, requestId, ghRequestId, compactionThreshold);
436524
const parser = new SSEParser((ev) => {
437525
try {
438526
logService.trace(`SSE: ${ev.data}`);
@@ -474,13 +562,19 @@ interface CapiResponsesTextDeltaEvent extends Omit<OpenAI.Responses.ResponseText
474562
export class OpenAIResponsesProcessor {
475563
private textAccumulator: string = '';
476564
private hasReceivedReasoningSummary = false;
565+
private sawCompactionMessage = false;
566+
private latestCompactionOutputIndex: number | undefined;
567+
private latestCompactionItem: OpenAIContextManagementResponse | undefined;
477568
/** Maps output_index to { name, callId, arguments } for streaming tool call updates */
478569
private readonly toolCallInfo = new Map<number, { name: string; callId: string; arguments: string }>();
479570

480571
constructor(
481572
private readonly telemetryData: TelemetryData,
573+
private readonly telemetryService: ITelemetryService,
482574
private readonly requestId: string,
483575
private readonly ghRequestId: string,
576+
private readonly compactionThreshold: number | undefined,
577+
@ILogService private readonly logService: ILogService,
484578
) { }
485579

486580
public push(chunk: OpenAI.Responses.ResponseStreamEvent, _onProgress: FinishedCallback): ChatCompletion | undefined {
@@ -532,6 +626,12 @@ export class OpenAIResponsesProcessor {
532626
case 'response.output_item.done':
533627
if (chunk.item.type.toString() === openAIContextManagementCompactionType) {
534628
const compactionItem = chunk.item as unknown as OpenAIContextManagementResponse;
629+
if (this.latestCompactionOutputIndex !== undefined && chunk.output_index < this.latestCompactionOutputIndex) {
630+
return;
631+
}
632+
this.latestCompactionOutputIndex = chunk.output_index;
633+
this.latestCompactionItem = compactionItem;
634+
this.sawCompactionMessage = true;
535635
return onProgress({
536636
text: '',
537637
contextManagement: {
@@ -588,8 +688,58 @@ export class OpenAIResponsesProcessor {
588688
id: chunk.item_id
589689
}
590690
});
591-
case 'response.completed':
592-
onProgress({ text: '', statefulMarker: chunk.response.id });
691+
case 'response.completed': {
692+
const normalizedOutput = keepLatestCompactionOutput(chunk.response.output, this.latestCompactionOutputIndex);
693+
const latestCompactionOutput = getLatestCompactionOutput(normalizedOutput, this.latestCompactionOutputIndex);
694+
const latestCompactionItem = latestCompactionOutput?.item;
695+
const previousCompactionItem = this.latestCompactionItem;
696+
if (latestCompactionItem) {
697+
this.sawCompactionMessage = true;
698+
this.latestCompactionOutputIndex = latestCompactionOutput.outputIndex;
699+
}
700+
701+
const shouldEmitResolvedCompaction = latestCompactionItem && (
702+
!previousCompactionItem ||
703+
previousCompactionItem.id !== latestCompactionItem.id ||
704+
previousCompactionItem.encrypted_content !== latestCompactionItem.encrypted_content
705+
);
706+
if (latestCompactionItem) {
707+
this.latestCompactionItem = latestCompactionItem;
708+
}
709+
if (this.compactionThreshold !== undefined && this.sawCompactionMessage) {
710+
const promptTokens = chunk.response.usage?.input_tokens ?? 0;
711+
const totalTokens = chunk.response.usage?.total_tokens ?? 0;
712+
sendResponsesApiCompactionTelemetry(this.telemetryService, {
713+
outcome: 'compaction_returned',
714+
headerRequestId: this.requestId,
715+
gitHubRequestId: this.ghRequestId,
716+
model: chunk.response.model,
717+
}, {
718+
compactThreshold: this.compactionThreshold,
719+
promptTokens,
720+
totalTokens,
721+
});
722+
this.logService.debug(`[responsesAPI_compaction] Compaction enabled. headerRequestId=${this.requestId}`);
723+
} else if (this.compactionThreshold !== undefined && (chunk.response.usage?.input_tokens ?? 0) >= this.compactionThreshold) {
724+
const promptTokens = chunk.response.usage?.input_tokens ?? 0;
725+
const totalTokens = chunk.response.usage?.total_tokens ?? 0;
726+
sendResponsesApiCompactionTelemetry(this.telemetryService, {
727+
outcome: 'threshold_met_no_compaction',
728+
headerRequestId: this.requestId,
729+
gitHubRequestId: this.ghRequestId,
730+
model: chunk.response.model,
731+
}, {
732+
compactThreshold: this.compactionThreshold,
733+
promptTokens,
734+
totalTokens,
735+
});
736+
this.logService.debug(`[responsesAPI_compaction] Compaction enabled but context not compacted after threshold was met. headerRequestId=${this.requestId}, gitHubRequestId=${this.ghRequestId}, promptTokens=${promptTokens}, totalTokens=${totalTokens}`);
737+
}
738+
onProgress({
739+
text: '',
740+
statefulMarker: chunk.response.id,
741+
contextManagement: shouldEmitResolvedCompaction ? latestCompactionItem : undefined,
742+
});
593743
return {
594744
blockFinished: true,
595745
choiceIndex: 0,
@@ -613,7 +763,7 @@ export class OpenAIResponsesProcessor {
613763
finishReason: FinishedCompletionReason.Stop,
614764
message: {
615765
role: Raw.ChatRole.Assistant,
616-
content: chunk.response.output.map((item): Raw.ChatCompletionContentPart | undefined => {
766+
content: normalizedOutput.map((item): Raw.ChatCompletionContentPart | undefined => {
617767
if (item.type === 'message') {
618768
return { type: Raw.ChatCompletionContentPartKind.Text, text: item.content.map(c => c.type === 'output_text' ? c.text : c.refusal).join('') };
619769
} else if (item.type === 'image_generation_call' && item.result) {
@@ -622,6 +772,7 @@ export class OpenAIResponsesProcessor {
622772
}).filter(isDefined),
623773
}
624774
};
775+
}
625776
}
626777
}
627778
}

0 commit comments

Comments
 (0)