Skip to content

Commit ff7a5e9

Browse files
authored
Buffer stream events until connected (#1867)
1 parent d1e0a5d commit ff7a5e9

3 files changed

Lines changed: 32 additions & 2 deletions

File tree

.changeset/soft-olives-clean.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": patch
3+
---
4+
5+
Buffer stream events until connected

src/room/Room.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2475,6 +2475,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
24752475
return false;
24762476
}
24772477
this.state = state;
2478+
this.incomingDataStreamManager.setConnected(state === ConnectionState.Connected);
24782479
this.emit(RoomEvent.ConnectionStateChanged, this.state);
24792480
return true;
24802481
}

src/room/data-stream/incoming/IncomingDataStreamManager.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,25 @@ export default class IncomingDataStreamManager {
2727

2828
private textStreamHandlers = new Map<string, TextStreamHandler>();
2929

30+
private isConnected = false;
31+
32+
private bufferedPackets: Array<{ packet: DataPacket; encryptionType: Encryption_Type }> = [];
33+
34+
setConnected(connected: boolean) {
35+
this.isConnected = connected;
36+
if (connected) {
37+
this.flushBufferedPackets();
38+
}
39+
}
40+
41+
private flushBufferedPackets() {
42+
const packets = this.bufferedPackets;
43+
this.bufferedPackets = [];
44+
for (const { packet, encryptionType } of packets) {
45+
this.handleDataStreamPacket(packet, encryptionType);
46+
}
47+
}
48+
3049
registerTextStreamHandler(topic: string, callback: TextStreamHandler) {
3150
if (this.textStreamHandlers.has(topic)) {
3251
throw new DataStreamError(
@@ -58,6 +77,7 @@ export default class IncomingDataStreamManager {
5877
clearControllers() {
5978
this.byteStreamControllers.clear();
6079
this.textStreamControllers.clear();
80+
this.bufferedPackets = [];
6181
}
6282

6383
validateParticipantHasNoActiveDataStreams(participantIdentity: string) {
@@ -88,7 +108,11 @@ export default class IncomingDataStreamManager {
88108
}
89109
}
90110

91-
async handleDataStreamPacket(packet: DataPacket, encryptionType: Encryption_Type) {
111+
handleDataStreamPacket(packet: DataPacket, encryptionType: Encryption_Type) {
112+
if (!this.isConnected) {
113+
this.bufferedPackets.push({ packet, encryptionType });
114+
return;
115+
}
92116
switch (packet.value.case) {
93117
case 'streamHeader':
94118
return this.handleStreamHeader(
@@ -105,7 +129,7 @@ export default class IncomingDataStreamManager {
105129
}
106130
}
107131

108-
private async handleStreamHeader(
132+
private handleStreamHeader(
109133
streamHeader: DataStream_Header,
110134
participantIdentity: string,
111135
encryptionType: Encryption_Type,

0 commit comments

Comments
 (0)