Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/afraid-cooks-visit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Add local data track flush method
8 changes: 6 additions & 2 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
.on('trackUnpublished', (event) => {
this.emit(RoomEvent.LocalDataTrackUnpublished, event.sid);
})
.on('packetAvailable', ({ bytes }) => {
this.engine.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait');
.on('packetAvailable', ({ handle, bytes }) => {
this.engine
.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait')
Comment thread
1egoman marked this conversation as resolved.
.finally(() => this.outgoingDataTrackManager.handlePacketSendComplete(handle));
});

this.disconnectLock = new Mutex();
Expand Down Expand Up @@ -1749,6 +1751,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.bufferedEvents = [];
this.transcriptionReceivedTimes.clear();
this.incomingDataStreamManager.clearControllers();
this.incomingDataTrackManager.reset();
this.outgoingDataTrackManager.reset();
if (this.state === ConnectionState.Disconnected) {
return;
}
Expand Down
51 changes: 51 additions & 0 deletions src/room/data-track/LocalDataTrack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import log, { LoggerNames, type StructuredLogger, getLogger } from '../../logger';
import { Future } from '../utils';
import { type DataTrackFrame, DataTrackFrameInternal } from './frame';
import type { DataTrackHandle } from './handle';
import type OutgoingDataTrackManager from './outgoing/OutgoingDataTrackManager';
Expand Down Expand Up @@ -28,14 +29,35 @@ export default class LocalDataTrack implements ILocalTrack, IDataTrack {

protected log: StructuredLogger = log;

/** Resolves once the data track has sent all pending packets the rtc data channel buffer. */
protected flushedFuture = new Future<void, never>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what should happen to this future if the room disconnects while flushing?

Copy link
Copy Markdown
Contributor Author

@1egoman 1egoman Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - I think the two options are rejecting with an error or just resolving it normally. Also I would argue that the better signal might be engine close, not room disconnect, though I think in practice the former should always happen when the latter occurs.

Any thoughts on either of these points before I add this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

engine close also sounds fine to me!

no strong opinion on whether it should be rejecting or resolving, but something should happen!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet, done in 8c3b58e and da85f11 (new test).


/** @internal */
constructor(options: DataTrackOptions, manager: OutgoingDataTrackManager) {
this.options = options;
this.manager = manager;

this.log = getLogger(LoggerNames.DataTracks);

this.manager.on('packetsFlushed', this.handleManagerPacketsFlushed);
this.manager.on('reset', this.handleManagerReset);
}

private handleManagerReset = () => {
// When the associated manager resets, mark any in flight flushes as complete
// There's nothing actionable a user can do to get these to complete so no
// error is being thrown.
this.handleManagerPacketsFlushed();

this.manager.off('packetsFlushed', this.handleManagerReset);
this.manager.off('reset', this.handleManagerReset);
};

private handleManagerPacketsFlushed = () => {
this.flushedFuture.resolve?.();
this.flushedFuture = new Future();
};

/** @internal */
static withExplicitHandle(
options: DataTrackOptions,
Expand Down Expand Up @@ -104,6 +126,35 @@ export default class LocalDataTrack implements ILocalTrack, IDataTrack {
}
}

/**
* When called, waits for all in flight packets to be sent before resolving.
*
* Use this to:
*
* 1. Send frames exactly in order:
* ```ts
* await track.tryPush(/* ... *\/);
* await track.flush();
* await track.tryPush(/* ... *\/);
* await track.flush();
* // ... etc ...
* ```
*
* 2. Wait for frames to all be delivered before unpublishing a local data track:
*
* ```ts
* await track.tryPush(/* ... *\/);
* await track.tryPush(/* ... *\/);
* await track.tryPush(/* ... *\/);
* // ... etc ...
* await track.flush();
* await track.unpublish();
* ```
**/
async flush(): Promise<void> {
return this.flushedFuture.promise;
}

/**
* Unpublish the track from the SFU. Once this is called, any further calls to {@link tryPush}
* will fail.
Expand Down
4 changes: 4 additions & 0 deletions src/room/data-track/handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ export class DataTrackHandleAllocator {
}
return this.value;
}

reset() {
this.value = 0;
}
}
4 changes: 2 additions & 2 deletions src/room/data-track/incoming/IncomingDataTrackManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ describe('DataTrackIncomingManager', () => {
expect(sfuUpdateSubscriptionEvent.subscribe).toStrictEqual(false);

// 7. Make sure shutting down the manager doesn't throw errors
manager.shutdown();
manager.reset();
});

it('should NOT terminate the sfu subscription if the abortsignal is triggered on one of two active subscriptions', async () => {
Expand Down Expand Up @@ -675,7 +675,7 @@ describe('DataTrackIncomingManager', () => {
);

// 4. Shutdown the manager, and make sure it doesn't throw
manager.shutdown();
manager.reset();

// 5. Make sure the trackUnpublished event fires for the descriptor
const trackUnpublishedEvent = await managerEvents.waitFor('trackUnpublished');
Expand Down
8 changes: 5 additions & 3 deletions src/room/data-track/incoming/IncomingDataTrackManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
*
* This is an index that allows track descriptors to be looked up
* by subscriber handle in O(1) time, to make routing incoming packets
* a (hot code path) faster.
* (a hot code path) faster.
*/
private subscriptionHandles = new Map<DataTrackHandle, DataTrackSid>();

Expand Down Expand Up @@ -626,8 +626,9 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
}
}

/** Shutdown the manager, ending any subscriptions. */
shutdown() {
/** Resets the manager, ending any subscriptions, and getting it ready for the next room
* connection. */
reset() {
for (const descriptor of this.descriptors.values()) {
this.emit('trackUnpublished', {
sid: descriptor.info.sid,
Expand All @@ -643,5 +644,6 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () =>
}
}
this.descriptors.clear();
this.subscriptionHandles.clear();
}
}
Loading
Loading