diff --git a/packages/otel/src/span-processor.test.ts b/packages/otel/src/span-processor.test.ts index 03913990..9018f662 100644 --- a/packages/otel/src/span-processor.test.ts +++ b/packages/otel/src/span-processor.test.ts @@ -36,6 +36,7 @@ function createTestSpan(opts: { instrumentationScopeName?: string; name?: string; initialAttributes?: Record; + startTime?: [number, number]; }): TestSpan { const attributes: Record = { ...(opts.initialAttributes ?? {}), @@ -68,7 +69,7 @@ function createTestSpan(opts: { }, // Stubbed ReadableSpan surface used by the export pipeline. duration: [0, 0], - startTime: [0, 0], + startTime: opts.startTime ?? [0, 0], endTime: [0, 0], kind: 0, status: { code: 0 }, @@ -367,6 +368,92 @@ describe("LangfuseSpanProcessor app-root marking", () => { }); }); +describe("Vercel AI SDK TTFT bridge", () => { + let processor: LangfuseSpanProcessor; + + beforeEach(() => { + spanIdCounter = 0; + processor = new LangfuseSpanProcessor({ + exporter: noopExporter, + shouldExportSpan: () => true, + }); + }); + + const TTFT_ATTR = + LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME; + + it("sets completion_start_time from ai.response.msToFirstChunk (streamText)", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + initialAttributes: { "ai.response.msToFirstChunk": 250 }, + startTime: [1, 0], // 1000ms since epoch + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBe(JSON.stringify(new Date(1250))); + }); + + it("sets completion_start_time from ai.stream.msToFirstChunk (streamObject)", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + initialAttributes: { "ai.stream.msToFirstChunk": 400 }, + startTime: [2, 0], + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBe(JSON.stringify(new Date(2400))); + }); + + it("does nothing when neither msToFirstChunk attribute is present", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBeUndefined(); + }); + + it("does nothing for spans outside the ai instrumentation scope", async () => { + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "unknown.instrumentation", + initialAttributes: { "ai.response.msToFirstChunk": 250 }, + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBeUndefined(); + }); + + it("does not overwrite a pre-existing completion_start_time", async () => { + const userSetValue = JSON.stringify(new Date(9999)); + const span = createTestSpan({ + traceId: TRACE_ID, + instrumentationScopeName: "ai", + initialAttributes: { + "ai.response.msToFirstChunk": 250, + [TTFT_ATTR]: userSetValue, + }, + startTime: [1, 0], + }); + + processor.onEnd(span); + await processor.forceFlush(); + + expect(span.attributes[TTFT_ATTR]).toBe(userSetValue); + }); +}); + describe("propagation: internal app-root baggage", () => { it("does not surface the internal trace-id baggage as user metadata", () => { const ctx = contextWithBaggageClaim(TRACE_ID); diff --git a/packages/otel/src/span-processor.ts b/packages/otel/src/span-processor.ts index 7bf272d9..54e85ac9 100644 --- a/packages/otel/src/span-processor.ts +++ b/packages/otel/src/span-processor.ts @@ -432,6 +432,23 @@ export class LangfuseSpanProcessor implements SpanProcessor { return; } + if ( + span.instrumentationScope.name === "ai" && + !( + LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME in + span.attributes + ) + ) { + const ms = + span.attributes["ai.response.msToFirstChunk"] ?? + span.attributes["ai.stream.msToFirstChunk"]; + if (typeof ms === "number") { + span.attributes[ + LangfuseOtelSpanAttributes.OBSERVATION_COMPLETION_START_TIME + ] = JSON.stringify(new Date(hrTimeToMilliseconds(span.startTime) + ms)); + } + } + await this.applyMaskInPlace(span); await this.mediaService.process(span);