diff --git a/src/e2ee/E2eeManager.ts b/src/e2ee/E2eeManager.ts index 7bc0d4d71d..429d6f44a0 100644 --- a/src/e2ee/E2eeManager.ts +++ b/src/e2ee/E2eeManager.ts @@ -2,6 +2,7 @@ import { Encryption_Type, TrackInfo } from '@livekit/protocol'; import { EventEmitter } from 'events'; import type TypedEventEmitter from 'typed-emitter'; import log, { LogLevel, workerLogger } from '../logger'; +import { type DiagnosticEntry } from '../room/DiagnosticsBuffer'; import type RTCEngine from '../room/RTCEngine'; import type Room from '../room/Room'; import { ConnectionState } from '../room/Room'; @@ -18,6 +19,7 @@ import { type E2EEManagerCallbacks, EncryptionEvent, KeyProviderEvent } from './ import type { DecryptDataRequestMessage, DecryptDataResponseMessage, + DiagnosticsRequestMessage, E2EEManagerOptions, E2EEWorkerMessage, EnableMessage, @@ -50,6 +52,11 @@ export interface BaseE2EEManager { participantIdentity: string, keyIndex: number, ): Promise; + /** + * Snapshot the worker-side diagnostics ring buffer. Returns an empty + * array if the worker does not respond within `timeoutMs`. + */ + fetchDiagnosticsSnapshot(timeoutMs?: number): Promise; on(event: E, listener: E2EEManagerCallbacks[E]): this; } @@ -74,6 +81,8 @@ export class E2EEManager private encryptDataRequests: Map> = new Map(); + private diagnosticsRequests: Map> = new Map(); + private dataChannelEncryptionEnabled: boolean; constructor(options: E2EEManagerOptions, dcEncryptionEnabled: boolean) { @@ -221,6 +230,12 @@ export class E2EEManager encryptFuture.resolve(data as EncryptDataResponseMessage['data']); } break; + case 'diagnosticsResponse': + { + const future = this.diagnosticsRequests.get(data.uuid); + future?.resolve?.(data.entries as DiagnosticEntry[]); + } + break; default: break; } @@ -336,6 +351,46 @@ export class E2EEManager return future!.promise!; } + /** + * Ask the worker for a snapshot of its diagnostics ring buffer. Resolves + * to an empty array (and logs a warning) if the worker does not respond + * within `timeoutMs`, so callers can safely merge the result without + * blocking on a dead worker. + */ + async fetchDiagnosticsSnapshot(timeoutMs: number = 500): Promise { + if (!this.worker) { + return []; + } + const uuid = crypto.randomUUID(); + const future = new Future(); + future.onFinally = () => { + this.diagnosticsRequests.delete(uuid); + }; + this.diagnosticsRequests.set(uuid, future); + const msg: DiagnosticsRequestMessage = { + kind: 'diagnosticsRequest', + data: { uuid }, + }; + this.worker.postMessage(msg); + + const timeout = new Promise((resolve) => { + setTimeout(() => { + if (this.diagnosticsRequests.has(uuid)) { + log.warn('e2ee worker did not respond to diagnostics request in time', { timeoutMs }); + this.diagnosticsRequests.delete(uuid); + resolve([]); + } + }, timeoutMs); + }); + + try { + return await Promise.race([future.promise, timeout]); + } catch (err) { + log.warn('failed to fetch e2ee worker diagnostics', { error: err }); + return []; + } + } + handleEncryptedData( payload: Uint8Array, iv: Uint8Array, diff --git a/src/e2ee/types.ts b/src/e2ee/types.ts index d7caa48e3e..4fc3a221e8 100644 --- a/src/e2ee/types.ts +++ b/src/e2ee/types.ts @@ -105,6 +105,28 @@ export interface EnableMessage extends BaseMessage { }; } +export interface DiagnosticsRequestMessage extends BaseMessage { + kind: 'diagnosticsRequest'; + data: { + /** Correlates the response with the awaiting main-thread request. */ + uuid: string; + }; +} + +export interface DiagnosticsResponseMessage extends BaseMessage { + kind: 'diagnosticsResponse'; + data: { + uuid: string; + /** + * Snapshot of the worker-side diagnostics ring buffer, in chronological + * order. Typed as `unknown[]` here to keep the message type free of a + * cross-module dependency on `DiagnosticEntry`; callers on the main + * thread cast to `DiagnosticEntry[]`. + */ + entries: unknown[]; + }; +} + export interface InitAck extends BaseMessage { kind: 'initAck'; data: { @@ -166,7 +188,9 @@ export type E2EEWorkerMessage = | DecryptDataRequestMessage | DecryptDataResponseMessage | EncryptDataRequestMessage - | EncryptDataResponseMessage; + | EncryptDataResponseMessage + | DiagnosticsRequestMessage + | DiagnosticsResponseMessage; export type KeySet = { material: CryptoKey; encryptionKey: CryptoKey }; diff --git a/src/e2ee/worker/FrameCryptor.ts b/src/e2ee/worker/FrameCryptor.ts index 1958f2bec1..9b0cb32f93 100644 --- a/src/e2ee/worker/FrameCryptor.ts +++ b/src/e2ee/worker/FrameCryptor.ts @@ -239,9 +239,17 @@ export class FrameCryptor extends BaseFrameCryptor { .catch((e) => { if (e instanceof TypeError && e.message === 'Destination stream closed') { // this can happen when subscriptions happen in quick successions, but doesn't influence functionality - workerLogger.debug('destination stream closed'); + workerLogger.debug('destination stream closed', { ...this.logContext, trackId }); } else { - workerLogger.warn('transform error', { error: e, ...this.logContext }); + // pipeTo rejected — the transform pipeline for this track is torn + // down and no further frames will be (de)crypted until it is rebuilt. + // Log at error so the main thread / diagnostics buffer can surface it. + workerLogger.error('frame cryptor transform pipeline broke', { + ...this.logContext, + trackId, + operation, + error: e, + }); this.emit( CryptorEvent.Error, e instanceof CryptorError @@ -429,11 +437,23 @@ export class FrameCryptor extends BaseFrameCryptor { return controller.enqueue(encodedFrame); } catch (e: any) { - // TODO: surface this to the app. - workerLogger.error(e); + workerLogger.error('frame encryption failed', { + ...this.logContext, + error: e, + ssrc: encodedFrame.getMetadata().synchronizationSource, + }); + this.emitThrottledError( + e instanceof CryptorError + ? e + : new CryptorError( + `frame encryption failed: ${e?.message ?? e}`, + undefined, + this.participantIdentity, + ), + ); } } else { - workerLogger.debug('failed to encrypt, emitting error', this.logContext); + workerLogger.warn('failed to encrypt, emitting error', this.logContext); this.emitThrottledError( new CryptorError( `encryption key missing for encoding`, @@ -479,7 +499,13 @@ export class FrameCryptor extends BaseFrameCryptor { const keyIndex = data[encodedFrame.data.byteLength - 1]; if (this.keys.hasInvalidKeyAtIndex(keyIndex)) { - // drop frame + // drop frame — key at this index was marked invalid after too many + // consecutive decryption failures. Log at debug so the downstream + // silence has a traceable cause. + workerLogger.debug('dropping frame, key index is marked invalid', { + ...this.logContext, + keyIndex, + }); return; } @@ -490,6 +516,14 @@ export class FrameCryptor extends BaseFrameCryptor { if (decodedFrame) { return controller.enqueue(decodedFrame); } + // `decryptFrame` resolved without a frame — the only path that hits + // this is a ratchet recursion that unwound without producing output. + // Previously this was a silent drop which made decryption failures + // invisible to the main thread; surface it explicitly here. + workerLogger.warn('decryption yielded no frame, dropping', { + ...this.logContext, + keyIndex, + }); } catch (error) { if (error instanceof CryptorError && error.reason === CryptorErrorReason.InvalidKey) { // emit an error if the key handler thinks we have a valid key @@ -498,7 +532,7 @@ export class FrameCryptor extends BaseFrameCryptor { this.keys.decryptionFailure(keyIndex); } } else { - workerLogger.warn('decoding frame failed', { error }); + workerLogger.warn('decoding frame failed', { ...this.logContext, keyIndex, error }); } } } else { @@ -627,7 +661,11 @@ export class FrameCryptor extends BaseFrameCryptor { * as the key has not been updated on the keyHandler instance */ - workerLogger.warn('maximum ratchet attempts exceeded'); + workerLogger.warn('maximum ratchet attempts exceeded, giving up on frame', { + ...this.logContext, + keyIndex, + attempts: ratchetOpts.ratchetCount, + }); throw new CryptorError( `valid key missing for participant ${this.participantIdentity}`, CryptorErrorReason.InvalidKey, diff --git a/src/e2ee/worker/e2ee.worker.ts b/src/e2ee/worker/e2ee.worker.ts index f9c642576c..2ccb796207 100644 --- a/src/e2ee/worker/e2ee.worker.ts +++ b/src/e2ee/worker/e2ee.worker.ts @@ -1,4 +1,5 @@ -import { workerLogger } from '../../logger'; +import { setLogExtension, workerLogger } from '../../logger'; +import { DiagnosticsBuffer } from '../../room/DiagnosticsBuffer'; import type { VideoCodec } from '../../room/track/options'; import { AsyncQueue } from '../../utils/AsyncQueue'; import { KEY_PROVIDER_DEFAULTS } from '../constants'; @@ -6,6 +7,7 @@ import { CryptorErrorReason } from '../errors'; import { CryptorEvent, KeyHandlerEvent } from '../events'; import type { DecryptDataResponseMessage, + DiagnosticsResponseMessage, E2EEWorkerMessage, EncryptDataResponseMessage, ErrorMessage, @@ -37,6 +39,49 @@ let rtpMap: Map = new Map(); workerLogger.setDefaultLevel('info'); +/** + * Worker-side ring buffer of recent log entries. Populated via a + * `setLogExtension` sink on `workerLogger` and shipped to the main thread + * on demand when a `diagnosticsRequest` arrives. Avoids streaming every + * log line across the postMessage boundary. + */ +const workerDiagnostics = new DiagnosticsBuffer(); +let workerDiagnosticsSinkInstalled = false; + +function installWorkerDiagnosticsSink() { + if (workerDiagnosticsSinkInstalled) return; + workerDiagnosticsSinkInstalled = true; + setLogExtension((level, message, context) => { + workerDiagnostics.push({ + type: 'log', + timestamp: Date.now(), + level, + message, + context: context ? sanitizeForPostMessage(context) : undefined, + }); + }, workerLogger); +} + +/** + * Strip fields that aren't structured-clone-safe (functions, DOM nodes, + * etc.) and coerce Error instances into plain objects so that the buffer + * snapshot can be transferred to the main thread via `postMessage` without + * throwing when a diagnostics request arrives. + */ +function sanitizeForPostMessage(ctx: object): object { + const out: Record = {}; + for (const [key, value] of Object.entries(ctx)) { + if (value instanceof Error) { + out[key] = { name: value.name, message: value.message, stack: value.stack }; + } else if (typeof value === 'function') { + // skip + } else { + out[key] = value; + } + } + return out; +} + onmessage = (ev) => { messageQueue.run(async () => { const { kind, data }: E2EEWorkerMessage = ev.data; @@ -44,6 +89,7 @@ onmessage = (ev) => { switch (kind) { case 'init': workerLogger.setLevel(data.loglevel); + installWorkerDiagnosticsSink(); workerLogger.info('worker initialized'); keyProviderOptions = data.keyProviderOptions; useSharedKey = !!data.keyProviderOptions.sharedKey; @@ -94,11 +140,6 @@ onmessage = (ev) => { data.payload, getParticipantKeyHandler(data.participantIdentity), ); - console.log('encrypted payload', { - original: data.payload, - encrypted: encryptedPayload, - iv, - }); postMessage({ kind: 'encryptDataResponse', data: { @@ -181,6 +222,12 @@ onmessage = (ev) => { case 'setSifTrailer': handleSifTrailer(data.trailer); break; + case 'diagnosticsRequest': + postMessage({ + kind: 'diagnosticsResponse', + data: { uuid: data.uuid, entries: workerDiagnostics.snapshot() }, + } satisfies DiagnosticsResponseMessage); + break; default: break; } diff --git a/src/logger.test.ts b/src/logger.test.ts index e45bbecc03..9de9044e41 100644 --- a/src/logger.test.ts +++ b/src/logger.test.ts @@ -12,33 +12,12 @@ import { } from './logger'; describe('formatDisplayContext', () => { - it('returns an empty string for undefined or empty input', () => { + // `DISPLAY_KEYS` is intentionally empty by default — the formatter is a + // no-op until keys are opted into the bracketed prefix. + it('returns an empty string for every input until display keys are configured', () => { expect(formatDisplayContext(undefined)).toBe(''); expect(formatDisplayContext({})).toBe(''); - }); - - it('renders only recognised display keys, skipping undefined/null/empty', () => { - const out = formatDisplayContext({ - room: 'foo', - roomID: undefined, - participant: 'alice', - participantID: '', - trackID: null, - source: 'camera', - irrelevant: 'should-not-show', - }); - expect(out).toBe('[room=foo participant=alice source=camera]'); - }); - - it('preserves the canonical key ordering regardless of input order', () => { - const a = formatDisplayContext({ trackID: 'T', room: 'R', participant: 'P' }); - const b = formatDisplayContext({ participant: 'P', trackID: 'T', room: 'R' }); - expect(a).toBe(b); - expect(a).toBe('[room=R participant=P trackID=T]'); - }); - - it('handles non-string values by stringifying them', () => { - expect(formatDisplayContext({ reconnectAttempt: 3 })).toBe('[reconnectAttempt=3]'); + expect(formatDisplayContext({ room: 'foo', participant: 'alice' })).toBe(''); }); }); @@ -52,7 +31,7 @@ describe('getLogger with context provider', () => { setLogExtension(extension, base); }; - it('prepends a context prefix to the message and merges context for extensions', () => { + it('merges bound context with per-call extras for extensions', () => { const extension = vi.fn(); hookBase(LoggerNames.Default, extension); setLogLevel(LogLevel.debug, LoggerNames.Default); @@ -67,11 +46,11 @@ describe('getLogger with context provider', () => { expect(debugCalls).toHaveLength(1); const [level, msg, ctx] = debugCalls[0]; expect(level).toBe(LogLevel.debug); - expect(msg).toBe('[room=foo participant=alice] hello world'); + expect(msg).toBe('hello world'); expect(ctx).toEqual({ room: 'foo', participant: 'alice', extra: 1 }); }); - it('omits the prefix when the bound context has no display keys', () => { + it('passes bound context through when no extras are supplied', () => { const extension = vi.fn(); hookBase(LoggerNames.Room, extension); setLogLevel(LogLevel.info, LoggerNames.Room); @@ -82,7 +61,7 @@ describe('getLogger with context provider', () => { expect(extension).toHaveBeenCalledWith(LogLevel.info, 'plain', { irrelevant: 'x' }); }); - it('reflects dynamic changes to the bound context on every call', () => { + it('re-reads the bound context on every call', () => { const extension = vi.fn(); hookBase(LoggerNames.Engine, extension); setLogLevel(LogLevel.info, LoggerNames.Engine); @@ -95,8 +74,8 @@ describe('getLogger with context provider', () => { log.info('second'); const infos = extension.mock.calls.filter((c) => c[0] === LogLevel.info); - expect(infos[0][1]).toBe('[room=r1] first'); - expect(infos[1][1]).toBe('[room=r2 participant=bob] second'); + expect(infos[0][2]).toEqual({ room: 'r1' }); + expect(infos[1][2]).toEqual({ room: 'r2', participant: 'bob' }); }); it('returns an unwrapped logger when no context provider is supplied', () => { diff --git a/src/logger.ts b/src/logger.ts index ba17f62ccf..51266100d9 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -54,18 +54,7 @@ export default livekitLogger as StructuredLogger; * message prefix. Ordering here defines ordering in the rendered prefix. * Values that are `undefined` or empty strings are skipped. */ -const DISPLAY_KEYS = [ - 'room', - 'roomID', - 'participant', - 'participantID', - 'trackID', - 'source', - 'target', - 'transport', - 'reconnectAttempt', - 'region', -] as const; +const DISPLAY_KEYS = [] as const; /** * Render the subset of `ctx` listed in `DISPLAY_KEYS` as a bracketed prefix @@ -110,15 +99,13 @@ function wrapWithContext(base: StructuredLogger, ctxFn: ContextProvider): Struct // Resolve the underlying method on every call so that later // setLogExtension installations (which replace the base logger's // methods via loglevel's methodFactory) are picked up. - const wrap = - (method: LogMethod) => - (msg: string, extra?: object) => { - const ctx = ctxFn(); - const prefix = formatDisplayContext(ctx); - const finalMsg = prefix ? `${prefix} ${msg}` : msg; - const merged = ctx || extra ? { ...ctx, ...extra } : undefined; - base[method](finalMsg, merged); - }; + const wrap = (method: LogMethod) => (msg: string, extra?: object) => { + const ctx = ctxFn(); + const prefix = formatDisplayContext(ctx); + const finalMsg = prefix ? `${prefix} ${msg}` : msg; + const merged = ctx || extra ? { ...ctx, ...extra } : undefined; + base[method](finalMsg, merged); + }; const proxy = Object.create(base) as StructuredLogger; proxy.trace = wrap('trace'); diff --git a/src/room/Room.test.ts b/src/room/Room.test.ts index ea4ade5524..9c52354a80 100644 --- a/src/room/Room.test.ts +++ b/src/room/Room.test.ts @@ -32,14 +32,14 @@ describe('Active device switch', () => { }); describe('Room diagnostics', () => { - it('captures internal log entries into the ring buffer', () => { + it('captures internal log entries into the ring buffer', async () => { setLogLevel(LogLevel.debug, LoggerNames.Room); const room = new Room({ diagnostics: { size: 16 } }); // trigger an internal log at a known level (room as unknown as { log: { info: (m: string) => void } }).log.info( 'diagnostics roundtrip probe', ); - const entries = room.getRecentDiagnostics(); + const entries = await room.getRecentDiagnostics(); const probe = entries.find( (e): e is LogDiagnosticEntry => e.type === 'log' && (e as LogDiagnosticEntry).message.includes('diagnostics roundtrip probe'), @@ -49,12 +49,12 @@ describe('Room diagnostics', () => { setLogLevel(LogLevel.info); }); - it('returns an empty array when diagnostics are disabled', () => { + it('returns an empty array when diagnostics are disabled', async () => { const room = new Room({ diagnostics: false }); - expect(room.getRecentDiagnostics()).toEqual([]); + expect(await room.getRecentDiagnostics()).toEqual([]); }); - it('accepts custom entries via recordDiagnostic', () => { + it('accepts custom entries via recordDiagnostic', async () => { const room = new Room({ diagnostics: { size: 4 } }); room.recordDiagnostic({ type: 'log', @@ -62,7 +62,7 @@ describe('Room diagnostics', () => { level: LogLevel.warn, message: 'manual entry', }); - const entries = room.getRecentDiagnostics(); + const entries = await room.getRecentDiagnostics(); expect(entries.some((e) => (e as LogDiagnosticEntry).message === 'manual entry')).toBe(true); }); }); diff --git a/src/room/Room.ts b/src/room/Room.ts index 102145789e..e9df18d7bd 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -489,11 +489,24 @@ class Room extends (EventEmitter as new () => TypedEmitter) /** * Returns a chronological snapshot of recent internal events retained by * the diagnostics ring buffer. Currently contains log entries; additional - * kinds (e.g. WebRTC stats) may be added over time. Returns an empty - * array when diagnostics have been disabled via `options.diagnostics: false`. + * kinds (e.g. WebRTC stats) may be added over time. When e2ee is active, + * the worker-side buffer is fetched on demand and merged into the result. + * Returns an empty array when diagnostics have been disabled via + * `options.diagnostics: false`. */ - getRecentDiagnostics(): DiagnosticEntry[] { - return this.diagnosticsBuffer?.snapshot() ?? []; + async getRecentDiagnostics(): Promise { + if (!this.diagnosticsBuffer) { + return []; + } + const mainEntries = this.diagnosticsBuffer.snapshot(); + const workerEntries = (await this.e2eeManager?.fetchDiagnosticsSnapshot()) ?? []; + if (workerEntries.length === 0) { + return mainEntries; + } + // Merge by timestamp so the returned log reads as a single chronological + // stream across both threads. `sort` is stable in V8 / WebKit so equal + // timestamps preserve insertion order within each source. + return [...mainEntries, ...workerEntries].sort((a, b) => a.timestamp - b.timestamp); } /**