From f8c2fd111f636884fb702066f2c14e5013468a65 Mon Sep 17 00:00:00 2001 From: Ryan Yue Date: Sat, 23 May 2026 14:25:15 -0700 Subject: [PATCH] fix(otel): bridge Vercel AI SDK TTFT to completion_start_time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `LangfuseSpanProcessor` did not convert the AI SDK's `ai.response.msToFirstChunk` (streamText) / `ai.stream.msToFirstChunk` (streamObject) attributes into `langfuse.observation.completion_start_time`, so generation observations land in Langfuse with TTFT blank. `langfuse-vercel` handled this in #379 and #401. Port the same arithmetic to `LangfuseSpanProcessor` — old exporter set a property on the ingestion event, new processor sets the attribute on the span before OTLP export. A user-set `completion_start_time` is preserved. --- packages/otel/src/span-processor.test.ts | 89 +++++++++++++++++++++++- packages/otel/src/span-processor.ts | 17 +++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/packages/otel/src/span-processor.test.ts b/packages/otel/src/span-processor.test.ts index 03913990..9018f662 100644 --- a/packages/otel/src/span-processor.test.ts +++ b/packages/otel/src/span-processor.test.ts @@ -36,6 +36,7 @@ function createTestSpan(opts: { instrumentationScopeName?: string; name?: string; initialAttributes?: Record; + startTime?: [number, number]; }): TestSpan { const attributes: Record = { ...(opts.initialAttributes ?? {}), @@ -68,7 +69,7 @@ function createTestSpan(opts: { }, // Stubbed ReadableSpan surface used by the export pipeline. duration: [0, 0], - startTime: [0, 0], + startTime: opts.startTime ?? [0, 0], endTime: [0, 0], kind: 0, status: { code: 0 }, @@ -367,6 +368,92 @@ describe("LangfuseSpanProcessor app-root marking", () => { }); }); +describe("Vercel AI SDK TTFT bridge", () => { + let processor: LangfuseSpanProcessor; + + beforeEach(() => { + spanIdCounter = 0; + processor = new LangfuseSpanProcessor({ + exporter: noopExporter, + shouldExportSpan: () => true, + }); + }); + + const TTFT_ATTR = + LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME; + + it("sets completion_start_time from ai.response.msToFirstChunk (streamText)", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + initialAttributes: { "ai.response.msToFirstChunk": 250 }, + startTime: [1, 0], // 1000ms since epoch + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBe(JSON.stringify(new Date(1250))); + }); + + it("sets completion_start_time from ai.stream.msToFirstChunk (streamObject)", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + initialAttributes: { "ai.stream.msToFirstChunk": 400 }, + startTime: [2, 0], + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBe(JSON.stringify(new Date(2400))); + }); + + it("does nothing when neither msToFirstChunk attribute is present", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBeUndefined(); + }); + + it("does nothing for spans outside the ai instrumentation scope", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "unknown.instrumentation", + initialAttributes: { "ai.response.msToFirstChunk": 250 }, + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBeUndefined(); + }); + + it("does not overwrite a pre-existing completion_start_time", async () => { + const userSetValue = JSON.stringify(new Date(9999)); + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + initialAttributes: { + "ai.response.msToFirstChunk": 250, + [TTFT_ATTR]: userSetValue, + }, + startTime: [1, 0], + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBe(userSetValue); + }); +}); + describe("propagation: internal app-root baggage", () => { it("does not surface the internal trace-id baggage as user metadata", () => { const ctx = contextWithBaggageClaim(TRACE_ID); diff --git a/packages/otel/src/span-processor.ts b/packages/otel/src/span-processor.ts index 7bf272d9..54e85ac9 100644 --- a/packages/otel/src/span-processor.ts +++ b/packages/otel/src/span-processor.ts @@ -432,6 +432,23 @@ export class LangfuseSpanProcessor implements SpanProcessor { return; } + if ( + span.instrumentationScope.name === "ai" && + !( + LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME in + span.attributes + ) + ) { + const ms = + span.attributes["ai.response.msToFirstChunk"] ?? + span.attributes["ai.stream.msToFirstChunk"]; + if (typeof ms === "number") { + span.attributes[ + LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME + ] = JSON.stringify(new Date(hrTimeToMilliseconds(span.startTime) + ms)); + } + } + await this.applyMaskInPlace(span); await this.mediaService.process(span);