diff --git a/.changeset/afraid-cooks-visit.md b/.changeset/afraid-cooks-visit.md new file mode 100644 index 0000000000..0cb8e1897b --- /dev/null +++ b/.changeset/afraid-cooks-visit.md @@ -0,0 +1,5 @@ +--- +'livekit-client': patch +--- + +Add local data track flush method diff --git a/src/room/Room.ts b/src/room/Room.ts index cf782f5a49..7e339173d9 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -290,8 +290,10 @@ class Room extends (EventEmitter as new () => TypedEmitter) .on('trackUnpublished', (event) => { this.emit(RoomEvent.LocalDataTrackUnpublished, event.sid); }) - .on('packetAvailable', ({ bytes }) => { - this.engine.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait'); + .on('packetAvailable', ({ handle, bytes }) => { + this.engine + .sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait') + .finally(() => this.outgoingDataTrackManager.handlePacketSendComplete(handle)); }); this.disconnectLock = new Mutex(); @@ -1749,6 +1751,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.bufferedEvents = []; this.transcriptionReceivedTimes.clear(); this.incomingDataStreamManager.clearControllers(); + this.incomingDataTrackManager.reset(); + this.outgoingDataTrackManager.reset(); if (this.state === ConnectionState.Disconnected) { return; } diff --git a/src/room/data-track/LocalDataTrack.ts b/src/room/data-track/LocalDataTrack.ts index 43d82f99c8..e408660028 100644 --- a/src/room/data-track/LocalDataTrack.ts +++ b/src/room/data-track/LocalDataTrack.ts @@ -1,4 +1,5 @@ import log, { LoggerNames, type StructuredLogger, getLogger } from '../../logger'; +import { Future } from '../utils'; import { type DataTrackFrame, DataTrackFrameInternal } from './frame'; import type { DataTrackHandle } from './handle'; import type OutgoingDataTrackManager from './outgoing/OutgoingDataTrackManager'; @@ -28,14 +29,35 @@ export default class LocalDataTrack implements ILocalTrack, IDataTrack { protected log: StructuredLogger = log; + /** Resolves once the data track has sent all pending packets the rtc data channel buffer. */ + protected flushedFuture = new Future(); + /** @internal */ constructor(options: DataTrackOptions, manager: OutgoingDataTrackManager) { this.options = options; this.manager = manager; this.log = getLogger(LoggerNames.DataTracks); + + this.manager.on('packetsFlushed', this.handleManagerPacketsFlushed); + this.manager.on('reset', this.handleManagerReset); } + private handleManagerReset = () => { + // When the associated manager resets, mark any in flight flushes as complete + // There's nothing actionable a user can do to get these to complete so no + // error is being thrown. + this.handleManagerPacketsFlushed(); + + this.manager.off('packetsFlushed', this.handleManagerReset); + this.manager.off('reset', this.handleManagerReset); + }; + + private handleManagerPacketsFlushed = () => { + this.flushedFuture.resolve?.(); + this.flushedFuture = new Future(); + }; + /** @internal */ static withExplicitHandle( options: DataTrackOptions, @@ -104,6 +126,35 @@ export default class LocalDataTrack implements ILocalTrack, IDataTrack { } } + /** + * When called, waits for all in flight packets to be sent before resolving. + * + * Use this to: + * + * 1. Send frames exactly in order: + * ```ts + * await track.tryPush(/* ... *\/); + * await track.flush(); + * await track.tryPush(/* ... *\/); + * await track.flush(); + * // ... etc ... + * ``` + * + * 2. Wait for frames to all be delivered before unpublishing a local data track: + * + * ```ts + * await track.tryPush(/* ... *\/); + * await track.tryPush(/* ... *\/); + * await track.tryPush(/* ... *\/); + * // ... etc ... + * await track.flush(); + * await track.unpublish(); + * ``` + **/ + async flush(): Promise { + return this.flushedFuture.promise; + } + /** * Unpublish the track from the SFU. Once this is called, any further calls to {@link tryPush} * will fail. diff --git a/src/room/data-track/handle.ts b/src/room/data-track/handle.ts index 81f2d94d2d..a6c8e2f056 100644 --- a/src/room/data-track/handle.ts +++ b/src/room/data-track/handle.ts @@ -66,4 +66,8 @@ export class DataTrackHandleAllocator { } return this.value; } + + reset() { + this.value = 0; + } } diff --git a/src/room/data-track/incoming/IncomingDataTrackManager.test.ts b/src/room/data-track/incoming/IncomingDataTrackManager.test.ts index 6f24e2dbd4..02ed0098ed 100644 --- a/src/room/data-track/incoming/IncomingDataTrackManager.test.ts +++ b/src/room/data-track/incoming/IncomingDataTrackManager.test.ts @@ -327,7 +327,7 @@ describe('DataTrackIncomingManager', () => { expect(sfuUpdateSubscriptionEvent.subscribe).toStrictEqual(false); // 7. Make sure shutting down the manager doesn't throw errors - manager.shutdown(); + manager.reset(); }); it('should NOT terminate the sfu subscription if the abortsignal is triggered on one of two active subscriptions', async () => { @@ -675,7 +675,7 @@ describe('DataTrackIncomingManager', () => { ); // 4. Shutdown the manager, and make sure it doesn't throw - manager.shutdown(); + manager.reset(); // 5. Make sure the trackUnpublished event fires for the descriptor const trackUnpublishedEvent = await managerEvents.waitFor('trackUnpublished'); diff --git a/src/room/data-track/incoming/IncomingDataTrackManager.ts b/src/room/data-track/incoming/IncomingDataTrackManager.ts index 63501db88e..bcaf93dd81 100644 --- a/src/room/data-track/incoming/IncomingDataTrackManager.ts +++ b/src/room/data-track/incoming/IncomingDataTrackManager.ts @@ -92,7 +92,7 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () => * * This is an index that allows track descriptors to be looked up * by subscriber handle in O(1) time, to make routing incoming packets - * a (hot code path) faster. + * (a hot code path) faster. */ private subscriptionHandles = new Map(); @@ -626,8 +626,9 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () => } } - /** Shutdown the manager, ending any subscriptions. */ - shutdown() { + /** Resets the manager, ending any subscriptions, and getting it ready for the next room + * connection. */ + reset() { for (const descriptor of this.descriptors.values()) { this.emit('trackUnpublished', { sid: descriptor.info.sid, @@ -643,5 +644,6 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () => } } this.descriptors.clear(); + this.subscriptionHandles.clear(); } } diff --git a/src/room/data-track/outgoing/OutgoingDataTrackManager.test.ts b/src/room/data-track/outgoing/OutgoingDataTrackManager.test.ts index 4ef1bb8b08..6992907afc 100644 --- a/src/room/data-track/outgoing/OutgoingDataTrackManager.test.ts +++ b/src/room/data-track/outgoing/OutgoingDataTrackManager.test.ts @@ -586,7 +586,7 @@ describe('DataTrackOutgoingManager', () => { ]); // Shut down the manager - const shutdownPromise = manager.shutdown(); + const shutdownPromise = manager.reset(); // The pending data track should be cancelled await expect(pendingDescriptor.completionFuture.promise).rejects.toThrowError( @@ -602,4 +602,238 @@ describe('DataTrackOutgoingManager', () => { await shutdownPromise; }); + + it('should resolve flush() after a single tryPush once the packet is acknowledged', async () => { + const pubHandle = 5; + const manager = OutgoingDataTrackManager.withDescriptors( + new Map([ + [ + DataTrackHandle.fromNumber(pubHandle), + Descriptor.active( + { + sid: 'bogus-sid', + pubHandle, + name: 'test', + usesE2ee: false, + }, + null, + ), + ], + ]), + ); + const managerEvents = subscribeToEvents(manager, [ + 'packetAvailable', + 'packetsFlushed', + ]); + const localDataTrack = LocalDataTrack.withExplicitHandle( + { name: 'track name' }, + manager, + pubHandle, + ); + + // 1. Push a single-packet payload + await localDataTrack.tryPush({ payload: new Uint8Array([0x01, 0x02, 0x03, 0x04, 0x05]) }); + + // 2. The packet should have been emitted to be sent over the data channel + const packetEvent = await managerEvents.waitFor('packetAvailable'); + expect(packetEvent.handle).toStrictEqual(pubHandle); + + // 3. Calling flush() right after tryPush() should not resolve until the packet + // is acknowledged via handlePacketSendComplete + let flushed = false; + const flushPromise = localDataTrack.flush().then(() => { + flushed = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(flushed).toStrictEqual(false); + expect(managerEvents.areThereBufferedEvents('packetsFlushed')).toBe(false); + + // 4. Acknowledge that the packet has been sent over the data channel + manager.handlePacketSendComplete(DataTrackHandle.fromNumber(pubHandle)); + + // 5. The packetsFlushed event fires once the in-flight packet counter reaches 0 + const flushedEvent = await managerEvents.waitFor('packetsFlushed'); + expect(flushedEvent.handle).toStrictEqual(pubHandle); + + // 6. The flush() promise resolves + await flushPromise; + expect(flushed).toStrictEqual(true); + }); + + it('should resolve flush() only after all packets in a multi-packet payload are acknowledged', async () => { + const pubHandle = 5; + const manager = OutgoingDataTrackManager.withDescriptors( + new Map([ + [ + DataTrackHandle.fromNumber(pubHandle), + Descriptor.active( + { + sid: 'bogus-sid', + pubHandle, + name: 'test', + usesE2ee: false, + }, + null, + ), + ], + ]), + ); + const managerEvents = subscribeToEvents(manager, [ + 'packetAvailable', + 'packetsFlushed', + ]); + const localDataTrack = LocalDataTrack.withExplicitHandle( + { name: 'track name' }, + manager, + pubHandle, + ); + + // 1. Push a payload large enough to span multiple packets (24k > single packet mtu) + await localDataTrack.tryPush({ payload: new Uint8Array(24_000).fill(0xbe) }); + + // 2. Two packetAvailable events should be emitted for this payload + await managerEvents.waitFor('packetAvailable'); + await managerEvents.waitFor('packetAvailable'); + + // 3. Call flush() before any of the packets have been acknowledged + let flushed = false; + const flushPromise = localDataTrack.flush().then(() => { + flushed = true; + }); + + // 4. Acknowledge the first packet -- flush should not resolve yet, in-flight counter still > 0 + manager.handlePacketSendComplete(DataTrackHandle.fromNumber(pubHandle)); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(flushed).toStrictEqual(false); + expect(managerEvents.areThereBufferedEvents('packetsFlushed')).toBe(false); + + // 5. Acknowledge the second packet -- flush resolves once the counter reaches 0 + manager.handlePacketSendComplete(DataTrackHandle.fromNumber(pubHandle)); + + const flushedEvent = await managerEvents.waitFor('packetsFlushed'); + expect(flushedEvent.handle).toStrictEqual(pubHandle); + + await flushPromise; + expect(flushed).toStrictEqual(true); + }); + + it('should resolve any pending flush() calls when the manager is reset', async () => { + const pubHandle = 5; + const manager = OutgoingDataTrackManager.withDescriptors( + new Map([ + [ + DataTrackHandle.fromNumber(pubHandle), + Descriptor.active( + { + sid: 'bogus-sid', + pubHandle, + name: 'test', + usesE2ee: false, + }, + null, + ), + ], + ]), + ); + const managerEvents = subscribeToEvents(manager, [ + 'packetAvailable', + 'packetsFlushed', + 'reset', + ]); + const localDataTrack = LocalDataTrack.withExplicitHandle( + { name: 'track name' }, + manager, + pubHandle, + ); + + // 1. Push a single-packet payload + await localDataTrack.tryPush({ payload: new Uint8Array([0x01, 0x02, 0x03, 0x04, 0x05]) }); + await managerEvents.waitFor('packetAvailable'); + + // 2. Call flush() before the in-flight packet is acknowledged -- it should remain + // pending because the in-flight counter is still > 0 + let flushed = false; + const flushPromise = localDataTrack.flush().then(() => { + flushed = true; + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(flushed).toStrictEqual(false); + expect(managerEvents.areThereBufferedEvents('packetsFlushed')).toBe(false); + + // 3. Reset the manager. This simulates a RTCEngine disconnect and should resolve + // the pending flush() even though the packet was never acknowledged. + await manager.reset(); + await managerEvents.waitFor('reset'); + + // 4. The flush() promise resolves + await flushPromise; + expect(flushed).toStrictEqual(true); + + // 5. No packetsFlushed event was emitted -- reset short-circuits the flush directly + // on the LocalDataTrack rather than going through the in-flight counter. + expect(managerEvents.areThereBufferedEvents('packetsFlushed')).toBe(false); + }); + + it('should resolve flush() at the end of a batch of tryPush calls once all packets are acknowledged', async () => { + const pubHandle = 5; + const manager = OutgoingDataTrackManager.withDescriptors( + new Map([ + [ + DataTrackHandle.fromNumber(pubHandle), + Descriptor.active( + { + sid: 'bogus-sid', + pubHandle, + name: 'test', + usesE2ee: false, + }, + null, + ), + ], + ]), + ); + const managerEvents = subscribeToEvents(manager, [ + 'packetAvailable', + 'packetsFlushed', + ]); + const localDataTrack = LocalDataTrack.withExplicitHandle( + { name: 'track name' }, + manager, + pubHandle, + ); + + // 1. Run a batch of tryPush calls + await localDataTrack.tryPush({ payload: new Uint8Array([0x01]) }); + await localDataTrack.tryPush({ payload: new Uint8Array([0x02]) }); + await localDataTrack.tryPush({ payload: new Uint8Array([0x03]) }); + + // 2. Three packetAvailable events should be emitted, one per pushed frame + await managerEvents.waitFor('packetAvailable'); + await managerEvents.waitFor('packetAvailable'); + await managerEvents.waitFor('packetAvailable'); + + // 3. After the batch is enqueued, call flush() to wait for the SFU to drain them + let flushed = false; + const flushPromise = localDataTrack.flush().then(() => { + flushed = true; + }); + + // 4. Acknowledge two of the three packets -- flush should not resolve yet + manager.handlePacketSendComplete(DataTrackHandle.fromNumber(pubHandle)); + manager.handlePacketSendComplete(DataTrackHandle.fromNumber(pubHandle)); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(flushed).toStrictEqual(false); + expect(managerEvents.areThereBufferedEvents('packetsFlushed')).toBe(false); + + // 5. Acknowledge the last packet -- flush resolves once the counter reaches 0 + manager.handlePacketSendComplete(DataTrackHandle.fromNumber(pubHandle)); + + const flushedEvent = await managerEvents.waitFor('packetsFlushed'); + expect(flushedEvent.handle).toStrictEqual(pubHandle); + + await flushPromise; + expect(flushed).toStrictEqual(true); + }); }); diff --git a/src/room/data-track/outgoing/OutgoingDataTrackManager.ts b/src/room/data-track/outgoing/OutgoingDataTrackManager.ts index 6ecc2c7b7f..3b40f1a0d6 100644 --- a/src/room/data-track/outgoing/OutgoingDataTrackManager.ts +++ b/src/room/data-track/outgoing/OutgoingDataTrackManager.ts @@ -18,6 +18,7 @@ import DataTrackOutgoingPipeline from './pipeline'; import { type DataTrackOptions, type EventPacketAvailable, + type EventPacketsFlushed, type EventSfuPublishRequest, type EventSfuUnpublishRequest, type EventTrackPublished, @@ -74,6 +75,11 @@ export type DataTrackOutgoingManagerCallbacks = { trackPublished: (event: EventTrackPublished) => void; /** A {@link LocalDataTrack} has been unpublished */ trackUnpublished: (event: EventTrackUnpublished) => void; + /** A {@link LocalDataTrack} has had all of its in flight packets sent via the rtc data channel. */ + packetsFlushed: (event: EventPacketsFlushed) => void; + /** The manager has been reset and all state has been cleared in preparation for the next room + * connection. */ + reset: () => void; }; type OutgoingDataTrackManagerOptions = { @@ -95,6 +101,11 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () => private descriptors = new Map(); + /** Number of packets for each data track which have been emitted via the `packetAvailable` event + * and which have not yet been sent via the rtc data channel yet. Once this goes to 0, then + * all in flight packets have been delivered, and the data tracks is "flushed". */ + private inFlightPacketCounter = new Map(); + constructor(options?: OutgoingDataTrackManagerOptions) { super(); this.e2eeManager = options?.e2eeManager ?? null; @@ -154,7 +165,9 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () => try { for await (const packet of descriptor.pipeline.processFrame(frame)) { - this.emit('packetAvailable', { bytes: packet.toBinary() }); + const prev = this.inFlightPacketCounter.get(handle) ?? 0; + this.inFlightPacketCounter.set(handle, prev + 1); + this.emit('packetAvailable', { handle, bytes: packet.toBinary() }); } } catch (err) { // NOTE: In the rust implementation this "dropped" error means something different (not enough room @@ -163,6 +176,27 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () => } } + /** The client has sent a packet over the rtc data channel. This signal is used for determining + * once all packets are sent and a data track has been "flushed". + * + * @internal */ + handlePacketSendComplete(handle: DataTrackHandle) { + const prev = this.inFlightPacketCounter.get(handle) ?? 0; + let counter = prev - 1; + + if (counter < 0) { + log.warn( + `OutgoingDataTrackManager.handlePacketSendComplete: inFlightPacketCounter was decremented below 0 (got ${this.inFlightPacketCounter} - resetting to 0. Were more packets send than were emitted?`, + ); + counter = 0; + } + this.inFlightPacketCounter.set(handle, counter); + + if (counter === 0) { + this.emit('packetsFlushed', { handle }); + } + } + /** * Client requested to publish a track. * @@ -266,6 +300,7 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () => await descriptor.unpublishingFuture.promise; + this.inFlightPacketCounter.delete(handle); this.emit('trackUnpublished', { sid: descriptor.info.sid }); } @@ -360,10 +395,13 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () => } /** - * Shuts down the manager and all associated tracks. + * Reset's the state of the manager and all associated tracks. Run on room disconnect to get + * the manager ready for the next room connection. * @internal **/ - async shutdown() { + async reset() { + this.handleAllocator.reset(); + for (const descriptor of this.descriptors.values()) { switch (descriptor.type) { case 'pending': @@ -379,5 +417,9 @@ export default class OutgoingDataTrackManager extends (EventEmitter as new () => } } this.descriptors.clear(); + + this.inFlightPacketCounter.clear(); + + this.emit('reset'); } } diff --git a/src/room/data-track/outgoing/types.ts b/src/room/data-track/outgoing/types.ts index d0f8bd576b..fdfae5e189 100644 --- a/src/room/data-track/outgoing/types.ts +++ b/src/room/data-track/outgoing/types.ts @@ -34,6 +34,8 @@ export type EventSfuUnpublishRequest = { /** A serialized packet is ready to be sent over the transport. */ export type EventPacketAvailable = { + /** The handle associated with the data track which this packet bytes belong to. */ + handle: DataTrackHandle; bytes: Uint8Array; }; @@ -43,3 +45,6 @@ export type EventTrackPublished = { track: LocalDataTrack }; /** A track has been unpublished by a remote participant and can no longer be subscribed to. */ export type EventTrackUnpublished = { sid: DataTrackSid }; + +/** A track has had all of its in flight packets sent via the rtc data channel. */ +export type EventPacketsFlushed = { handle: DataTrackHandle }; diff --git a/src/utils/subscribeToEvents.ts b/src/utils/subscribeToEvents.ts index 292f661197..8a76dafad7 100644 --- a/src/utils/subscribeToEvents.ts +++ b/src/utils/subscribeToEvents.ts @@ -8,10 +8,13 @@ export function subscribeToEvents< Callbacks extends EventMap, EventNames extends keyof Callbacks = keyof Callbacks, >(eventEmitter: TypedEventEmitter, eventNames: Array) { - const nextEventListeners = new Map>>( + // Wrap buffered events in a `{ event }` envelope so that no-payload events (like + // `reset: () => void`) survive the `if (earliestBufferedEvent)` check in waitFor -- + // an `undefined` payload would otherwise look the same as an empty buffer. + const nextEventListeners = new Map>>( eventNames.map((eventName) => [eventName, []]), ); - const buffers = new Map>( + const buffers = new Map>( eventNames.map((eventName) => [eventName, []]), ); @@ -20,11 +23,11 @@ export function subscribeToEvents< const listeners = nextEventListeners.get(eventName)!; if (listeners.length > 0) { for (const listener of listeners) { - listener.resolve?.(event); + listener.resolve?.({ event }); } nextEventListeners.set(eventName, []); } else { - buffers.get(eventName)!.push(event); + buffers.get(eventName)!.push({ event }); } }) as Callbacks[keyof Callbacks]; return [eventName, onEvent] as [keyof Callbacks, Callbacks[keyof Callbacks]]; @@ -48,14 +51,14 @@ export function subscribeToEvents< } const earliestBufferedEvent = buffer.shift(); if (earliestBufferedEvent) { - return earliestBufferedEvent as EventPayload; + return earliestBufferedEvent.event as EventPayload; } // Otherwise wait for the next event to come in. - const future = new Future(); + const future = new Future<{ event: unknown }, never>(); nextEventListeners.get(eventName)!.push(future); - const nextEvent = await future.promise; - return nextEvent as EventPayload; + const { event } = await future.promise; + return event as EventPayload; }, /** Are there events of the given name which are waiting to be processed? Use this to assert * that no unexpected events have been emitted. */