Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions agents/src/telemetry/traces.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { context as otelContext, trace } from '@opentelemetry/api';
import { InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { setTracerProvider, tracer } from './traces.js';

/** Helper: extract parentSpanId across OTel SDK v1/v2 */
function parentSpanId(span: unknown): string | undefined {
return (
(span as { parentSpanId?: string }).parentSpanId ??
(span as { parentSpanContext?: { spanId: string } }).parentSpanContext?.spanId
);
}

describe('DynamicTracer', () => {
let exporter: InMemorySpanExporter;
let provider: NodeTracerProvider;

beforeEach(() => {
exporter = new InMemorySpanExporter();
provider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(exporter)],
});
provider.register();
setTracerProvider(provider);
});

afterEach(async () => {
await provider.shutdown();
otelContext.disable();
trace.disable();
});

it('inherits the active OTel context as parent when no explicit context is passed', async () => {
const outerTracer = provider.getTracer('test');

await outerTracer.startActiveSpan('outer', async (outer) => {
const child = tracer.startSpan({ name: 'child' });
child.end();
outer.end();
});

const spans = exporter.getFinishedSpans();
const outerSpan = spans.find((s) => s.name === 'outer');
const childSpan = spans.find((s) => s.name === 'child');

expect(outerSpan).toBeDefined();
expect(childSpan).toBeDefined();
expect(parentSpanId(childSpan)).toBe(outerSpan!.spanContext().spanId);
});
});

describe('register() set-once semantics', () => {
let userExporter: InMemorySpanExporter;
let userProvider: NodeTracerProvider;
let cloudExporter: InMemorySpanExporter;
let cloudProvider: NodeTracerProvider;

beforeEach(() => {
// Step 1: User registers their own provider (simulates NodeSDK.start())
userExporter = new InMemorySpanExporter();
userProvider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(userExporter)],
});
userProvider.register();

// Step 2: LiveKit cloud calls register() + setTracerProvider() (simulates setupCloudTracer)
cloudExporter = new InMemorySpanExporter();
cloudProvider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(cloudExporter)],
});
cloudProvider.register(); // should be a no-op since user already registered
setTracerProvider(cloudProvider); // sets LiveKit's internal DynamicTracer
});

afterEach(async () => {
await userProvider.shutdown();
await cloudProvider.shutdown();
otelContext.disable();
trace.disable();
});

it('second register() does not replace the global context manager', () => {
// Create a span via the global provider and verify context propagation still works
const globalTracer = trace.getTracer('test-global');
let contextWorks = false;

globalTracer.startActiveSpan('test', (span) => {
const active = trace.getSpan(otelContext.active());
contextWorks = active === span;
span.end();
});

expect(contextWorks).toBe(true);
});

it('spans from global tracer land in user exporter, not cloud exporter', () => {
const globalTracer = trace.getTracer('test-global');
globalTracer.startActiveSpan('global-span', (span) => {
span.end();
});

expect(userExporter.getFinishedSpans().map((s) => s.name)).toContain('global-span');
expect(cloudExporter.getFinishedSpans().map((s) => s.name)).not.toContain('global-span');
});

it('LiveKit DynamicTracer spans land in cloud exporter', () => {
const lkSpan = tracer.startSpan({ name: 'agent_session' });
lkSpan.end();

expect(cloudExporter.getFinishedSpans().map((s) => s.name)).toContain('agent_session');
});

it('LiveKit span inherits user parent context across providers', () => {
const userTracer = userProvider.getTracer('user-app');

userTracer.startActiveSpan('user-parent', (parent) => {
// LiveKit creates a child span via its DynamicTracer
const lkSpan = tracer.startSpan({ name: 'agent_session' });
lkSpan.end();
parent.end();
});

const userSpans = userExporter.getFinishedSpans();
const cloudSpans = cloudExporter.getFinishedSpans();

const userParent = userSpans.find((s) => s.name === 'user-parent')!;
const lkSession = cloudSpans.find((s) => s.name === 'agent_session')!;

expect(userParent).toBeDefined();
expect(lkSession).toBeDefined();

// Same trace ID — they're part of the same distributed trace
expect(lkSession.spanContext().traceId).toBe(userParent.spanContext().traceId);

// LK span is a child of the user's parent span
expect(parentSpanId(lkSession)).toBe(userParent.spanContext().spanId);
});
});
4 changes: 3 additions & 1 deletion agents/src/telemetry/traces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,10 @@ export async function setupCloudTracer(options: {
resource,
spanProcessors: [new MetadataSpanProcessor(metadata), new BatchSpanProcessor(spanExporter)],
});
// register() installs an AsyncLocalStorageContextManager (needed for span nesting)
// and sets the global tracer provider. Both use set-once semantics in the OTel API,
// so if the user already called NodeSDK.start(), these are safe no-ops.
tracerProvider.register();

