Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/e2ee/E2eeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Encryption_Type, TrackInfo } from '@livekit/protocol';
import { EventEmitter } from 'events';
import type TypedEventEmitter from 'typed-emitter';
import log, { LogLevel, workerLogger } from '../logger';
import { type DiagnosticEntry } from '../room/DiagnosticsBuffer';
import type RTCEngine from '../room/RTCEngine';
import type Room from '../room/Room';
import { ConnectionState } from '../room/Room';
Expand All @@ -18,6 +19,7 @@ import { type E2EEManagerCallbacks, EncryptionEvent, KeyProviderEvent } from './
import type {
DecryptDataRequestMessage,
DecryptDataResponseMessage,
DiagnosticsRequestMessage,
E2EEManagerOptions,
E2EEWorkerMessage,
EnableMessage,
Expand Down Expand Up @@ -50,6 +52,11 @@ export interface BaseE2EEManager {
participantIdentity: string,
keyIndex: number,
): Promise<DecryptDataResponseMessage['data']>;
/**
* Snapshot the worker-side diagnostics ring buffer. Returns an empty
* array if the worker does not respond within `timeoutMs`.
*/
fetchDiagnosticsSnapshot(timeoutMs?: number): Promise<DiagnosticEntry[]>;
on<E extends keyof E2EEManagerCallbacks>(event: E, listener: E2EEManagerCallbacks[E]): this;
}

Expand All @@ -74,6 +81,8 @@ export class E2EEManager
private encryptDataRequests: Map<string, Future<EncryptDataResponseMessage['data'], Error>> =
new Map();

private diagnosticsRequests: Map<string, Future<DiagnosticEntry[], Error>> = new Map();

private dataChannelEncryptionEnabled: boolean;

constructor(options: E2EEManagerOptions, dcEncryptionEnabled: boolean) {
Expand Down Expand Up @@ -221,6 +230,12 @@ export class E2EEManager
encryptFuture.resolve(data as EncryptDataResponseMessage['data']);
}
break;
case 'diagnosticsResponse':
{
const future = this.diagnosticsRequests.get(data.uuid);
future?.resolve?.(data.entries as DiagnosticEntry[]);
}
break;
default:
break;
}
Expand Down Expand Up @@ -336,6 +351,46 @@ export class E2EEManager
return future!.promise!;
}

/**
* Ask the worker for a snapshot of its diagnostics ring buffer. Resolves
* to an empty array (and logs a warning) if the worker does not respond
* within `timeoutMs`, so callers can safely merge the result without
* blocking on a dead worker.
*/
async fetchDiagnosticsSnapshot(timeoutMs: number = 500): Promise<DiagnosticEntry[]> {
if (!this.worker) {
return [];
}
const uuid = crypto.randomUUID();
const future = new Future<DiagnosticEntry[], Error>();
future.onFinally = () => {
this.diagnosticsRequests.delete(uuid);
};
this.diagnosticsRequests.set(uuid, future);
const msg: DiagnosticsRequestMessage = {
kind: 'diagnosticsRequest',
data: { uuid },
};
this.worker.postMessage(msg);

const timeout = new Promise<DiagnosticEntry[]>((resolve) => {
setTimeout(() => {
if (this.diagnosticsRequests.has(uuid)) {
log.warn('e2ee worker did not respond to diagnostics request in time', { timeoutMs });
this.diagnosticsRequests.delete(uuid);
resolve([]);
}
}, timeoutMs);
});

try {
return await Promise.race([future.promise, timeout]);
} catch (err) {
log.warn('failed to fetch e2ee worker diagnostics', { error: err });
return [];
}
}

handleEncryptedData(
payload: Uint8Array,
iv: Uint8Array,
Expand Down
26 changes: 25 additions & 1 deletion src/e2ee/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,28 @@ export interface EnableMessage extends BaseMessage {
};
}

export interface DiagnosticsRequestMessage extends BaseMessage {
kind: 'diagnosticsRequest';
data: {
/** Correlates the response with the awaiting main-thread request. */
uuid: string;
};
}

