Skip to content

Commit 3a98d99

Browse files
authored
Add local data track flush method (#1925)
1 parent fb32606 commit 3a98d99

10 files changed

Lines changed: 369 additions & 19 deletions

.changeset/afraid-cooks-visit.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
Add local data track flush method

src/room/Room.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,10 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
290290
.on('trackUnpublished', (event) => {
291291
this.emit(RoomEvent.LocalDataTrackUnpublished, event.sid);
292292
})
293-
.on('packetAvailable', ({ bytes }) => {
294-
this.engine.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait');
293+
.on('packetAvailable', ({ handle, bytes }) => {
294+
this.engine
295+
.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait')
296+
.finally(() => this.outgoingDataTrackManager.handlePacketSendComplete(handle));
295297
});
296298

297299
this.disconnectLock = new Mutex();
@@ -1730,6 +1732,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
17301732
this.bufferedEvents = [];
17311733
this.transcriptionReceivedTimes.clear();
17321734
this.incomingDataStreamManager.clearControllers();
1735+
this.incomingDataTrackManager.reset();
1736+
this.outgoingDataTrackManager.reset();
17331737
if (this.state === ConnectionState.Disconnected) {
17341738
return;
17351739
}

src/room/data-track/LocalDataTrack.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import log, { LoggerNames, type StructuredLogger, getLogger } from '../../logger';
2+
import { Future } from '../utils';
23
import { type DataTrackFrame, DataTrackFrameInternal } from './frame';
34
import type { DataTrackHandle } from './handle';
45
import type OutgoingDataTrackManager from './outgoing/OutgoingDataTrackManager';
@@ -28,14 +29,35 @@ export default class LocalDataTrack implements ILocalTrack, IDataTrack {
2829

2930
protected log: StructuredLogger = log;
3031

32+
/** Resolves once the data track has sent all pending packets the rtc data channel buffer. */
33+
protected flushedFuture = new Future<void, never>();
34+
3135
/** @internal */
3236
constructor(options: DataTrackOptions, manager: OutgoingDataTrackManager) {
3337
this.options = options;
3438
this.manager = manager;
3539

3640
this.log = getLogger(LoggerNames.DataTracks);
41+
42+
this.manager.on('packetsFlushed', this.handleManagerPacketsFlushed);
43+
this.manager.on('reset', this.handleManagerReset);
3744
}
3845

46+
private handleManagerReset = () => {
47+
// When the associated manager resets, mark any in flight flushes as complete
48+
// There's nothing actionable a user can do to get these to complete so no
49+
// error is being thrown.
50+
this.handleManagerPacketsFlushed();
51+
52+
this.manager.off('packetsFlushed', this.handleManagerReset);
53+
this.manager.off('reset', this.handleManagerReset);
54+
};
55+
56+
private handleManagerPacketsFlushed = () => {
57+
this.flushedFuture.resolve?.();
58+
this.flushedFuture = new Future();
59+
};
60+
3961
/** @internal */
4062
static withExplicitHandle(
4163
options: DataTrackOptions,
@@ -104,6 +126,35 @@ export default class LocalDataTrack implements ILocalTrack, IDataTrack {
104126
}
105127
}
106128

129+
/**
130+
* When called, waits for all in flight packets to be sent before resolving.
131+
*
132+
* Use this to:
133+
*
134+
* 1. Send frames exactly in order:
135+
* ```ts
136+
* await track.tryPush(/* ... *\/);
137+
* await track.flush();
138+
* await track.tryPush(/* ... *\/);
139+
* await track.flush();
140+
* // ... etc ...
141+
* ```
142+
*
143+
* 2. Wait for frames to all be delivered before unpublishing a local data track:
144+
*
145+
* ```ts
146+
* await track.tryPush(/* ... *\/);
147+
* await track.tryPush(/* ... *\/);
148+
* await track.tryPush(/* ... *\/);
149+
* // ... etc ...
150+
* await track.flush();
151+
* await track.unpublish();
152+
* ```
153+
**/
154+
async flush(): Promise<void> {
155+
return this.flushedFuture.promise;
156+
}
157+
107158
/**
108159
* Unpublish the track from the SFU. Once this is called, any further calls to {@link tryPush}
109160
* will fail.

src/room/data-track/handle.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,8 @@ export class DataTrackHandleAllocator {
6666
}
6767
return this.value;
6868
}
69+
70+
reset() {
71+
this.value = 0;
72+
}
6973
}

src/room/data-track/incoming/IncomingDataTrackManager.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ describe('DataTrackIncomingManager', () => {
327327
expect(sfuUpdateSubscriptionEvent.subscribe).toStrictEqual(false);
328328

329329
// 7. Make sure shutting down the manager doesn't throw errors
330-
manager.shutdown();
330+
manager.reset();
331331
});
332332

333333
it('should NOT terminate the sfu subscription if the abortsignal is triggered on one of two active subscriptions', async () => {
@@ -675,7 +675,7 @@ describe('DataTrackIncomingManager', () => {
675675
);
676676

677677
// 4. Shutdown the manager, and make sure it doesn't throw
678-
manager.shutdown();
678+
manager.reset();
679679

680680
// 5. Make sure the trackUnpublished event fires for the descriptor
681681
const trackUnpublishedEvent = await managerEvents.waitFor('trackUnpublished');

src/room/data-track/incoming/IncomingDataTrackManager.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
9292
*
9393
* This is an index that allows track descriptors to be looked up
9494
* by subscriber handle in O(1) time, to make routing incoming packets
95-
* a (hot code path) faster.
95+
* (a hot code path) faster.
9696
*/
9797
private subscriptionHandles = new Map<DataTrackHandle, DataTrackSid>();
9898

@@ -626,8 +626,9 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
626626
}
627627
}
628628

629-
/** Shutdown the manager, ending any subscriptions. */
630-
shutdown() {
629+
/** Resets the manager, ending any subscriptions, and getting it ready for the next room
630+
* connection. */
631+
reset() {
631632
for (const descriptor of this.descriptors.values()) {
632633
this.emit('trackUnpublished', {
633634
sid: descriptor.info.sid,
@@ -643,5 +644,6 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
643644
}
644645
}
645646
this.descriptors.clear();
647+
this.subscriptionHandles.clear();
646648
}
647649
}

0 commit comments

Comments
 (0)