setTracerProvider(tracerProvider);

// Initialize standalone Pino cloud exporter (no OTEL SDK dependency)
Expand Down
2 changes: 1 addition & 1 deletion agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ export class AgentActivity implements RecognitionHooks {
maxEndpointingDelay:
this.agent.turnHandling?.endpointing?.maxDelay ??
this.agentSession.sessionOptions.turnHandling.endpointing.maxDelay,
rootSpanContext: this.agentSession.rootSpanContext,
agentSession: this.agentSession,
sttModel: this.stt?.label,
sttProvider: this.getSttProvider(),
getLinkedParticipant: () => this.agentSession._roomIO?.linkedParticipant,
Expand Down
5 changes: 2 additions & 3 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { AudioFrame, Room } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
import type { Context, Span } from '@opentelemetry/api';
import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api';
import { context as otelContext, trace } from '@opentelemetry/api';
import { EventEmitter } from 'node:events';
import type { ReadableStream } from 'node:stream/web';
import type { z } from 'zod';
Expand Down Expand Up @@ -532,10 +532,9 @@ export class AgentSession<

this.sessionSpan = tracer.startSpan({
name: 'agent_session',
context: ROOT_CONTEXT,
});

this.rootSpanContext = trace.setSpan(ROOT_CONTEXT, this.sessionSpan);
this.rootSpanContext = trace.setSpan(otelContext.active(), this.sessionSpan);

await this._startImpl({
agent,
Expand Down
14 changes: 7 additions & 7 deletions agents/src/voice/audio_recognition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { type SpeechEvent, SpeechEventType } from '../stt/stt.js';
import { traceTypes, tracer } from '../telemetry/index.js';
import { Task, cancelAndWait, delay, readStream, waitForAbort } from '../utils.js';
import { type VAD, type VADEvent, VADEventType } from '../vad.js';
import type { TurnDetectionMode } from './agent_session.js';
import type { AgentSession, TurnDetectionMode } from './agent_session.js';
import type { STTNode } from './io.js';
import { setParticipantSpanAttributes } from './utils.js';

Expand Down Expand Up @@ -142,8 +142,8 @@ export interface AudioRecognitionOptions {
minEndpointingDelay: number;
/** Maximum endpointing delay in milliseconds. */
maxEndpointingDelay: number;
/** Root span context for tracing. */
rootSpanContext?: Context;
/** Live reference to AgentSession — used to read rootSpanContext fresh on each span creation. */
agentSession?: AgentSession;
/** STT model name for tracing */
sttModel?: string;
/** STT provider name for tracing */
Expand Down Expand Up @@ -173,7 +173,7 @@ export class AudioRecognition {
private minEndpointingDelay: number;
private maxEndpointingDelay: number;
private lastLanguage?: LanguageCode;
private rootSpanContext?: Context;
private agentSession?: AgentSession;
private sttModel?: string;
private sttProvider?: string;
private getLinkedParticipant?: () => ParticipantLike | undefined;
Expand Down Expand Up @@ -227,7 +227,7 @@ export class AudioRecognition {
this.minEndpointingDelay = opts.minEndpointingDelay;
this.maxEndpointingDelay = opts.maxEndpointingDelay;
this.lastLanguage = undefined;
this.rootSpanContext = opts.rootSpanContext;
this.agentSession = opts.agentSession;
this.sttModel = opts.sttModel;
this.sttProvider = opts.sttProvider;
this.getLinkedParticipant = opts.getLinkedParticipant;
Expand Down Expand Up @@ -510,7 +510,7 @@ export class AudioRecognition {

this.userTurnSpan = tracer.startSpan({
name: 'user_turn',
context: this.rootSpanContext,
context: this.agentSession?.rootSpanContext,
startTime,
});

Expand All @@ -530,7 +530,7 @@ export class AudioRecognition {
}

private userTurnContext(span: Span): Context {
const base = this.rootSpanContext ?? ROOT_CONTEXT;
const base = this.agentSession?.rootSpanContext ?? ROOT_CONTEXT;
return trace.setSpan(base, span);
}

Expand Down