Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion packages/otel/src/span-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function createTestSpan(opts: {
instrumentationScopeName?: string;
name?: string;
initialAttributes?: Record<string, unknown>;
startTime?: [number, number];
}): TestSpan {
const attributes: Record<string, unknown> = {
...(opts.initialAttributes ?? {}),
Expand Down Expand Up @@ -68,7 +69,7 @@ function createTestSpan(opts: {
},
// Stubbed ReadableSpan surface used by the export pipeline.
duration: [0, 0],
startTime: [0, 0],
startTime: opts.startTime ?? [0, 0],
endTime: [0, 0],
kind: 0,
status: { code: 0 },
Expand Down Expand Up @@ -367,6 +368,92 @@ describe("LangfuseSpanProcessor app-root marking", () => {
});
});

describe("Vercel AI SDK TTFT bridge", () => {
let processor: LangfuseSpanProcessor;

beforeEach(() => {
spanIdCounter = 0;
processor = new LangfuseSpanProcessor({
exporter: noopExporter,
shouldExportSpan: () => true,
});
});

const TTFT_ATTR =
LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME;

it("sets completion_start_time from ai.response.msToFirstChunk (streamText)", async () => {
const span = createTestSpan({
traceId: TRACE_ID,
instrumentationScopeName: "ai",
initialAttributes: { "ai.response.msToFirstChunk": 250 },
startTime: [1, 0], // 1000ms since epoch
});

processor.onEnd(span);
await processor.forceFlush();

expect(span.attributes[TTFT_ATTR]).toBe(JSON.stringify(new Date(1250)));
});

it("sets completion_start_time from ai.stream.msToFirstChunk (streamObject)", async () => {
const span = createTestSpan({
traceId: TRACE_ID,
instrumentationScopeName: "ai",
initialAttributes: { "ai.stream.msToFirstChunk": 400 },
startTime: [2, 0],
});

processor.onEnd(span);
await processor.forceFlush();

expect(span.attributes[TTFT_ATTR]).toBe(JSON.stringify(new Date(2400)));
});

it("does nothing when neither msToFirstChunk attribute is present", async () => {
const span = createTestSpan({
traceId: TRACE_ID,
instrumentationScopeName: "ai",
});

processor.onEnd(span);
await processor.forceFlush();

expect(span.attributes[TTFT_ATTR]).toBeUndefined();
});

it("does nothing for spans outside the ai instrumentation scope", async () => {
const span = createTestSpan({
traceId: TRACE_ID,
instrumentationScopeName: "unknown.instrumentation",
initialAttributes: { "ai.response.msToFirstChunk": 250 },
});

processor.onEnd(span);
await processor.forceFlush();

expect(span.attributes[TTFT_ATTR]).toBeUndefined();
});

it("does not overwrite a pre-existing completion_start_time", async () => {
const userSetValue = JSON.stringify(new Date(9999));
const span = createTestSpan({
traceId: TRACE_ID,
instrumentationScopeName: "ai",
initialAttributes: {
"ai.response.msToFirstChunk": 250,
[TTFT_ATTR]: userSetValue,
},
startTime: [1, 0],
});

processor.onEnd(span);
await processor.forceFlush();

expect(span.attributes[TTFT_ATTR]).toBe(userSetValue);
});
});

describe("propagation: internal app-root baggage", () => {
it("does not surface the internal trace-id baggage as user metadata", () => {
const ctx = contextWithBaggageClaim(TRACE_ID);
Expand Down
17 changes: 17 additions & 0 deletions packages/otel/src/span-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,23 @@ export class LangfuseSpanProcessor implements SpanProcessor {
return;
}

if (
span.instrumentationScope.name === "ai" &&
!(
LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME in
span.attributes
)
) {
const ms =
span.attributes["ai.response.msToFirstChunk"] ??
span.attributes["ai.stream.msToFirstChunk"];
if (typeof ms === "number") {
span.attributes[
LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME
] = JSON.stringify(new Date(hrTimeToMilliseconds(span.startTime) + ms));
}
}

await this.applyMaskInPlace(span);
await this.mediaService.process(span);

Expand Down