Skip to content

Commit 5b16a5c

Browse files
authored
ref(core): Extract shared endStreamSpan for AI integrations (#20021)
Extracts duplicated stream finalization logic (setting response attributes + ending the span) into a shared `endStreamSpan` function in `packages/core/src/tracing/ai/utils.ts`, used by OpenAI, Anthropic, and Google GenAI streaming.
1 parent 4b72b41 commit 5b16a5c

File tree

4 files changed

+78
-184
lines changed

4 files changed

+78
-184
lines changed

packages/core/src/tracing/ai/utils.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import { getClient } from '../../currentScopes';
66
import type { Span } from '../../types-hoist/span';
77
import { isThenable } from '../../utils/is';
88
import {
9+
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
10+
GEN_AI_RESPONSE_ID_ATTRIBUTE,
11+
GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
12+
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
13+
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
14+
GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
915
GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE,
1016
GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
1117
GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
@@ -99,6 +105,68 @@ export function setTokenUsageAttributes(
99105
}
100106
}
101107

108+
export interface StreamResponseState {
109+
responseId?: string;
110+
responseModel?: string;
111+
finishReasons: string[];
112+
responseTexts: string[];
113+
toolCalls: unknown[];
114+
promptTokens?: number;
115+
completionTokens?: number;
116+
totalTokens?: number;
117+
cacheCreationInputTokens?: number;
118+
cacheReadInputTokens?: number;
119+
}
120+
121+
/**
122+
* Ends a streaming span by setting all accumulated response attributes and ending the span.
123+
* Shared across OpenAI, Anthropic, and Google GenAI streaming implementations.
124+
*/
125+
export function endStreamSpan(span: Span, state: StreamResponseState, recordOutputs: boolean): void {
126+
if (!span.isRecording()) {
127+
return;
128+
}
129+
130+
const attrs: Record<string, string | number | boolean> = {
131+
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
132+
};
133+
134+
if (state.responseId) attrs[GEN_AI_RESPONSE_ID_ATTRIBUTE] = state.responseId;
135+
if (state.responseModel) attrs[GEN_AI_RESPONSE_MODEL_ATTRIBUTE] = state.responseModel;
136+
137+
if (state.promptTokens !== undefined) attrs[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE] = state.promptTokens;
138+
if (state.completionTokens !== undefined) attrs[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE] = state.completionTokens;
139+
140+
// Use explicit total if provided (OpenAI, Google), otherwise compute from cache tokens (Anthropic)
141+
if (state.totalTokens !== undefined) {
142+
attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = state.totalTokens;
143+
} else if (
144+
state.promptTokens !== undefined ||
145+
state.completionTokens !== undefined ||
146+
state.cacheCreationInputTokens !== undefined ||
147+
state.cacheReadInputTokens !== undefined
148+
) {
149+
attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] =
150+
(state.promptTokens ?? 0) +
151+
(state.completionTokens ?? 0) +
152+
(state.cacheCreationInputTokens ?? 0) +
153+
(state.cacheReadInputTokens ?? 0);
154+
}
155+
156+
if (state.finishReasons.length) {
157+
attrs[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE] = JSON.stringify(state.finishReasons);
158+
}
159+
if (recordOutputs && state.responseTexts.length) {
160+
attrs[GEN_AI_RESPONSE_TEXT_ATTRIBUTE] = state.responseTexts.join('');
161+
}
162+
if (recordOutputs && state.toolCalls.length) {
163+
attrs[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE] = JSON.stringify(state.toolCalls);
164+
}
165+
166+
span.setAttributes(attrs);
167+
span.end();
168+
}
169+
102170
/**
103171
* Get the truncated JSON string for a string or array of strings.
104172
*

packages/core/src/tracing/anthropic-ai/streaming.ts

Lines changed: 3 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
11
import { captureException } from '../../exports';
22
import { SPAN_STATUS_ERROR } from '../../tracing';
33
import type { Span } from '../../types-hoist/span';
4-
import {
5-
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
6-
GEN_AI_RESPONSE_ID_ATTRIBUTE,
7-
GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
8-
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
9-
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
10-
GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
11-
} from '../ai/gen-ai-attributes';
12-
import { setTokenUsageAttributes } from '../ai/utils';
4+
import { endStreamSpan } from '../ai/utils';
135
import type { AnthropicAiStreamingEvent } from './types';
146
import { mapAnthropicErrorToStatusMessage } from './utils';
157

@@ -208,60 +200,6 @@ function processEvent(
208200
handleContentBlockStop(event, state);
209201
}
210202

211-
/**
212-
* Finalizes span attributes when stream processing completes
213-
*/
214-
function finalizeStreamSpan(state: StreamingState, span: Span, recordOutputs: boolean): void {
215-
if (!span.isRecording()) {
216-
return;
217-
}
218-
219-
// Set common response attributes if available
220-
if (state.responseId) {
221-
span.setAttributes({
222-
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
223-
});
224-
}
225-
if (state.responseModel) {
226-
span.setAttributes({
227-
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
228-
});
229-
}
230-
231-
setTokenUsageAttributes(
232-
span,
233-
state.promptTokens,
234-
state.completionTokens,
235-
state.cacheCreationInputTokens,
236-
state.cacheReadInputTokens,
237-
);
238-
239-
span.setAttributes({
240-
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
241-
});
242-
243-
if (state.finishReasons.length > 0) {
244-
span.setAttributes({
245-
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
246-
});
247-
}
248-
249-
if (recordOutputs && state.responseTexts.length > 0) {
250-
span.setAttributes({
251-
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
252-
});
253-
}
254-
255-
// Set tool calls if any were captured
256-
if (recordOutputs && state.toolCalls.length > 0) {
257-
span.setAttributes({
258-
[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
259-
});
260-
}
261-
262-
span.end();
263-
}
264-
265203
/**
266204
* Instruments an async iterable stream of Anthropic events, updates the span with
267205
* streaming attributes and (optionally) the aggregated output text, and yields
@@ -291,50 +229,7 @@ export async function* instrumentAsyncIterableStream(
291229
yield event;
292230
}
293231
} finally {
294-
// Set common response attributes if available
295-
if (state.responseId) {
296-
span.setAttributes({
297-
[GEN_AI_RESPONSE_ID_ATTRIBUTE]: state.responseId,
298-
});
299-
}
300-
if (state.responseModel) {
301-
span.setAttributes({
302-
[GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: state.responseModel,
303-
});
304-
}
305-
306-
setTokenUsageAttributes(
307-
span,
308-
state.promptTokens,
309-
state.completionTokens,
310-
state.cacheCreationInputTokens,
311-
state.cacheReadInputTokens,
312-
);
313-
314-
span.setAttributes({
315-
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
316-
});
317-
318-
if (state.finishReasons.length > 0) {
319-
span.setAttributes({
320-
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
321-
});
322-
}
323-
324-
if (recordOutputs && state.responseTexts.length > 0) {
325-
span.setAttributes({
326-
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
327-
});
328-
}
329-
330-
// Set tool calls if any were captured
331-
if (recordOutputs && state.toolCalls.length > 0) {
332-
span.setAttributes({
333-
[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(state.toolCalls),
334-
});
335-
}
336-
337-
span.end();
232+
endStreamSpan(span, state, recordOutputs);
338233
}
339234
}
340235

@@ -366,7 +261,7 @@ export function instrumentMessageStream<R extends { on: (...args: unknown[]) =>
366261
// The event fired when a message is done being streamed by the API. Corresponds to the message_stop SSE event.
367262
// @see https://github.com/anthropics/anthropic-sdk-typescript/blob/d3be31f5a4e6ebb4c0a2f65dbb8f381ae73a9166/helpers.md?plain=1#L42-L44
368263
stream.on('message', () => {
369-
finalizeStreamSpan(state, span, recordOutputs);
264+
endStreamSpan(span, state, recordOutputs);
370265
});
371266

372267
stream.on('error', (error: unknown) => {

packages/core/src/tracing/google-genai/streaming.ts

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
import { captureException } from '../../exports';
22
import { SPAN_STATUS_ERROR } from '../../tracing';
3-
import type { Span, SpanAttributeValue } from '../../types-hoist/span';
4-
import {
5-
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
6-
GEN_AI_RESPONSE_ID_ATTRIBUTE,
7-
GEN_AI_RESPONSE_MODEL_ATTRIBUTE,
8-
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
9-
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
10-
GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
11-
GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE,
12-
GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
13-
GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
14-
} from '../ai/gen-ai-attributes';
3+
import type { Span } from '../../types-hoist/span';
4+
import { endStreamSpan } from '../ai/utils';
155
import type { GoogleGenAIResponse } from './types';
166

177
/**
@@ -137,27 +127,6 @@ export async function* instrumentStream(
137127
yield chunk;
138128
}
139129
} finally {
140-
const attrs: Record<string, SpanAttributeValue> = {
141-
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
142-
};
143-
144-
if (state.responseId) attrs[GEN_AI_RESPONSE_ID_ATTRIBUTE] = state.responseId;
145-
if (state.responseModel) attrs[GEN_AI_RESPONSE_MODEL_ATTRIBUTE] = state.responseModel;
146-
if (state.promptTokens !== undefined) attrs[GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE] = state.promptTokens;
147-
if (state.completionTokens !== undefined) attrs[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE] = state.completionTokens;
148-
if (state.totalTokens !== undefined) attrs[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE] = state.totalTokens;
149-
150-
if (state.finishReasons.length) {
151-
attrs[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE] = JSON.stringify(state.finishReasons);
152-
}
153-
if (recordOutputs && state.responseTexts.length) {
154-
attrs[GEN_AI_RESPONSE_TEXT_ATTRIBUTE] = state.responseTexts.join('');
155-
}
156-
if (recordOutputs && state.toolCalls.length) {
157-
attrs[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE] = JSON.stringify(state.toolCalls);
158-
}
159-
160-
span.setAttributes(attrs);
161-
span.end();
130+
endStreamSpan(span, state, recordOutputs);
162131
}
163132
}

packages/core/src/tracing/openai/streaming.ts

Lines changed: 4 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
import { captureException } from '../../exports';
22
import { SPAN_STATUS_ERROR } from '../../tracing';
33
import type { Span } from '../../types-hoist/span';
4-
import {
5-
GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE,
6-
GEN_AI_RESPONSE_STREAMING_ATTRIBUTE,
7-
GEN_AI_RESPONSE_TEXT_ATTRIBUTE,
8-
GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE,
9-
} from '../ai/gen-ai-attributes';
4+
import { endStreamSpan } from '../ai/utils';
105
import { RESPONSE_EVENT_TYPES } from './constants';
116
import type {
127
ChatCompletionChunk,
@@ -15,12 +10,7 @@ import type {
1510
ResponseFunctionCall,
1611
ResponseStreamingEvent,
1712
} from './types';
18-
import {
19-
isChatCompletionChunk,
20-
isResponsesApiStreamEvent,
21-
setCommonResponseAttributes,
22-
setTokenUsageAttributes,
23-
} from './utils';
13+
import { isChatCompletionChunk, isResponsesApiStreamEvent } from './utils';
2414

2515
/**
2616
* State object used to accumulate information from a stream of OpenAI events/chunks.
@@ -240,35 +230,7 @@ export async function* instrumentStream<T>(
240230
yield event;
241231
}
242232
} finally {
243-
setCommonResponseAttributes(span, state.responseId, state.responseModel);
244-
setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens);
245-
246-
span.setAttributes({
247-
[GEN_AI_RESPONSE_STREAMING_ATTRIBUTE]: true,
248-
});
249-
250-
if (state.finishReasons.length) {
251-
span.setAttributes({
252-
[GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons),
253-
});
254-
}
255-
256-
if (recordOutputs && state.responseTexts.length) {
257-
span.setAttributes({
258-
[GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''),
259-
});
260-
}
261-
262-
// Set tool calls attribute if any were accumulated
263-
const chatCompletionToolCallsArray = Object.values(state.chatCompletionToolCalls);
264-
const allToolCalls = [...chatCompletionToolCallsArray, ...state.responsesApiToolCalls];
265-
266-
if (allToolCalls.length > 0) {
267-
span.setAttributes({
268-
[GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE]: JSON.stringify(allToolCalls),
269-
});
270-
}
271-
272-
span.end();
233+
const allToolCalls = [...Object.values(state.chatCompletionToolCalls), ...state.responsesApiToolCalls];
234+
endStreamSpan(span, { ...state, toolCalls: allToolCalls }, recordOutputs);
273235
}
274236
}

0 commit comments

Comments
 (0)