export interface DiagnosticsResponseMessage extends BaseMessage {
kind: 'diagnosticsResponse';
data: {
uuid: string;
/**
* Snapshot of the worker-side diagnostics ring buffer, in chronological
* order. Typed as `unknown[]` here to keep the message type free of a
* cross-module dependency on `DiagnosticEntry`; callers on the main
* thread cast to `DiagnosticEntry[]`.
*/
entries: unknown[];
};
}

export interface InitAck extends BaseMessage {
kind: 'initAck';
data: {
Expand Down Expand Up @@ -166,7 +188,9 @@ export type E2EEWorkerMessage =
| DecryptDataRequestMessage
| DecryptDataResponseMessage
| EncryptDataRequestMessage
| EncryptDataResponseMessage;
| EncryptDataResponseMessage
| DiagnosticsRequestMessage
| DiagnosticsResponseMessage;

export type KeySet = { material: CryptoKey; encryptionKey: CryptoKey };

Expand Down
54 changes: 46 additions & 8 deletions src/e2ee/worker/FrameCryptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,17 @@ export class FrameCryptor extends BaseFrameCryptor {
.catch((e) => {
if (e instanceof TypeError && e.message === 'Destination stream closed') {
// this can happen when subscriptions happen in quick successions, but doesn't influence functionality
workerLogger.debug('destination stream closed');
workerLogger.debug('destination stream closed', { ...this.logContext, trackId });
} else {
workerLogger.warn('transform error', { error: e, ...this.logContext });
// pipeTo rejected — the transform pipeline for this track is torn
// down and no further frames will be (de)crypted until it is rebuilt.
// Log at error so the main thread / diagnostics buffer can surface it.
workerLogger.error('frame cryptor transform pipeline broke', {
...this.logContext,
trackId,
operation,
error: e,
});
this.emit(
CryptorEvent.Error,
e instanceof CryptorError
Expand Down Expand Up @@ -429,11 +437,23 @@ export class FrameCryptor extends BaseFrameCryptor {

return controller.enqueue(encodedFrame);
} catch (e: any) {
// TODO: surface this to the app.
workerLogger.error(e);
workerLogger.error('frame encryption failed', {
...this.logContext,
error: e,
ssrc: encodedFrame.getMetadata().synchronizationSource,
});
this.emitThrottledError(
e instanceof CryptorError
? e
: new CryptorError(
`frame encryption failed: ${e?.message ?? e}`,
undefined,
this.participantIdentity,
),
);
}
} else {
workerLogger.debug('failed to encrypt, emitting error', this.logContext);
workerLogger.warn('failed to encrypt, emitting error', this.logContext);
this.emitThrottledError(
new CryptorError(
`encryption key missing for encoding`,
Expand Down Expand Up @@ -479,7 +499,13 @@ export class FrameCryptor extends BaseFrameCryptor {
const keyIndex = data[encodedFrame.data.byteLength - 1];

if (this.keys.hasInvalidKeyAtIndex(keyIndex)) {
// drop frame
// drop frame — key at this index was marked invalid after too many
// consecutive decryption failures. Log at debug so the downstream
// silence has a traceable cause.
workerLogger.debug('dropping frame, key index is marked invalid', {
...this.logContext,
keyIndex,
});
return;
}

Expand All @@ -490,6 +516,14 @@ export class FrameCryptor extends BaseFrameCryptor {
if (decodedFrame) {
return controller.enqueue(decodedFrame);
}
// `decryptFrame` resolved without a frame — the only path that hits
// this is a ratchet recursion that unwound without producing output.
// Previously this was a silent drop which made decryption failures
// invisible to the main thread; surface it explicitly here.
workerLogger.warn('decryption yielded no frame, dropping', {
...this.logContext,
keyIndex,
});
} catch (error) {
if (error instanceof CryptorError && error.reason === CryptorErrorReason.InvalidKey) {
// emit an error if the key handler thinks we have a valid key
Expand All @@ -498,7 +532,7 @@ export class FrameCryptor extends BaseFrameCryptor {
this.keys.decryptionFailure(keyIndex);
}
} else {
workerLogger.warn('decoding frame failed', { error });
workerLogger.warn('decoding frame failed', { ...this.logContext, keyIndex, error });
}
}
} else {
Expand Down Expand Up @@ -627,7 +661,11 @@ export class FrameCryptor extends BaseFrameCryptor {
* as the key has not been updated on the keyHandler instance
*/

workerLogger.warn('maximum ratchet attempts exceeded');
workerLogger.warn('maximum ratchet attempts exceeded, giving up on frame', {
...this.logContext,
keyIndex,
attempts: ratchetOpts.ratchetCount,
});
throw new CryptorError(
`valid key missing for participant ${this.participantIdentity}`,
CryptorErrorReason.InvalidKey,
Expand Down
59 changes: 53 additions & 6 deletions src/e2ee/worker/e2ee.worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { workerLogger } from '../../logger';
import { setLogExtension, workerLogger } from '../../logger';
import { DiagnosticsBuffer } from '../../room/DiagnosticsBuffer';
import type { VideoCodec } from '../../room/track/options';
import { AsyncQueue } from '../../utils/AsyncQueue';
import { KEY_PROVIDER_DEFAULTS } from '../constants';
import { CryptorErrorReason } from '../errors';
import { CryptorEvent, KeyHandlerEvent } from '../events';
import type {
DecryptDataResponseMessage,
DiagnosticsResponseMessage,
E2EEWorkerMessage,
EncryptDataResponseMessage,
ErrorMessage,
Expand Down Expand Up @@ -37,13 +39,57 @@ let rtpMap: Map<number, VideoCodec> = new Map();

workerLogger.setDefaultLevel('info');

/**
* Worker-side ring buffer of recent log entries. Populated via a
* `setLogExtension` sink on `workerLogger` and shipped to the main thread
* on demand when a `diagnosticsRequest` arrives. Avoids streaming every
* log line across the postMessage boundary.
*/
const workerDiagnostics = new DiagnosticsBuffer();
let workerDiagnosticsSinkInstalled = false;

function installWorkerDiagnosticsSink() {
if (workerDiagnosticsSinkInstalled) return;
workerDiagnosticsSinkInstalled = true;
setLogExtension((level, message, context) => {
workerDiagnostics.push({
type: 'log',
timestamp: Date.now(),
level,
message,
context: context ? sanitizeForPostMessage(context) : undefined,
});
}, workerLogger);
}

/**
* Strip fields that aren't structured-clone-safe (functions, DOM nodes,
* etc.) and coerce Error instances into plain objects so that the buffer
* snapshot can be transferred to the main thread via `postMessage` without
* throwing when a diagnostics request arrives.
*/
function sanitizeForPostMessage(ctx: object): object {
const out: Record<string, unknown> = {};
for (const [key, value] of Object.entries(ctx)) {
if (value instanceof Error) {
out[key] = { name: value.name, message: value.message, stack: value.stack };
} else if (typeof value === 'function') {
// skip
} else {
out[key] = value;
}
}
return out;
}

onmessage = (ev) => {
messageQueue.run(async () => {
const { kind, data }: E2EEWorkerMessage = ev.data;

switch (kind) {
case 'init':
workerLogger.setLevel(data.loglevel);
installWorkerDiagnosticsSink();
workerLogger.info('worker initialized');
keyProviderOptions = data.keyProviderOptions;
useSharedKey = !!data.keyProviderOptions.sharedKey;
Expand Down Expand Up @@ -94,11 +140,6 @@ onmessage = (ev) => {
data.payload,
getParticipantKeyHandler(data.participantIdentity),
);
console.log('encrypted payload', {
original: data.payload,
encrypted: encryptedPayload,
iv,
});
postMessage({
kind: 'encryptDataResponse',
data: {
Expand Down Expand Up @@ -181,6 +222,12 @@ onmessage = (ev) => {
case 'setSifTrailer':
handleSifTrailer(data.trailer);
break;
case 'diagnosticsRequest':
postMessage({
kind: 'diagnosticsResponse',
data: { uuid: data.uuid, entries: workerDiagnostics.snapshot() },
} satisfies DiagnosticsResponseMessage);
break;
default:
break;
}
Expand Down
Loading