diff --git a/.size-limit.cjs b/.size-limit.cjs index c9f5ffe6dc..19c0ae8cf2 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -2,7 +2,7 @@ module.exports = [ { path: 'dist/livekit-client.esm.mjs', import: '{ Room }', - limit: '100 kB', + limit: '101 kB', }, { path: 'dist/livekit-client.umd.js', diff --git a/examples/demo/demo.ts b/examples/demo/demo.ts index 8b625152a7..5c2c1b125a 100644 --- a/examples/demo/demo.ts +++ b/examples/demo/demo.ts @@ -23,6 +23,7 @@ import { ParticipantEvent, RemoteParticipant, RemoteTrackPublication, + RemoteVideoTrack, Room, RoomEvent, ScreenSharePresets, @@ -40,7 +41,10 @@ import { supportsAV1, supportsVP9, } from '../../src/index'; +//@ts-ignore +import PTWorker from '../../src/packetTrailer/worker/packetTrailer.worker?worker'; import type { DataTrackFrame } from '../../src/room/data-track/frame'; +import { TrackEvent } from '../../src/room/events'; import { isSVCCodec, sleep, supportsH265 } from '../../src/room/utils'; setLogLevel(LogLevel.debug); @@ -106,6 +110,7 @@ const appActions = { const cryptoKey = ($('crypto-key')).value; const autoSubscribe = ($('auto-subscribe')).checked; const e2eeEnabled = ($('e2ee')).checked; + const packetTrailerEnabled = ($('packet-trailer')).checked; const audioOutputId = ($('audio-output')).value; let backupCodecPolicy: BackupCodecPolicy | undefined; if (($('multicodec-simulcast')).checked) { @@ -137,6 +142,7 @@ const appActions = { encryption: e2eeEnabled ? { keyProvider: state.e2eeKeyProvider, worker: new E2EEWorker() } : undefined, + packetTrailer: packetTrailerEnabled ? { worker: new PTWorker() } : undefined, }; if ( roomOpts.publishDefaults?.videoCodec === 'av1' || @@ -243,6 +249,35 @@ const appActions = { appendLog('subscribed to track', pub.trackSid, participant.identity); renderParticipant(participant); renderScreenShare(room); + if (track instanceof RemoteVideoTrack) { + let lastLatencyUpdate = 0; + let latencyDisplay = ''; + track.on(TrackEvent.TimeSyncUpdate, ({ rtpTimestamp }) => { + const meta = track.lookupFrameMetadata({ rtpTimestamp }); + const overlayElm = document.getElementById(`pt-overlay-${participant.identity}`); + if (overlayElm && meta) { + let text = meta.frameId ? `Frame ID: ${meta.frameId}` : ''; + if (meta.userTimestamp) { + const now = Date.now(); + const receiveTime = new Date(now); + const publishTime = new Date(Number(meta.userTimestamp / 1000n)); + if (now - lastLatencyUpdate >= 500) { + lastLatencyUpdate = now; + latencyDisplay = `${(receiveTime.getTime() - publishTime.getTime()).toFixed(1)}ms`; + } + const fmt = (d: Date) => { + const pad = (n: number, w = 2) => String(n).padStart(w, '0'); + return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())} ${pad(d.getHours())}:${pad(d.getMinutes())}:${pad(d.getSeconds())}:${pad(d.getMilliseconds(), 3)}`; + }; + text += + `\nPublish: ${fmt(publishTime)}` + + `\nReceive: ${fmt(receiveTime)}` + + `\nLatency: ${latencyDisplay}`; + } + overlayElm.textContent = text; + } + }); + } }) .on(RoomEvent.TrackUnsubscribed, (_, pub, participant) => { appendLog('unsubscribed from track', pub.trackSid); @@ -850,6 +885,7 @@ function renderParticipant(participant: Participant, remove: boolean = false) { div.innerHTML = ` +
diff --git a/examples/demo/index.html b/examples/demo/index.html index da6dc03e5b..ee36990486 100644 --- a/examples/demo/index.html +++ b/examples/demo/index.html @@ -70,6 +70,10 @@

Livekit Sample App

+
+ + +
diff --git a/examples/demo/styles.css b/examples/demo/styles.css index 059a0815e5..cd2ea7f551 100644 --- a/examples/demo/styles.css +++ b/examples/demo/styles.css @@ -164,3 +164,19 @@ position: absolute; z-index: 4; } + +.participant .pt-overlay { + position: absolute; + top: 4px; + left: 4px; + padding: 2px 6px; + background-color: rgba(0, 0, 0, 0.6); + color: #fff; + font-family: monospace; + font-size: 11px; + line-height: 1.4; + border-radius: 3px; + z-index: 6; + pointer-events: none; + white-space: pre; +} diff --git a/package.json b/package.json index fbbc286b86..20ec11110a 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,11 @@ "types": "./dist/src/e2ee/worker/e2ee.worker.d.ts", "import": "./dist/livekit-client.e2ee.worker.mjs", "require": "./dist/livekit-client.e2ee.worker.js" + }, + "./packet-trailer-worker": { + "types": "./dist/src/packetTrailer/worker/packetTrailer.worker.d.ts", + "import": "./dist/livekit-client.pt.worker.mjs", + "require": "./dist/livekit-client.pt.worker.js" } }, "files": [ @@ -29,6 +34,9 @@ ], "./dist/src/e2ee/worker/e2ee.worker.d.ts": [ "./dist/ts4.2/e2ee/worker/e2ee.worker.d.ts" + ], + "./dist/src/packetTrailer/worker/packetTrailer.worker.d.ts": [ + "./dist/ts4.2/packetTrailer/worker/packetTrailer.worker.d.ts" ] } }, @@ -36,7 +44,7 @@ "author": "LiveKit ", "license": "Apache-2.0", "scripts": { - "build": "rollup --config --bundleConfigAsCjs && rollup --config rollup.config.worker.js --bundleConfigAsCjs && pnpm downlevel-dts", + "build": "rollup --config --bundleConfigAsCjs && rollup --config rollup.config.worker.js --bundleConfigAsCjs && rollup --config rollup.config.pt-worker.js --bundleConfigAsCjs && pnpm downlevel-dts", "build:clean": "rm -rf ./dist && pnpm build", "build:watch": "rollup --watch --config --bundleConfigAsCjs", "build:worker:watch": "rollup --watch --config rollup.config.worker.js --bundleConfigAsCjs", @@ -57,7 +65,7 @@ }, "dependencies": { "@livekit/mutex": "1.1.1", - "@livekit/protocol": "1.45.3", + "@livekit/protocol": "1.45.6", "events": "^3.3.0", "jose": "^6.1.0", "loglevel": "^1.9.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 203931e4a7..02c03f0aa4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,8 +12,8 @@ importers: specifier: 1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: 1.45.3 - version: 1.45.3 + specifier: 1.45.6 + version: 1.45.6 '@types/dom-mediacapture-record': specifier: ^1 version: 1.0.22 @@ -1124,8 +1124,8 @@ packages: '@livekit/mutex@1.1.1': resolution: {integrity: sha512-EsshAucklmpuUAfkABPxJNhzj9v2sG7JuzFDL4ML1oJQSV14sqrpTYnsaOudMAw9yOaW53NU3QQTlUQoRs4czw==} - '@livekit/protocol@1.45.3': - resolution: {integrity: sha512-WmMxBTsy4dRBqcrswFwUUlgq3Z0nnhOqKR6tX749Rb/PcB1yBMUtrHxZvcsS6qi3/5+86zHeVG+exmu1sZqfJg==} + '@livekit/protocol@1.45.6': + resolution: {integrity: sha512-YPDmrUiVe1EY/q/2bD+Fp+69DWq6LZgeH+G/KEbz07OIVf8hgAYzfb1FgiOdWLRpSj06+SuTmrOY604fWNuD3w==} '@livekit/throws-transformer@0.1.3': resolution: {integrity: sha512-PBttE6W6g/2ALGu6kWOunZ5qdrXwP9Ge1An2/62OfE6Rhc0Abd4yp6ex2pWhwUfGxDsSZvFgoB1Ia/5mWAMuKQ==} @@ -3852,8 +3852,8 @@ packages: engines: {node: '>=14.17'} hasBin: true - typescript@6.0.0-dev.20260416: - resolution: {integrity: sha512-64CCow5rQc6+oBAPqLtvZ8f3iz4WVCLwMVkg6/kOfMQlGshVOMkBhCXn+MMP4SX4Xtnz48VpD98FkptirMEuEQ==} + typescript@6.0.0-dev.20260401: + resolution: {integrity: sha512-trGd1r4vZ89ABUR8VnF2Lsl/Tb9ObfQNH3r+kF8NFVSLCq3b3x96McL6pXMnH/aa4tutrziJZvb7zm3WsAEfCA==} engines: {node: '>=14.17'} hasBin: true @@ -5300,7 +5300,7 @@ snapshots: '@livekit/mutex@1.1.1': {} - '@livekit/protocol@1.45.3': + '@livekit/protocol@1.45.6': dependencies: '@bufbuild/protobuf': 1.10.1 @@ -6398,7 +6398,7 @@ snapshots: dependencies: semver: 7.6.0 shelljs: 0.8.5 - typescript: 6.0.0-dev.20260416 + typescript: 6.0.0-dev.20260401 dunder-proto@1.0.1: dependencies: @@ -8284,7 +8284,7 @@ snapshots: typescript@5.8.3: {} - typescript@6.0.0-dev.20260416: {} + typescript@6.0.0-dev.20260401: {} uc.micro@2.1.0: {} diff --git a/rollup.config.pt-worker.js b/rollup.config.pt-worker.js new file mode 100644 index 0000000000..c6f00d04f8 --- /dev/null +++ b/rollup.config.pt-worker.js @@ -0,0 +1,25 @@ +import terser from '@rollup/plugin-terser'; +import typescript from 'rollup-plugin-typescript2'; +import packageJson from './package.json'; +import { commonPlugins, kebabCaseToPascalCase } from './rollup.config'; + +export default { + input: 'src/packetTrailer/worker/packetTrailer.worker.ts', + output: [ + { + file: `dist/${packageJson.name}.pt.worker.mjs`, + format: 'es', + strict: true, + sourcemap: true, + }, + { + file: `dist/${packageJson.name}.pt.worker.js`, + format: 'umd', + strict: true, + sourcemap: true, + name: kebabCaseToPascalCase(packageJson.name) + '.pt.worker', + plugins: [terser()], + }, + ], + plugins: [typescript({ tsconfig: './src/packetTrailer/worker/tsconfig.json' }), ...commonPlugins], +}; diff --git a/src/api/SignalClient.test.ts b/src/api/SignalClient.test.ts index bffa5b8268..968c7c92b1 100644 --- a/src/api/SignalClient.test.ts +++ b/src/api/SignalClient.test.ts @@ -1,10 +1,14 @@ import { + ClientInfo_Capability, DisconnectReason, + JoinRequest, JoinResponse, LeaveRequest, ReconnectResponse, SignalRequest, SignalResponse, + WrappedJoinRequest, + WrappedJoinRequest_Compression, } from '@livekit/protocol'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { ConnectionError, ConnectionErrorReason } from '../room/errors'; @@ -60,6 +64,7 @@ interface MockWebSocketStreamOptions { connection?: WebSocketConnection; opened?: Promise; closed?: Promise; + onUrl?: (url: string) => void; readyState?: number; } @@ -69,18 +74,19 @@ function mockWebSocketStream(options: MockWebSocketStreamOptions = {}) { opened = connection ? Promise.resolve(connection) : new Promise(() => {}), closed = new Promise(() => {}), readyState = 1, + onUrl, } = options; - return vi.mocked(WebSocketStream).mockImplementationOnce( - () => - ({ - url: 'wss://test.livekit.io', - opened, - closed, - close: vi.fn(), - readyState, - }) as any, - ); + return vi.mocked(WebSocketStream).mockImplementationOnce((url) => { + onUrl?.(url); + return { + url: 'wss://test.livekit.io', + opened, + closed, + close: vi.fn(), + readyState, + } as any; + }); } describe('SignalClient.connect', () => { @@ -99,6 +105,47 @@ describe('SignalClient.connect', () => { signalClient = new SignalClient(false); }); + async function decodeJoinRequestFromUrl(url: string): Promise { + const joinRequestParam = new URL(url).searchParams.get('join_request'); + expect(joinRequestParam).toBeTruthy(); + + const paddedBase64Url = joinRequestParam!.padEnd( + joinRequestParam!.length + ((4 - (joinRequestParam!.length % 4)) % 4), + '=', + ); + const binaryString = atob(paddedBase64Url.replace(/-/g, '+').replace(/_/g, '/')); + const wrappedBytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i += 1) { + wrappedBytes[i] = binaryString.charCodeAt(i); + } + + const wrappedJoinRequest = WrappedJoinRequest.fromBinary(wrappedBytes); + if (wrappedJoinRequest.compression === WrappedJoinRequest_Compression.NONE) { + return JoinRequest.fromBinary(wrappedJoinRequest.joinRequest); + } + + const stream = new DecompressionStream('gzip'); + const writer = stream.writable.getWriter(); + writer.write(wrappedJoinRequest.joinRequest); + writer.close(); + + const chunks: Uint8Array[] = []; + const reader = stream.readable.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + } + const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0); + const bytes = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + bytes.set(chunk, offset); + offset += chunk.length; + } + return JoinRequest.fromBinary(bytes); + } + describe('Happy Path - Initial Join', () => { it('should successfully connect and receive join response', async () => { const joinResponse = createJoinResponse(); @@ -113,6 +160,51 @@ describe('SignalClient.connect', () => { expect(result).toEqual(joinResponse); expect(signalClient.currentState).toBe(SignalConnectionState.CONNECTED); }); + + it('does not advertise packet trailer capability by default', async () => { + const joinResponse = createJoinResponse(); + const signalResponse = createSignalResponse('join', joinResponse); + const mockReadable = createMockReadableStream([signalResponse]); + const mockConnection = createMockConnection(mockReadable); + let capturedUrl = ''; + + mockWebSocketStream({ + connection: mockConnection, + onUrl: (url) => { + capturedUrl = url; + }, + }); + + await signalClient.join('wss://test.livekit.io', 'test-token', defaultOptions); + + const joinRequest = await decodeJoinRequestFromUrl(capturedUrl); + expect(joinRequest.clientInfo?.capabilities).toEqual([]); + }); + + it('advertises packet trailer capability when provided', async () => { + const joinResponse = createJoinResponse(); + const signalResponse = createSignalResponse('join', joinResponse); + const mockReadable = createMockReadableStream([signalResponse]); + const mockConnection = createMockConnection(mockReadable); + let capturedUrl = ''; + + mockWebSocketStream({ + connection: mockConnection, + onUrl: (url) => { + capturedUrl = url; + }, + }); + + await signalClient.join('wss://test.livekit.io', 'test-token', { + ...defaultOptions, + clientInfoCapabilities: [ClientInfo_Capability.CAP_PACKET_TRAILER], + }); + + const joinRequest = await decodeJoinRequestFromUrl(capturedUrl); + expect(joinRequest.clientInfo?.capabilities).toEqual([ + ClientInfo_Capability.CAP_PACKET_TRAILER, + ]); + }); }); describe('Happy Path - Reconnect', () => { diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index b8768fd71e..ceaf8697e1 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -3,6 +3,7 @@ import { AddTrackRequest, AudioTrackFeature, ClientInfo, + ClientInfo_Capability, ConnectionQualityUpdate, ConnectionSettings, DataTrackSubscriberHandles, @@ -83,6 +84,7 @@ interface ConnectOpts extends SignalOptions { export interface SignalOptions { autoSubscribe: boolean; adaptiveStream?: boolean; + clientInfoCapabilities?: ClientInfo_Capability[]; maxRetries: number; e2eeEnabled: boolean; websocketTimeout: number; @@ -323,7 +325,7 @@ export class SignalClient { this.connectOptions = opts; this.useV0SignalPath = useV0Path; - const clientInfo = getClientInfo(); + const clientInfo = getClientInfo(opts.clientInfoCapabilities); const params = useV0Path ? createConnectionParams(token, clientInfo, opts) : await createJoinRequestConnectionParams(token, clientInfo, opts, publisherOffer); diff --git a/src/api/WebSocketStream.test.ts b/src/api/WebSocketStream.test.ts index 3445348042..08de033427 100644 --- a/src/api/WebSocketStream.test.ts +++ b/src/api/WebSocketStream.test.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { WebSocketStream } from './WebSocketStream'; diff --git a/src/e2ee/E2eeManager.ts b/src/e2ee/E2eeManager.ts index 7bc0d4d71d..3be1ebab5d 100644 --- a/src/e2ee/E2eeManager.ts +++ b/src/e2ee/E2eeManager.ts @@ -8,10 +8,17 @@ import { ConnectionState } from '../room/Room'; import { DeviceUnsupportedError } from '../room/errors'; import { EngineEvent, ParticipantEvent, RoomEvent } from '../room/events'; import type RemoteTrack from '../room/track/RemoteTrack'; +import RemoteVideoTrack from '../room/track/RemoteVideoTrack'; import type { Track } from '../room/track/Track'; import type { VideoCodec } from '../room/track/options'; import { mimeTypeToVideoCodecString } from '../room/track/utils'; -import { Future, isChromiumBased, isLocalTrack, isSafariBased, isVideoTrack } from '../room/utils'; +import { + Future, + isLocalTrack, + isSafariBased, + isScriptTransformSupportedForWorker, + isVideoTrack, +} from '../room/utils'; import type { BaseKeyProvider } from './KeyProvider'; import { E2EE_FLAG } from './constants'; import { type E2EEManagerCallbacks, EncryptionEvent, KeyProviderEvent } from './events'; @@ -34,7 +41,7 @@ import type { SifTrailerMessage, UpdateCodecMessage, } from './types'; -import { isE2EESupported, isScriptTransformSupported } from './utils'; +import { isE2EESupported } from './utils'; export interface BaseE2EEManager { setup(room: Room): void; @@ -221,6 +228,9 @@ export class E2EEManager encryptFuture.resolve(data as EncryptDataResponseMessage['data']); } break; + case 'packetTrailerMetadata': + this.handlePacketTrailerMetadata(data.trackId, data.rtpTimestamp, data.ssrc, data.metadata); + break; default: break; } @@ -231,6 +241,33 @@ export class E2EEManager this.emit(EncryptionEvent.EncryptionError, ev.error, undefined); }; + private handlePacketTrailerMetadata( + trackId: string, + rtpTimestamp: number, + ssrc: number, + metadata: { userTimestamp: bigint; frameId: number }, + ) { + if (!this.room) { + return; + } + for (const participant of [ + this.room.localParticipant, + ...this.room.remoteParticipants.values(), + ]) { + for (const pub of participant.trackPublications.values()) { + if ( + pub.track && + pub.track.mediaStreamID === trackId && + pub.track instanceof RemoteVideoTrack && + pub.track.packetTrailerExtractor + ) { + pub.track.packetTrailerExtractor.storeMetadata(rtpTimestamp, ssrc, metadata); + return; + } + } + } + } + public setupEngine(engine: RTCEngine) { engine.on(EngineEvent.RTPVideoMapUpdate, (rtpMap) => { this.postRTPMap(rtpMap); @@ -448,11 +485,16 @@ export class E2EEManager if (!trackInfo?.mimeType || trackInfo.mimeType === '') { throw new TypeError('MimeType missing from trackInfo, cannot set up E2EE cryptor'); } + const hasPacketTrailer = + track.kind === 'video' && + !!trackInfo.packetTrailerFeatures && + trackInfo.packetTrailerFeatures.length > 0; this.handleReceiver( track.receiver, track.mediaStreamID, remoteId, track.kind === 'video' ? mimeTypeToVideoCodecString(trackInfo.mimeType) : undefined, + hasPacketTrailer, ); } @@ -474,22 +516,19 @@ export class E2EEManager trackId: string, participantIdentity: string, codec?: VideoCodec, + hasPacketTrailer?: boolean, ) { if (!this.worker) { return; } - if ( - isScriptTransformSupported() && - // Chrome occasionally throws an `InvalidState` error when using script transforms directly after introducing this API in 141. - // Disabling it for Chrome based browsers until the API has stabilized - !isChromiumBased() - ) { + if (isScriptTransformSupportedForWorker()) { const options: ScriptTransformOptions = { kind: 'decode', participantIdentity, trackId, codec, + hasPacketTrailer, }; // @ts-ignore receiver.transform = new RTCRtpScriptTransform(this.worker, options); @@ -532,6 +571,7 @@ export class E2EEManager codec, participantIdentity: participantIdentity, isReuse: E2EE_FLAG in receiver, + hasPacketTrailer, }, }; this.worker.postMessage(msg, [readable, writable]); @@ -555,12 +595,7 @@ export class E2EEManager throw TypeError('local identity needs to be known in order to set up encrypted sender'); } - if ( - isScriptTransformSupported() && - // Chrome occasionally throws an `InvalidState` error when using script transforms directly after introducing this API in 141. - // Disabling it for Chrome based browsers until the API has stabilized - !isChromiumBased() - ) { + if (isScriptTransformSupportedForWorker()) { log.info('initialize script transform'); const options = { kind: 'encode', diff --git a/src/e2ee/types.ts b/src/e2ee/types.ts index d7caa48e3e..a3ff0b712d 100644 --- a/src/e2ee/types.ts +++ b/src/e2ee/types.ts @@ -1,4 +1,5 @@ import type { LogLevel } from '../logger'; +import type { PacketTrailerFramePayload } from '../packetTrailer/packetTrailer'; import type { VideoCodec } from '../room/track/options'; import type { BaseE2EEManager } from './E2eeManager'; import type { BaseKeyProvider } from './KeyProvider'; @@ -51,6 +52,12 @@ export interface EncodeMessage extends BaseMessage { trackId: string; codec?: VideoCodec; isReuse: boolean; + /** + * Whether the published track advertises packet trailer features. + * When false, the cryptor skips the per-frame trailer extraction path + * entirely on decode. + */ + hasPacketTrailer?: boolean; }; } @@ -150,6 +157,11 @@ export interface EncryptDataResponseMessage extends BaseMessage { }; } +export interface PTMetadataFromE2EEMessage extends BaseMessage { + kind: 'packetTrailerMetadata'; + data: PacketTrailerFramePayload; +} + export type E2EEWorkerMessage = | InitMessage | SetKeyMessage @@ -166,7 +178,8 @@ export type E2EEWorkerMessage = | DecryptDataRequestMessage | DecryptDataResponseMessage | EncryptDataRequestMessage - | EncryptDataResponseMessage; + | EncryptDataResponseMessage + | PTMetadataFromE2EEMessage; export type KeySet = { material: CryptoKey; encryptionKey: CryptoKey }; @@ -221,4 +234,10 @@ export type ScriptTransformOptions = { participantIdentity: string; trackId: string; codec?: VideoCodec; + /** + * Whether the published track advertises packet trailer features. + * When false, the cryptor skips the per-frame trailer extraction path + * entirely on decode. + */ + hasPacketTrailer?: boolean; }; diff --git a/src/e2ee/utils.ts b/src/e2ee/utils.ts index b7f78b94a7..f51c33cd7b 100644 --- a/src/e2ee/utils.ts +++ b/src/e2ee/utils.ts @@ -8,11 +8,12 @@ export function isE2EESupported() { export function isScriptTransformSupported() { // @ts-ignore - return typeof window.RTCRtpScriptTransform !== 'undefined'; + return typeof window !== 'undefined' && typeof window.RTCRtpScriptTransform !== 'undefined'; } export function isInsertableStreamSupported() { return ( + typeof window !== 'undefined' && typeof window.RTCRtpSender !== 'undefined' && // @ts-ignore typeof window.RTCRtpSender.prototype.createEncodedStreams !== 'undefined' diff --git a/src/e2ee/worker/FrameCryptor.ts b/src/e2ee/worker/FrameCryptor.ts index 1958f2bec1..00858f3265 100644 --- a/src/e2ee/worker/FrameCryptor.ts +++ b/src/e2ee/worker/FrameCryptor.ts @@ -1,13 +1,19 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ // TODO code inspired by https://github.com/webrtc/samples/blob/gh-pages/src/content/insertable-streams/endtoend-encryption/js/worker.js import { EventEmitter } from 'events'; import type TypedEventEmitter from 'typed-emitter'; import { workerLogger } from '../../logger'; +import { processPacketTrailer } from '../../packetTrailer/packetTrailer'; import type { VideoCodec } from '../../room/track/options'; import { ENCRYPTION_ALGORITHM, IV_LENGTH, UNENCRYPTED_BYTES } from '../constants'; import { CryptorError, CryptorErrorReason } from '../errors'; import { type CryptorCallbacks, CryptorEvent } from '../events'; -import type { DecodeRatchetOptions, KeyProviderOptions, KeySet, RatchetResult } from '../types'; +import type { + DecodeRatchetOptions, + KeyProviderOptions, + KeySet, + PTMetadataFromE2EEMessage, + RatchetResult, +} from '../types'; import { deriveKeys, isVideoFrame, needsRbspUnescaping, parseRbsp, writeRbsp } from '../utils'; import type { ParticipantKeyHandler } from './ParticipantKeyHandler'; import { processNALUsForEncryption } from './naluUtils'; @@ -71,6 +77,13 @@ export class FrameCryptor extends BaseFrameCryptor { private currentTransform?: TransformerInfo; + /** + * Whether the subscribed track advertises packet trailer features. + * When false, we skip the per-frame trailer extraction path entirely + * on decode to avoid unnecessary work on tracks that don't use it. + */ + private hasPacketTrailer: boolean = false; + /** * Throttling mechanism for decryption errors to prevent memory leaks */ @@ -178,6 +191,15 @@ export class FrameCryptor extends BaseFrameCryptor { this.rtpMap = map; } + /** + * Sets whether the track associated with this cryptor carries packet + * trailer data. When false, {@link decodeFunction} skips the per-frame + * trailer extraction branch entirely. + */ + setHasPacketTrailer(hasPacketTrailer: boolean) { + this.hasPacketTrailer = hasPacketTrailer; + } + setupTransform( operation: 'encode' | 'decode', readable: ReadableStream, @@ -454,6 +476,24 @@ export class FrameCryptor extends BaseFrameCryptor { encodedFrame: RTCEncodedVideoFrame | RTCEncodedAudioFrame, controller: TransformStreamDefaultController, ) { + if (this.hasPacketTrailer && isVideoFrame(encodedFrame)) { + try { + const ptResult = processPacketTrailer(encodedFrame, this.trackId); + if (ptResult.data) { + encodedFrame.data = ptResult.data; + } + if (ptResult.payload && this.participantIdentity) { + const msg: PTMetadataFromE2EEMessage = { + kind: 'packetTrailerMetadata', + data: ptResult.payload, + }; + postMessage(msg); + } + } catch { + // best-effort: never break the media pipeline if trailer parsing fails + } + } + if ( !this.isEnabled() || // skip for decryption for empty dtx frames diff --git a/src/e2ee/worker/e2ee.worker.ts b/src/e2ee/worker/e2ee.worker.ts index f9c642576c..b4529a2853 100644 --- a/src/e2ee/worker/e2ee.worker.ts +++ b/src/e2ee/worker/e2ee.worker.ts @@ -64,6 +64,7 @@ onmessage = (ev) => { break; case 'decode': let cryptor = getTrackCryptor(data.participantIdentity, data.trackId); + cryptor.setHasPacketTrailer(!!data.hasPacketTrailer); cryptor.setupTransform( kind, data.readableStream, @@ -333,10 +334,11 @@ if (self.RTCTransformEvent) { self.onrtctransform = (event: RTCTransformEvent) => { // @ts-ignore const transformer = event.transformer; - const { kind, participantIdentity, trackId, codec } = + const { kind, participantIdentity, trackId, codec, hasPacketTrailer } = transformer.options as ScriptTransformOptions; messageQueue.run(async () => { const cryptor = getTrackCryptor(participantIdentity, trackId); + cryptor.setHasPacketTrailer(!!hasPacketTrailer); workerLogger.debug('onrtctransform setup', { participantIdentity, trackId, codec }); cryptor.setupTransform( kind, diff --git a/src/index.ts b/src/index.ts index 1dda5a86de..7383d3633b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -63,6 +63,11 @@ import { import { getBrowser } from './utils/browserParser'; export { RpcError, type RpcInvocationData, type PerformRpcParams } from './room/rpc'; +export type { PacketTrailerMetadata } from './packetTrailer/types'; +export { + PacketTrailerManager, + type PacketTrailerOptions, +} from './packetTrailer/PacketTrailerManager'; export * from './connectionHelper/ConnectionCheck'; export * from './connectionHelper/checks/Checker'; diff --git a/src/options.ts b/src/options.ts index f562672220..9b0ef36e77 100644 --- a/src/options.ts +++ b/src/options.ts @@ -1,4 +1,5 @@ import type { E2EEOptions } from './e2ee/types'; +import type { PacketTrailerOptions } from './packetTrailer/PacketTrailerManager'; import type { ReconnectPolicy } from './room/ReconnectPolicy'; import type { AudioCaptureOptions, @@ -100,6 +101,13 @@ export interface InternalRoomOptions { loggerName?: string; + /** + * @experimental + * Options for enabling packet trailer extraction on received video tracks. + * Packet trailers carry frame-level metadata such as user timestamps and frame IDs. + */ + packetTrailer?: PacketTrailerOptions; + /** * will attempt to connect via single peer connection mode. * falls back to dual peer connection mode if not available. diff --git a/src/packetTrailer/PacketTrailerManager.test.ts b/src/packetTrailer/PacketTrailerManager.test.ts new file mode 100644 index 0000000000..108e74d5a6 --- /dev/null +++ b/src/packetTrailer/PacketTrailerManager.test.ts @@ -0,0 +1,172 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { TrackInfo } from '@livekit/protocol'; +import { PacketTrailerManager } from './PacketTrailerManager'; + +describe('PacketTrailerManager', () => { + const originalRTCRtpSender = window.RTCRtpSender; + const originalUserAgent = navigator.userAgent; + const originalRTCRtpScriptTransform = (window as unknown as { RTCRtpScriptTransform?: unknown }) + .RTCRtpScriptTransform; + + afterEach(() => { + Object.defineProperty(window, 'RTCRtpSender', { + configurable: true, + value: originalRTCRtpSender, + writable: true, + }); + Object.defineProperty(window.navigator, 'userAgent', { + configurable: true, + value: originalUserAgent, + }); + Object.defineProperty(window, 'RTCRtpScriptTransform', { + configurable: true, + value: originalRTCRtpScriptTransform, + writable: true, + }); + Object.defineProperty(globalThis, 'RTCRtpScriptTransform', { + configurable: true, + value: originalRTCRtpScriptTransform, + writable: true, + }); + }); + + function stubInsertableStreamsSupport() { + class MockRTCRtpSender { + createEncodedStreams() {} + } + + Object.defineProperty(window, 'RTCRtpSender', { + configurable: true, + value: MockRTCRtpSender, + writable: true, + }); + } + + function useSafariUserAgent() { + Object.defineProperty(window.navigator, 'userAgent', { + configurable: true, + value: + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15', + }); + } + + function setScriptTransform(mock: unknown) { + Object.defineProperty(window, 'RTCRtpScriptTransform', { + configurable: true, + value: mock, + writable: true, + }); + Object.defineProperty(globalThis, 'RTCRtpScriptTransform', { + configurable: true, + value: mock, + writable: true, + }); + } + + function setupWorkerReceiver(manager: PacketTrailerManager, receiver: RTCRtpReceiver) { + ( + manager as unknown as { + setupWorkerReceiver: (receiver: RTCRtpReceiver, newTrackId: string) => void; + } + ).setupWorkerReceiver(receiver, 'track-id'); + } + + function setupReceiver( + manager: PacketTrailerManager, + receiver: RTCRtpReceiver, + trackId: string, + trackInfo?: TrackInfo, + ) { + ( + manager as unknown as { + setupReceiver: ( + track: { receiver: RTCRtpReceiver; mediaStreamID: string }, + trackInfo?: TrackInfo, + ) => void; + } + ).setupReceiver({ receiver, mediaStreamID: trackId }, trackInfo); + } + + function makeReceiver() { + const readable = {} as ReadableStream; + const writable = {} as WritableStream; + const createEncodedStreams = vi.fn(() => ({ readable, writable })); + + return { + receiver: { createEncodedStreams } as unknown as RTCRtpReceiver, + readable, + writable, + createEncodedStreams, + }; + } + + it('uses RTCRtpScriptTransform for packet trailer extraction when supported', () => { + useSafariUserAgent(); + const transform = {}; + const RTCRtpScriptTransform = vi.fn(() => transform); + setScriptTransform(RTCRtpScriptTransform); + + const worker = {} as Worker; + const manager = new PacketTrailerManager({ worker }); + const receiver = { + createEncodedStreams: vi.fn(), + } as unknown as RTCRtpReceiver; + + setupWorkerReceiver(manager, receiver); + + expect(RTCRtpScriptTransform).toHaveBeenCalledWith(worker, { + kind: 'decode', + trackId: 'track-id', + }); + expect((receiver as unknown as { transform: unknown }).transform).toBe(transform); + expect( + (receiver as unknown as { createEncodedStreams: ReturnType }) + .createEncodedStreams, + ).not.toHaveBeenCalled(); + }); + + it('sets up a passthrough receiver pipeline when a subscribed track has no packet trailer features', () => { + stubInsertableStreamsSupport(); + + const worker = { postMessage: vi.fn() } as unknown as Worker; + const manager = new PacketTrailerManager({ worker }); + const { receiver, readable, writable, createEncodedStreams } = makeReceiver(); + + setupReceiver(manager, receiver, 'track-without-trailer'); + + expect(createEncodedStreams).toHaveBeenCalledTimes(1); + expect(worker.postMessage).toHaveBeenCalledWith( + { + kind: 'decode', + data: { + readableStream: readable, + writableStream: writable, + trackId: 'track-without-trailer', + hasPacketTrailer: false, + }, + }, + [readable, writable], + ); + }); + + it('updates a reused receiver from trailer extraction to passthrough for tracks without packet trailer features', () => { + stubInsertableStreamsSupport(); + + const worker = { postMessage: vi.fn() } as unknown as Worker; + const manager = new PacketTrailerManager({ worker }); + const { receiver } = makeReceiver(); + const trackInfo = { packetTrailerFeatures: [1] } as unknown as TrackInfo; + + setupReceiver(manager, receiver, 'track-with-trailer', trackInfo); + setupReceiver(manager, receiver, 'track-without-trailer'); + + expect(worker.postMessage).toHaveBeenLastCalledWith({ + kind: 'updateTrackId', + data: { + oldTrackId: 'track-with-trailer', + newTrackId: 'track-without-trailer', + hasPacketTrailer: false, + }, + }); + }); +}); diff --git a/src/packetTrailer/PacketTrailerManager.ts b/src/packetTrailer/PacketTrailerManager.ts new file mode 100644 index 0000000000..7e4438f5e5 --- /dev/null +++ b/src/packetTrailer/PacketTrailerManager.ts @@ -0,0 +1,250 @@ +import type { TrackInfo } from '@livekit/protocol'; +import log from '../logger'; +import type Room from '../room/Room'; +import { RoomEvent } from '../room/events'; +import { PacketTrailerExtractor } from '../room/track/PacketTrailerExtractor'; +import type RemoteTrack from '../room/track/RemoteTrack'; +import RemoteVideoTrack from '../room/track/RemoteVideoTrack'; +import type { PTDecodeMessage, PTUpdateTrackIdMessage, PTWorkerMessage } from './types'; +import { isPacketTrailerSupported, shouldUsePacketTrailerScriptTransform } from './utils'; + +export interface PacketTrailerOptions { + /** + * Dedicated worker for extracting packet trailers off the main thread. + * + * Encoded video streams are transferred to the worker for processing, which + * avoids per-frame work on the main thread. + */ + worker: Worker; +} + +/** + * Manages packet trailer extraction for received video tracks. + * + * When a track's TrackInfo indicates packet trailer features, the manager + * wires up an encoded frame transform to strip the trailer from encoded frames + * and cache the metadata for lookup. + * + * Packet trailer extraction is worker-only. If no worker is configured, the + * SDK does not advertise packet trailer support and skips extraction. + * + * When E2EE is active, the E2EE FrameCryptor worker handles trailer + * extraction directly (before decryption), so this manager only creates + * the extractor/metadata cache — no separate pipeline is installed. + * + * @experimental + */ +export class PacketTrailerManager { + private worker?: Worker; + + private room?: Room; + + private extractors = new Map(); + + /** + * Tracks the trackId associated with each receiver that has had its + * encoded streams handed off to the worker. Used to detect receiver + * reuse (transceiver recycling) so we can remap trackIds instead of + * re-transferring already-consumed streams. + */ + private workerPipelines = new Map(); + + constructor(options?: PacketTrailerOptions) { + this.worker = options?.worker; + } + + /** @internal */ + setup(room: Room) { + if (room === this.room) { + return; + } + this.room = room; + + if (this.worker) { + this.worker.onmessage = this.onWorkerMessage; + this.worker.onerror = this.onWorkerError; + this.worker.postMessage({ kind: 'init' }); + } + + room + .on(RoomEvent.TrackSubscribed, (track, pub, _participant) => { + if (track.kind !== 'video') { + return; + } + this.setupReceiver(track as unknown as RemoteVideoTrack, pub.trackInfo); + }) + .on(RoomEvent.TrackUnsubscribed, (track) => { + this.teardownTrack(track); + }) + .on(RoomEvent.Disconnected, () => { + this.cleanup(); + }); + } + + private setupReceiver(track: RemoteVideoTrack, trackInfo?: TrackInfo) { + const receiver = track.receiver; + if (!receiver) { + return; + } + + // Only install a pipeline for tracks that actually advertise packet + // trailer features. This keeps us out of the way for tracks published by + // clients on older protocols or that don't opt into the feature. + const hasFeatures = + !!trackInfo?.packetTrailerFeatures && trackInfo.packetTrailerFeatures.length > 0; + if (!hasFeatures) { + if (!this.room?.hasE2EESetup) { + this.setupPassthroughReceiver(receiver, track.mediaStreamID); + } + return; + } + + if ( + !isPacketTrailerSupported(this.worker ? { worker: this.worker } : undefined) && + !this.room?.hasE2EESetup + ) { + log.warn('packet trailer transform not supported; skipping extraction'); + return; + } + + const extractor = new PacketTrailerExtractor(); + const trackId = track.mediaStreamID; + + this.extractors.set(trackId, extractor); + track.packetTrailerExtractor = extractor; + + if (this.room?.hasE2EESetup) { + // E2EE worker strips the trailer and injects metadata directly into + // the extractor via E2eeManager; no pipeline is needed here. + return; + } + + this.setupWorkerReceiver(receiver, trackId, true); + } + + private setupPassthroughReceiver(receiver: RTCRtpReceiver, trackId: string) { + if (shouldUsePacketTrailerScriptTransform()) { + if ('transform' in receiver) { + // @ts-ignore + receiver.transform = null; + } + return; + } + + if ( + this.worker && + isPacketTrailerSupported({ worker: this.worker }) && + !this.workerPipelines.has(receiver) + ) { + this.setupWorkerReceiver(receiver, trackId, false); + return; + } + + if (this.worker && this.workerPipelines.has(receiver)) { + this.setupWorkerReceiver(receiver, trackId, false); + } + } + + private setupWorkerReceiver( + receiver: RTCRtpReceiver, + newTrackId: string, + hasPacketTrailer = true, + ) { + const worker = this.worker; + if (!worker) { + return; + } + + if (shouldUsePacketTrailerScriptTransform()) { + // @ts-ignore + receiver.transform = new RTCRtpScriptTransform(worker, { + kind: 'decode', + trackId: newTrackId, + }); + return; + } + + const existingTrackId = this.workerPipelines.get(receiver); + + if (existingTrackId) { + // Receiver is reused (transceiver recycled). The worker already owns + // the encoded streams — just remap the trackId so metadata is keyed + // correctly and re-activate processing. + const msg: PTUpdateTrackIdMessage = { + kind: 'updateTrackId', + data: { oldTrackId: existingTrackId, newTrackId, hasPacketTrailer }, + }; + worker.postMessage(msg); + this.workerPipelines.set(receiver, newTrackId); + return; + } + + if (!('createEncodedStreams' in receiver)) { + log.warn('createEncodedStreams not supported'); + return; + } + + let streams: { readable: ReadableStream; writable: WritableStream }; + try { + // @ts-ignore — createEncodedStreams is not in standard typings + streams = receiver.createEncodedStreams(); + } catch (err) { + log.warn('failed to create encoded streams', { error: err }); + return; + } + + const msg: PTDecodeMessage = { + kind: 'decode', + data: { + readableStream: streams.readable, + writableStream: streams.writable, + trackId: newTrackId, + hasPacketTrailer, + }, + }; + worker.postMessage(msg, [streams.readable, streams.writable]); + this.workerPipelines.set(receiver, newTrackId); + } + + private teardownTrack(track: RemoteTrack) { + const trackId = track.mediaStreamID; + const extractor = this.extractors.get(trackId); + if (extractor) { + extractor.dispose(); + this.extractors.delete(trackId); + } + + if (track instanceof RemoteVideoTrack) { + track.packetTrailerExtractor = undefined; + } + + // The receiver pipeline is intentionally left running. If the receiver is + // reused for a new track, `setupReceiver` will remap it. If the room + // disconnects, `cleanup` drops all state. Any metadata produced in the + // meantime is harmless — the extractor above has already been disposed and + // is no longer reachable from any track. + } + + private cleanup() { + for (const extractor of this.extractors.values()) { + extractor.dispose(); + } + this.extractors.clear(); + this.workerPipelines.clear(); + this.worker?.terminate(); + } + + private onWorkerMessage = (ev: MessageEvent) => { + const msg = ev.data; + if (msg.kind === 'metadata') { + const extractor = this.extractors.get(msg.data.trackId); + if (extractor) { + extractor.storeMetadata(msg.data.rtpTimestamp, msg.data.ssrc, msg.data.metadata); + } + } + }; + + private onWorkerError = (ev: ErrorEvent) => { + log.error('packet trailer worker encountered an error:', { error: ev.error }); + }; +} diff --git a/src/packetTrailer/packetTrailer.test.ts b/src/packetTrailer/packetTrailer.test.ts new file mode 100644 index 0000000000..ba2cd7a18c --- /dev/null +++ b/src/packetTrailer/packetTrailer.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it } from 'vitest'; +import { appendPacketTrailer, extractPacketTrailer, processPacketTrailer } from './packetTrailer'; + +describe('packetTrailer', () => { + it('extracts user timestamp and frame id from packet trailer', () => { + const payload = Uint8Array.from([1, 2, 3, 4]); + const trailer = appendPacketTrailer(payload, 1_744_249_600_123_456n, 42); + const extracted = extractPacketTrailer(trailer); + + expect(Array.from(extracted.data)).toEqual(Array.from(payload)); + expect(extracted.metadata).toEqual({ + userTimestamp: 1_744_249_600_123_456n, + frameId: 42, + }); + }); + + it('extracts timestamp-only trailer when frameId is 0', () => { + const payload = Uint8Array.from([1, 2, 3, 4]); + const trailer = appendPacketTrailer(payload, 1_744_249_600_123_456n, 0); + const extracted = extractPacketTrailer(trailer); + + expect(Array.from(extracted.data)).toEqual(Array.from(payload)); + expect(extracted.metadata).toEqual({ + userTimestamp: 1_744_249_600_123_456n, + frameId: 0, + }); + }); + + it('extracts frameId-only trailer when timestamp is 0', () => { + const payload = Uint8Array.from([1, 2, 3, 4]); + const trailer = appendPacketTrailer(payload, 0n, 42); + const extracted = extractPacketTrailer(trailer); + + expect(Array.from(extracted.data)).toEqual(Array.from(payload)); + expect(extracted.metadata).toEqual({ + userTimestamp: 0n, + frameId: 42, + }); + }); + + it('returns data unchanged when both timestamp and frameId are 0', () => { + const payload = Uint8Array.from([1, 2, 3, 4]); + const result = appendPacketTrailer(payload, 0n, 0); + + expect(Array.from(result)).toEqual(Array.from(payload)); + }); + + it('passes frames through when there is no valid trailer', () => { + const payload = Uint8Array.from([1, 2, 3, 4, 5]); + const extracted = extractPacketTrailer(payload); + + expect(Array.from(extracted.data)).toEqual(Array.from(payload)); + expect(extracted.metadata).toBeUndefined(); + }); + + it('uses the encoded frame timestamp when metadata does not include an RTP timestamp', () => { + const payload = Uint8Array.from([1, 2, 3, 4]); + const trailer = appendPacketTrailer(payload, 1_744_249_600_123_456n, 42); + const frame = { + data: trailer.buffer, + timestamp: 1234, + getMetadata() { + return {}; + }, + } as unknown as RTCEncodedVideoFrame; + + const result = processPacketTrailer(frame, 'track-id'); + + expect(result.payload).toEqual({ + trackId: 'track-id', + rtpTimestamp: 1234, + ssrc: 0, + metadata: { + userTimestamp: 1_744_249_600_123_456n, + frameId: 42, + }, + }); + }); +}); diff --git a/src/packetTrailer/packetTrailer.ts b/src/packetTrailer/packetTrailer.ts new file mode 100644 index 0000000000..f33edc83d4 --- /dev/null +++ b/src/packetTrailer/packetTrailer.ts @@ -0,0 +1,250 @@ +import type { PacketTrailerMetadata } from './types'; + +export const PACKET_TRAILER_MAGIC = Uint8Array.from([ + 'L'.charCodeAt(0), + 'K'.charCodeAt(0), + 'T'.charCodeAt(0), + 'S'.charCodeAt(0), +]); + +export const PACKET_TRAILER_TIMESTAMP_TAG = 0x01; +export const PACKET_TRAILER_FRAME_ID_TAG = 0x02; +export const PACKET_TRAILER_ENVELOPE_SIZE = 5; + +const TIMESTAMP_TLV_SIZE = 10; +const FRAME_ID_TLV_SIZE = 6; + +export interface ExtractPacketTrailerResult { + data: Uint8Array; + metadata?: PacketTrailerMetadata; +} + +export function appendPacketTrailer( + data: Uint8Array, + userTimestamp: bigint, + frameId: number, +): Uint8Array { + const hasTimestamp = userTimestamp !== BigInt(0); + const hasFrameId = frameId !== 0; + + if (!hasTimestamp && !hasFrameId) { + return data; + } + + const trailerLength = + (hasTimestamp ? TIMESTAMP_TLV_SIZE : 0) + + (hasFrameId ? FRAME_ID_TLV_SIZE : 0) + + PACKET_TRAILER_ENVELOPE_SIZE; + const result = new Uint8Array(data.length + trailerLength); + let offset = 0; + + result.set(data, offset); + offset += data.length; + + if (hasTimestamp) { + result[offset++] = PACKET_TRAILER_TIMESTAMP_TAG ^ 0xff; + result[offset++] = 8 ^ 0xff; + writeUint64Xor(result, offset, userTimestamp); + offset += 8; + } + + if (hasFrameId) { + result[offset++] = PACKET_TRAILER_FRAME_ID_TAG ^ 0xff; + result[offset++] = 4 ^ 0xff; + writeUint32Xor(result, offset, frameId); + offset += 4; + } + + result[offset++] = trailerLength ^ 0xff; + result.set(PACKET_TRAILER_MAGIC, offset); + + return result; +} + +export function extractPacketTrailer(data: ArrayBuffer | Uint8Array): ExtractPacketTrailerResult { + const bytes = data instanceof Uint8Array ? data : new Uint8Array(data); + if (bytes.length < PACKET_TRAILER_ENVELOPE_SIZE) { + return { data: bytes }; + } + + const magicOffset = bytes.length - PACKET_TRAILER_MAGIC.length; + if (!matchesMagic(bytes, magicOffset)) { + return { data: bytes }; + } + + const trailerLength = bytes[bytes.length - PACKET_TRAILER_ENVELOPE_SIZE] ^ 0xff; + if (trailerLength < PACKET_TRAILER_ENVELOPE_SIZE || trailerLength > bytes.length) { + return { data: bytes }; + } + + const trailerStart = bytes.length - trailerLength; + const trailerEnd = bytes.length - PACKET_TRAILER_ENVELOPE_SIZE; + const strippedData = bytes.subarray(0, trailerStart); + let offset = trailerStart; + let foundAny = false; + const metadata: PacketTrailerMetadata = { + userTimestamp: BigInt(0), + frameId: 0, + }; + + while (offset + 2 <= trailerEnd) { + const tag = bytes[offset++] ^ 0xff; + const length = bytes[offset++] ^ 0xff; + + if (offset + length > trailerEnd) { + break; + } + + if (tag === PACKET_TRAILER_TIMESTAMP_TAG && length === 8) { + metadata.userTimestamp = readUint64Xor(bytes, offset); + foundAny = true; + } else if (tag === PACKET_TRAILER_FRAME_ID_TAG && length === 4) { + metadata.frameId = readUint32Xor(bytes, offset, length); + foundAny = true; + } + + offset += length; + } + + if (!foundAny) { + return { data: bytes }; + } + + return { data: strippedData, metadata }; +} + +function matchesMagic(data: Uint8Array, offset: number) { + for (let index = 0; index < PACKET_TRAILER_MAGIC.length; index += 1) { + if (data[offset + index] !== PACKET_TRAILER_MAGIC[index]) { + return false; + } + } + return true; +} + +function readUint64Xor(data: Uint8Array, offset: number): bigint { + const hi = BigInt( + (((data[offset] ^ 0xff) << 24) | + ((data[offset + 1] ^ 0xff) << 16) | + ((data[offset + 2] ^ 0xff) << 8) | + (data[offset + 3] ^ 0xff)) >>> + 0, + ); + const lo = BigInt( + (((data[offset + 4] ^ 0xff) << 24) | + ((data[offset + 5] ^ 0xff) << 16) | + ((data[offset + 6] ^ 0xff) << 8) | + (data[offset + 7] ^ 0xff)) >>> + 0, + ); + return (hi << BigInt(32)) | lo; +} + +function readUint32Xor(data: Uint8Array, offset: number, length: number) { + let value = 0; + for (let index = 0; index < length; index += 1) { + value = (value << 8) | (data[offset + index] ^ 0xff); + } + return value >>> 0; +} + +function writeUint64Xor(target: Uint8Array, offset: number, value: bigint) { + const hi = Number((value >> BigInt(32)) & BigInt(0xffffffff)); + const lo = Number(value & BigInt(0xffffffff)); + target[offset] = (hi >>> 24) ^ 0xff; + target[offset + 1] = ((hi >>> 16) & 0xff) ^ 0xff; + target[offset + 2] = ((hi >>> 8) & 0xff) ^ 0xff; + target[offset + 3] = (hi & 0xff) ^ 0xff; + target[offset + 4] = (lo >>> 24) ^ 0xff; + target[offset + 5] = ((lo >>> 16) & 0xff) ^ 0xff; + target[offset + 6] = ((lo >>> 8) & 0xff) ^ 0xff; + target[offset + 7] = (lo & 0xff) ^ 0xff; +} + +function writeUint32Xor(target: Uint8Array, offset: number, value: number) { + for (let index = 3; index >= 0; index -= 1) { + target[offset + (3 - index)] = ((value >> (index * 8)) & 0xff) ^ 0xff; + } +} + +export function getFrameRtpTimestamp( + frame: RTCEncodedVideoFrame | RTCEncodedAudioFrame, +): number | undefined { + try { + const metadata = frame.getMetadata() as Record; + if (typeof metadata.rtpTimestamp === 'number') { + return metadata.rtpTimestamp; + } + if (typeof metadata.timestamp === 'number') { + return metadata.timestamp; + } + } catch { + // getMetadata() might not be available + } + if (typeof frame.timestamp === 'number') { + return frame.timestamp; + } + return undefined; +} + +export function getFrameSsrc(frame: RTCEncodedVideoFrame | RTCEncodedAudioFrame): number { + try { + const metadata = frame.getMetadata() as Record; + if (typeof metadata.synchronizationSource === 'number') { + return metadata.synchronizationSource; + } + } catch {} + return 0; +} + +export interface PacketTrailerFramePayload { + trackId: string; + rtpTimestamp: number; + ssrc: number; + metadata: PacketTrailerMetadata; +} + +export interface ProcessPacketTrailerResult { + data?: ArrayBuffer; + payload?: PacketTrailerFramePayload; +} + +/** + * Extracts a packet trailer from an encoded frame and returns the stripped + * frame data (if any) along with a ready-to-post metadata payload. Returns an + * empty object when no trailer is present, an RTP timestamp can't be read, or + * a trackId isn't available. + */ +export function processPacketTrailer( + frame: RTCEncodedVideoFrame | RTCEncodedAudioFrame, + trackId: string | undefined, +): ProcessPacketTrailerResult { + if (frame.data.byteLength === 0) { + return {}; + } + + const result = extractPacketTrailer(frame.data); + if (!result.metadata) { + return {}; + } + + const strippedData = (result.data.buffer as ArrayBuffer).slice( + result.data.byteOffset, + result.data.byteOffset + result.data.byteLength, + ); + + const rtpTimestamp = getFrameRtpTimestamp(frame); + if (rtpTimestamp === undefined || !trackId) { + return { data: strippedData }; + } + + return { + data: strippedData, + payload: { + trackId, + rtpTimestamp, + ssrc: getFrameSsrc(frame), + metadata: result.metadata, + }, + }; +} diff --git a/src/packetTrailer/types.ts b/src/packetTrailer/types.ts new file mode 100644 index 0000000000..1370ddf01b --- /dev/null +++ b/src/packetTrailer/types.ts @@ -0,0 +1,55 @@ +import type { PacketTrailerFramePayload } from './packetTrailer'; + +export interface PacketTrailerMetadata { + userTimestamp: bigint; + frameId: number; +} + +export interface PTBaseMessage { + kind: string; + data?: unknown; +} + +export interface PTInitMessage extends PTBaseMessage { + kind: 'init'; +} + +export interface PTInitAck extends PTBaseMessage { + kind: 'initAck'; +} + +export interface PTDecodeMessage extends PTBaseMessage { + kind: 'decode'; + data: { + readableStream: ReadableStream; + writableStream: WritableStream; + trackId: string; + hasPacketTrailer: boolean; + }; +} + +export type PTScriptTransformOptions = { + kind: 'decode'; + trackId: string; +}; + +export interface PTMetadataMessage extends PTBaseMessage { + kind: 'metadata'; + data: PacketTrailerFramePayload; +} + +export interface PTUpdateTrackIdMessage extends PTBaseMessage { + kind: 'updateTrackId'; + data: { + oldTrackId: string; + newTrackId: string; + hasPacketTrailer: boolean; + }; +} + +export type PTWorkerMessage = + | PTInitMessage + | PTInitAck + | PTDecodeMessage + | PTUpdateTrackIdMessage + | PTMetadataMessage; diff --git a/src/packetTrailer/utils.test.ts b/src/packetTrailer/utils.test.ts new file mode 100644 index 0000000000..6cde3fdf68 --- /dev/null +++ b/src/packetTrailer/utils.test.ts @@ -0,0 +1,67 @@ +import { afterEach, describe, expect, it } from 'vitest'; +import { isPacketTrailerSupported } from './utils'; + +describe('packet trailer support', () => { + const originalRTCRtpSender = window.RTCRtpSender; + const originalRTCRtpScriptTransform = (window as unknown as { RTCRtpScriptTransform?: unknown }) + .RTCRtpScriptTransform; + const originalUserAgent = navigator.userAgent; + + afterEach(() => { + Object.defineProperty(window, 'RTCRtpSender', { + configurable: true, + value: originalRTCRtpSender, + writable: true, + }); + Object.defineProperty(window, 'RTCRtpScriptTransform', { + configurable: true, + value: originalRTCRtpScriptTransform, + writable: true, + }); + Object.defineProperty(window.navigator, 'userAgent', { + configurable: true, + value: originalUserAgent, + }); + }); + + function stubScriptTransformSupport(userAgent: string) { + Object.defineProperty(window, 'RTCRtpSender', { + configurable: true, + value: undefined, + writable: true, + }); + Object.defineProperty(window, 'RTCRtpScriptTransform', { + configurable: true, + value: class MockRTCRtpScriptTransform {}, + writable: true, + }); + Object.defineProperty(window.navigator, 'userAgent', { + configurable: true, + value: userAgent, + }); + } + + it('supports packet trailers with RTCRtpScriptTransform on Safari', () => { + stubScriptTransformSupport( + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15', + ); + + expect(isPacketTrailerSupported({ worker: {} as Worker })).toBe(true); + }); + + it('supports packet trailers with RTCRtpScriptTransform on Firefox', () => { + stubScriptTransformSupport( + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14.0; rv:144.0) Gecko/20100101 Firefox/144.0', + ); + + expect(isPacketTrailerSupported({ worker: {} as Worker })).toBe(true); + }); + + it('does not use RTCRtpScriptTransform support on Chromium-based browsers', () => { + stubScriptTransformSupport( + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36', + ); + + expect(isPacketTrailerSupported({ worker: {} as Worker })).toBe(false); + }); +}); diff --git a/src/packetTrailer/utils.ts b/src/packetTrailer/utils.ts new file mode 100644 index 0000000000..e0c184d7ec --- /dev/null +++ b/src/packetTrailer/utils.ts @@ -0,0 +1,13 @@ +import { isInsertableStreamSupported } from '../e2ee/utils'; +import { isScriptTransformSupportedForWorker } from '../room/utils'; +import type { PacketTrailerOptions } from './PacketTrailerManager'; + +export function shouldUsePacketTrailerScriptTransform() { + return isScriptTransformSupportedForWorker(); +} + +export function isPacketTrailerSupported(options?: PacketTrailerOptions) { + return ( + !!options?.worker && (isInsertableStreamSupported() || shouldUsePacketTrailerScriptTransform()) + ); +} diff --git a/src/packetTrailer/worker/packetTrailer.worker.ts b/src/packetTrailer/worker/packetTrailer.worker.ts new file mode 100644 index 0000000000..79e439561b --- /dev/null +++ b/src/packetTrailer/worker/packetTrailer.worker.ts @@ -0,0 +1,103 @@ +import { processPacketTrailer } from '../packetTrailer'; +import type { PTMetadataMessage, PTScriptTransformOptions, PTWorkerMessage } from '../types'; + +/** + * Holds the trackId currently associated with a pipeline. A mutable + * wrapper is used so the transform closure always reads the latest + * trackId after a receiver gets re-bound to a new track. + */ +interface PipelineState { + trackId: string; + hasPacketTrailer: boolean; +} + +const pipelines = new Map(); + +onmessage = (ev: MessageEvent) => { + const msg = ev.data; + + switch (msg.kind) { + case 'init': + postMessage({ kind: 'initAck' }); + break; + + case 'decode': + setupDecodeTransform( + msg.data.readableStream, + msg.data.writableStream, + msg.data.trackId, + msg.data.hasPacketTrailer, + ); + break; + + case 'updateTrackId': + updateTrackId(msg.data.oldTrackId, msg.data.newTrackId, msg.data.hasPacketTrailer); + break; + + default: + break; + } +}; + +function setupDecodeTransform( + readable: ReadableStream, + writable: WritableStream, + trackId: string, + hasPacketTrailer: boolean, +) { + const state: PipelineState = { trackId, hasPacketTrailer }; + pipelines.set(trackId, state); + + const transform = new TransformStream({ + transform( + frame: RTCEncodedVideoFrame, + controller: TransformStreamDefaultController, + ) { + try { + if (state.hasPacketTrailer) { + const result = processPacketTrailer(frame, state.trackId); + if (result.data) { + frame.data = result.data; + } + if (result.payload) { + const msg: PTMetadataMessage = { kind: 'metadata', data: result.payload }; + postMessage(msg); + } + } + } catch { + // Never drop frames on trailer-extraction failure — pass through so + // video keeps decoding even if metadata is lost for this frame. + } + controller.enqueue(frame); + }, + }); + + readable + .pipeThrough(transform) + .pipeTo(writable) + .catch(() => { + pipelines.delete(state.trackId); + }); +} + +function updateTrackId(oldTrackId: string, newTrackId: string, hasPacketTrailer: boolean) { + const state = pipelines.get(oldTrackId); + if (state) { + state.trackId = newTrackId; + state.hasPacketTrailer = hasPacketTrailer; + pipelines.delete(oldTrackId); + pipelines.set(newTrackId, state); + } +} + +// Operations using RTCRtpScriptTransform. +// @ts-ignore +if (self.RTCTransformEvent) { + // @ts-ignore + self.onrtctransform = (event: RTCTransformEvent) => { + // @ts-ignore + const transformer = event.transformer; + const { trackId } = transformer.options as PTScriptTransformOptions; + setupDecodeTransform(transformer.readable, transformer.writable, trackId, true); + }; +} diff --git a/src/packetTrailer/worker/tsconfig.json b/src/packetTrailer/worker/tsconfig.json new file mode 100644 index 0000000000..5e10a3dfc2 --- /dev/null +++ b/src/packetTrailer/worker/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "lib": [ + "DOM", + "DOM.Iterable", + "ES2017", + "ES2018.Promise", + "WebWorker", + "ES2021.WeakRef", + "DOM.AsyncIterable" + ] + } +} diff --git a/src/room/BackOffStrategy.test.ts b/src/room/BackOffStrategy.test.ts index 270face22d..9bd9fab454 100644 --- a/src/room/BackOffStrategy.test.ts +++ b/src/room/BackOffStrategy.test.ts @@ -6,7 +6,7 @@ vi.mock('./utils', async () => { const actual = await vi.importActual('./utils'); return { ...actual, - // eslint-disable-next-line @typescript-eslint/no-unused-vars + sleep: vi.fn((ms: number) => Promise.resolve()), extractProjectFromUrl: vi.fn((url: URL) => { // @ts-ignore diff --git a/src/room/RTCEngine.test.ts b/src/room/RTCEngine.test.ts new file mode 100644 index 0000000000..616cf9db8b --- /dev/null +++ b/src/room/RTCEngine.test.ts @@ -0,0 +1,150 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; +import RTCEngine from './RTCEngine'; +import { roomOptionDefaults } from './defaults'; + +describe('RTCEngine', () => { + const originalRTCRtpSender = window.RTCRtpSender; + const originalRTCRtpScriptTransform = (window as unknown as { RTCRtpScriptTransform?: unknown }) + .RTCRtpScriptTransform; + const originalUserAgent = navigator.userAgent; + + afterEach(() => { + Object.defineProperty(window, 'RTCRtpSender', { + configurable: true, + value: originalRTCRtpSender, + writable: true, + }); + Object.defineProperty(window, 'RTCRtpScriptTransform', { + configurable: true, + value: originalRTCRtpScriptTransform, + writable: true, + }); + Object.defineProperty(window.navigator, 'userAgent', { + configurable: true, + value: originalUserAgent, + }); + }); + + function stubInsertableStreamsSupport() { + class MockRTCRtpSender { + createEncodedStreams() {} + } + + Object.defineProperty(window, 'RTCRtpSender', { + configurable: true, + value: MockRTCRtpSender, + writable: true, + }); + } + + function stubScriptTransformSupport() { + Object.defineProperty(window, 'RTCRtpScriptTransform', { + configurable: true, + value: class MockRTCRtpScriptTransform {}, + writable: true, + }); + Object.defineProperty(window.navigator, 'userAgent', { + configurable: true, + value: + 'Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15', + }); + } + + function makeRTCConfiguration(engine: RTCEngine) { + return ( + engine as unknown as { makeRTCConfiguration: () => RTCConfiguration } + ).makeRTCConfiguration(); + } + + function setupSenderPassthrough(engine: RTCEngine, sender: RTCRtpSender) { + ( + engine as unknown as { setupSenderPassthrough: (sender: RTCRtpSender) => void } + ).setupSenderPassthrough(sender); + } + + it('does not enable encoded insertable streams without E2EE or a packet trailer worker', () => { + stubInsertableStreamsSupport(); + + const engine = new RTCEngine(roomOptionDefaults); + + expect(makeRTCConfiguration(engine).encodedInsertableStreams).toBeUndefined(); + }); + + it('enables encoded insertable streams when a packet trailer worker is configured', () => { + stubInsertableStreamsSupport(); + + const engine = new RTCEngine({ + ...roomOptionDefaults, + packetTrailer: { worker: {} as Worker }, + }); + + expect(makeRTCConfiguration(engine).encodedInsertableStreams).toBe(true); + }); + + it('does not enable encoded insertable streams for packet trailers when script transforms are supported', () => { + stubInsertableStreamsSupport(); + stubScriptTransformSupport(); + + const engine = new RTCEngine({ + ...roomOptionDefaults, + packetTrailer: { worker: {} as Worker }, + }); + + expect(makeRTCConfiguration(engine).encodedInsertableStreams).toBeUndefined(); + }); + + it('enables encoded insertable streams for E2EE', () => { + stubInsertableStreamsSupport(); + + const engine = new RTCEngine(roomOptionDefaults); + ( + engine as unknown as { + signalOpts: { + autoSubscribe: boolean; + maxRetries: number; + e2eeEnabled: boolean; + websocketTimeout: number; + }; + } + ).signalOpts = { + autoSubscribe: true, + maxRetries: 1, + e2eeEnabled: true, + websocketTimeout: 15_000, + }; + + expect(makeRTCConfiguration(engine).encodedInsertableStreams).toBe(true); + }); + + it('does not create sender encoded streams when packetTrailer has no worker', () => { + const engine = new RTCEngine({ + ...roomOptionDefaults, + packetTrailer: {} as never, + }); + const createEncodedStreams = vi.fn(); + const sender = { + createEncodedStreams, + } as unknown as RTCRtpSender; + + setupSenderPassthrough(engine, sender); + + expect(createEncodedStreams).not.toHaveBeenCalled(); + }); + + it('does not create sender passthrough streams for packet trailers when script transforms are supported', () => { + stubScriptTransformSupport(); + + const engine = new RTCEngine({ + ...roomOptionDefaults, + packetTrailer: { worker: {} as Worker }, + }); + const createEncodedStreams = vi.fn(); + const sender = { + createEncodedStreams, + } as unknown as RTCRtpSender; + + setupSenderPassthrough(engine, sender); + + expect(createEncodedStreams).not.toHaveBeenCalled(); + }); +}); diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 992f02a9c5..f97fedde20 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -54,9 +54,10 @@ import { toProtoSessionDescription, } from '../api/SignalClient'; import type { BaseE2EEManager } from '../e2ee/E2eeManager'; -import { asEncryptablePacket } from '../e2ee/utils'; +import { asEncryptablePacket, isInsertableStreamSupported } from '../e2ee/utils'; import log, { LoggerNames, getLogger } from '../logger'; import type { InternalRoomOptions } from '../options'; +import { shouldUsePacketTrailerScriptTransform } from '../packetTrailer/utils'; import TypedPromise from '../utils/TypedPromise'; import { DataPacketBuffer } from '../utils/dataPacketBuffer'; import { TTLMap } from '../utils/ttlmap'; @@ -771,8 +772,15 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit ): RTCConfiguration { const rtcConfig = { ...this.rtcConfig }; - if (this.signalOpts?.e2eeEnabled) { - this.log.debug('E2EE - setting up transports with insertable streams', this.logContext); + // E2EE and packet trailer extraction both rely on encoded frame transforms. + // Only opt into the createEncodedStreams flavor when that path will be + // used; RTCRtpScriptTransform does not need the PeerConnection flag. + const needsInsertableStreams = + this.signalOpts?.e2eeEnabled || + (this.options.packetTrailer?.worker && !shouldUsePacketTrailerScriptTransform()); + + if (needsInsertableStreams && isInsertableStreamSupported()) { + this.log.debug('setting up transports with insertable streams', this.logContext); // this makes sure that no data is sent before the transforms are ready // @ts-ignore rtcConfig.encodedInsertableStreams = true; @@ -1016,16 +1024,17 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit opts: TrackPublishOptions, encodings?: RTCRtpEncodingParameters[], ) { + let sender: RTCRtpSender; if (supportsTransceiver()) { - const sender = await this.createTransceiverRTCRtpSender(track, opts, encodings); - return sender; - } - if (supportsAddTrack()) { + sender = await this.createTransceiverRTCRtpSender(track, opts, encodings); + } else if (supportsAddTrack()) { this.log.warn('using add-track fallback', this.logContext); - const sender = await this.createRTCRtpSender(track.mediaStreamTrack); - return sender; + sender = await this.createRTCRtpSender(track.mediaStreamTrack); + } else { + throw new UnexpectedConnectionState('Required webRTC APIs not supported on this device'); } - throw new UnexpectedConnectionState('Required webRTC APIs not supported on this device'); + this.setupSenderPassthrough(sender); + return sender; } async createSimulcastSender( @@ -1034,16 +1043,34 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit opts: TrackPublishOptions, encodings?: RTCRtpEncodingParameters[], ) { - // store RTCRtpSender + let sender: RTCRtpSender | undefined; if (supportsTransceiver()) { - return this.createSimulcastTransceiverSender(track, simulcastTrack, opts, encodings); - } - if (supportsAddTrack()) { + sender = await this.createSimulcastTransceiverSender(track, simulcastTrack, opts, encodings); + } else if (supportsAddTrack()) { this.log.debug('using add-track fallback', this.logContext); - return this.createRTCRtpSender(track.mediaStreamTrack); + sender = await this.createRTCRtpSender(track.mediaStreamTrack); + } else { + throw new UnexpectedConnectionState('Cannot stream on this device'); + } + if (sender) { + this.setupSenderPassthrough(sender); } + return sender; + } - throw new UnexpectedConnectionState('Cannot stream on this device'); + private setupSenderPassthrough(sender: RTCRtpSender) { + if ( + !this.options.packetTrailer?.worker || + this.signalOpts?.e2eeEnabled || + shouldUsePacketTrailerScriptTransform() + ) { + return; + } + if ('createEncodedStreams' in sender) { + // @ts-ignore + const { readable, writable } = sender.createEncodedStreams(); + readable.pipeTo(writable); + } } private async createTransceiverRTCRtpSender( diff --git a/src/room/Room.ts b/src/room/Room.ts index cf782f5a49..6db435dd75 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1,6 +1,7 @@ import { Mutex } from '@livekit/mutex'; import { ChatMessage as ChatMessageModel, + ClientInfo_Capability, ConnectionQualityUpdate, type DataPacket, DataPacket_Kind, @@ -43,6 +44,8 @@ import type { RoomConnectOptions, RoomOptions, } from '../options'; +import { PacketTrailerManager } from '../packetTrailer/PacketTrailerManager'; +import { isPacketTrailerSupported } from '../packetTrailer/utils'; import TypedPromise from '../utils/TypedPromise'; import { getBrowser } from '../utils/browserParser'; import { BackOffStrategy } from './BackOffStrategy'; @@ -187,6 +190,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) private e2eeManager: BaseE2EEManager | undefined; + private packetTrailerManager: PacketTrailerManager | undefined; + private e2eeStateMutex: Mutex = new Mutex(); private connectionReconcileInterval?: ReturnType; @@ -305,6 +310,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.outgoingDataTrackManager, ); + this.setupPacketTrailer(); + if (this.options.e2ee || this.options.encryption) { this.setupE2EE(); } @@ -463,6 +470,13 @@ class Room extends (EventEmitter as new () => TypedEmitter) } } + private setupPacketTrailer() { + // The manager is always created so tracks that advertise packet trailer + // features can be wired up when the app passes a packet trailer worker. + this.packetTrailerManager = new PacketTrailerManager(this.options.packetTrailer); + this.packetTrailerManager.setup(this); + } + private get logContext() { return { room: this.name, @@ -915,6 +929,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) autoSubscribe: connectOptions.autoSubscribe, adaptiveStream: typeof roomOptions.adaptiveStream === 'object' ? true : roomOptions.adaptiveStream, + clientInfoCapabilities: isPacketTrailerSupported(roomOptions.packetTrailer) + ? [ClientInfo_Capability.CAP_PACKET_TRAILER] + : undefined, maxRetries: connectOptions.maxRetries, e2eeEnabled: !!this.e2eeManager, websocketTimeout: connectOptions.websocketTimeout, diff --git a/src/room/track/PacketTrailerExtractor.ts b/src/room/track/PacketTrailerExtractor.ts new file mode 100644 index 0000000000..fbd55d48ae --- /dev/null +++ b/src/room/track/PacketTrailerExtractor.ts @@ -0,0 +1,43 @@ +import type { PacketTrailerMetadata } from '../../packetTrailer/types'; + +const MAX_ENTRIES = 300; + +/** + * Caches packet trailer metadata extracted from received video frames, + * keyed by RTP timestamp so it can be looked up when the frame is displayed. + * + * Metadata is populated either by the packet trailer worker managed by + * `PacketTrailerManager` (non-E2EE) or by the E2EE FrameCryptor worker + * after decryption (E2EE). + * + * @experimental + */ +export class PacketTrailerExtractor { + private metadataMap = new Map(); + + private activeSsrc: number = 0; + + storeMetadata(rtpTimestamp: number, ssrc: number, metadata: PacketTrailerMetadata) { + // Simulcast layer switch: SSRC changed, flush stale entries from old layer. + if (this.activeSsrc !== 0 && this.activeSsrc !== ssrc) { + this.metadataMap.clear(); + } + this.activeSsrc = ssrc; + + while (this.metadataMap.size >= MAX_ENTRIES) { + const evicted = this.metadataMap.keys().next().value!; + this.metadataMap.delete(evicted); + } + + this.metadataMap.set(rtpTimestamp, metadata); + } + + lookupMetadata(rtpTimestamp: number): PacketTrailerMetadata | undefined { + return this.metadataMap.get(rtpTimestamp); + } + + dispose() { + this.metadataMap.clear(); + this.activeSsrc = 0; + } +} diff --git a/src/room/track/RemoteVideoTrack.ts b/src/room/track/RemoteVideoTrack.ts index d7d1c9436f..b7bed74ac8 100644 --- a/src/room/track/RemoteVideoTrack.ts +++ b/src/room/track/RemoteVideoTrack.ts @@ -1,3 +1,4 @@ +import type { PacketTrailerMetadata } from '../../packetTrailer/types'; import { debounce } from '../debounce'; import { TrackEvent } from '../events'; import type { VideoReceiverStats } from '../stats'; @@ -6,6 +7,7 @@ import CriticalTimers from '../timers'; import type { LoggerOptions } from '../types'; import type { ObservableMediaElement } from '../utils'; import { getDevicePixelRatio, getIntersectionObserver, getResizeObserver, isWeb } from '../utils'; +import type { PacketTrailerExtractor } from './PacketTrailerExtractor'; import RemoteTrack from './RemoteTrack'; import { Track, attachToElement, detachTrack } from './Track'; import type { AdaptiveStreamSettings } from './types'; @@ -23,6 +25,9 @@ export default class RemoteVideoTrack extends RemoteTrack { private lastDimensions?: Track.Dimensions; + /** @internal */ + packetTrailerExtractor?: PacketTrailerExtractor; + constructor( mediaTrack: MediaStreamTrack, sid: string, @@ -38,6 +43,23 @@ export default class RemoteVideoTrack extends RemoteTrack { return this.adaptiveStreamSettings !== undefined; } + /** + * Look up frame-level metadata for a given RTP timestamp. + * Use with the `TrackEvent.TimeSyncUpdate` event to correlate displayed frames + * with their capture-time metadata. + * + * Requires the room to be configured with the `packetTrailer` worker option + * and the publishing track to have packet trailer features enabled. + * + */ + lookupFrameMetadata({ + rtpTimestamp, + }: { + rtpTimestamp: number; + }): PacketTrailerMetadata | undefined { + return this.packetTrailerExtractor?.lookupMetadata(rtpTimestamp); + } + override setStreamState(value: Track.StreamState) { super.setStreamState(value); this.log.debug('setStreamState', value); diff --git a/src/room/track/utils.ts b/src/room/track/utils.ts index e4974e76eb..19e99354e4 100644 --- a/src/room/track/utils.ts +++ b/src/room/track/utils.ts @@ -280,7 +280,10 @@ export function getLogContextFromTrack(track: Track | TrackPublication): Record< } export function supportsSynchronizationSources(): boolean { - return typeof RTCRtpReceiver !== 'undefined' && 'getSynchronizationSources' in RTCRtpReceiver; + return ( + typeof RTCRtpReceiver !== 'undefined' && + typeof RTCRtpReceiver.prototype.getSynchronizationSources === 'function' + ); } export function diffAttributes( diff --git a/src/room/utils.test.ts b/src/room/utils.test.ts index 7d404c2170..c7d099b3b3 100644 --- a/src/room/utils.test.ts +++ b/src/room/utils.test.ts @@ -1,5 +1,6 @@ +import { ClientInfo_Capability } from '@livekit/protocol'; import { describe, expect, it } from 'vitest'; -import { extractMaxAgeFromRequestHeaders, splitUtf8, toWebsocketUrl } from './utils'; +import { extractMaxAgeFromRequestHeaders, getClientInfo, splitUtf8, toWebsocketUrl } from './utils'; describe('toWebsocketUrl', () => { it('leaves wss urls alone', () => { @@ -15,6 +16,18 @@ describe('toWebsocketUrl', () => { }); }); +describe('getClientInfo', () => { + it('does not advertise packet trailer capability by default', () => { + expect(getClientInfo().capabilities).toEqual([]); + }); + + it('advertises packet trailer capability when provided', () => { + expect(getClientInfo([ClientInfo_Capability.CAP_PACKET_TRAILER]).capabilities).toEqual([ + ClientInfo_Capability.CAP_PACKET_TRAILER, + ]); + }); +}); + describe('splitUtf8', () => { it('splits a string into chunks of the given size', () => { expect(splitUtf8('hello world', 5)).toEqual([ diff --git a/src/room/utils.ts b/src/room/utils.ts index 49df9b6fca..45f4a210b0 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -1,6 +1,7 @@ import { ChatMessage as ChatMessageModel, ClientInfo, + ClientInfo_Capability, ClientInfo_SDK, DisconnectReason, Transcription as TranscriptionModel, @@ -179,6 +180,18 @@ export function isChromiumBased(): boolean { return !!browser && browser.name === 'Chrome' && browser.os !== 'iOS'; } +export function isScriptTransformSupportedForWorker(): boolean { + // Chrome occasionally throws an `InvalidState` error when using script transforms directly after introducing this API in 141. + // Disabling it for Chrome based browsers until the API has stabilized. + // @ts-ignore + return ( + typeof window !== 'undefined' && + // @ts-ignore + typeof window.RTCRtpScriptTransform !== 'undefined' && + !isChromiumBased() + ); +} + export function isSafari(): boolean { return getBrowser()?.name === 'Safari'; } @@ -364,8 +377,9 @@ export interface ObservableMediaElement extends HTMLMediaElement { handleVisibilityChanged: (entry: IntersectionObserverEntry) => void; } -export function getClientInfo(): ClientInfo { +export function getClientInfo(capabilities?: ClientInfo_Capability[]): ClientInfo { const info = new ClientInfo({ + capabilities, sdk: ClientInfo_SDK.JS, protocol: protocolVersion, version, diff --git a/src/test/MockMediaStreamTrack.ts b/src/test/MockMediaStreamTrack.ts index 593143d3ec..3ccbb809fb 100644 --- a/src/test/MockMediaStreamTrack.ts +++ b/src/test/MockMediaStreamTrack.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ // @ts-ignore export default class MockMediaStreamTrack implements MediaStreamTrack { contentHint: string = ''; diff --git a/src/version.ts b/src/version.ts index e0d96ee71b..87eaf89839 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1,4 +1,4 @@ import { version as v } from '../package.json'; export const version = v; -export const protocolVersion = 16; +export const protocolVersion = 17;