Skip to content

Commit 7dcfdc4

Browse files
authored
Properly handle max payload length of zero (#1966)
1 parent 682dec1 commit 7dcfdc4

3 files changed

Lines changed: 115 additions & 3 deletions

File tree

.changeset/chatty-oranges-clap.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'livekit-client': patch
3+
---
4+
5+
Properly handle maxPayloadLength of 0, upper bound maxMessageSize

src/room/RTCEngine.test.ts

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,18 @@ describe('RTCEngine', () => {
223223

224224
describe('sendDataPacket', () => {
225225
const MAX_DATA_PACKET_SIZE = 64 * 1024 - 1; // 65535 bytes (64 KB - 1)
226-
function stubConnectedEngine(engine: RTCEngine) {
226+
function stubConnectedEngine(
227+
engine: RTCEngine,
228+
maxDataPacketSize: number = MAX_DATA_PACKET_SIZE,
229+
) {
227230
const send = vi.fn();
228231
Object.assign(engine as unknown as Record<string, unknown>, {
229232
ensurePublisherConnected: vi.fn().mockResolvedValue(undefined),
230233
waitForBufferStatusLow: vi.fn().mockResolvedValue(undefined),
231234
updateAndEmitDCBufferStatus: vi.fn(),
232235
dataChannelForKind: vi.fn(() => ({ send })),
233236
pcManager: {
234-
getMaxPublisherMessageSize: vi.fn(() => MAX_DATA_PACKET_SIZE),
237+
getMaxPublisherMessageSize: vi.fn(() => maxDataPacketSize),
235238
},
236239
});
237240
return send;
@@ -257,6 +260,25 @@ describe('RTCEngine', () => {
257260
expect(send).not.toHaveBeenCalled();
258261
});
259262

263+
it('does not reject packets if the max data packet size is 0', async () => {
264+
const engine = new RTCEngine(roomOptionDefaults);
265+
const send = stubConnectedEngine(engine, 0);
266+
267+
const packet = new DataPacket({
268+
kind: DataPacket_Kind.RELIABLE,
269+
value: {
270+
case: 'user',
271+
value: new UserPacket({ payload: new Uint8Array(100) }),
272+
},
273+
});
274+
275+
// Sending the packet should succeed, there isn't a size limit
276+
await expect(
277+
engine.sendDataPacket(packet, DataChannelKind.RELIABLE),
278+
).resolves.toBeUndefined();
279+
expect(send).toHaveBeenCalledTimes(1);
280+
});
281+
260282
it('sends packets within the max data packet size', async () => {
261283
const engine = new RTCEngine(roomOptionDefaults);
262284
const send = stubConnectedEngine(engine);
@@ -275,4 +297,57 @@ describe('RTCEngine', () => {
275297
expect(send).toHaveBeenCalledTimes(1);
276298
});
277299
});
300+
301+
describe('handleDataChannelClose', () => {
302+
function stubCloseEnv(
303+
engine: RTCEngine,
304+
{ closed, publisherState }: { closed: boolean; publisherState: RTCPeerConnectionState },
305+
) {
306+
const error = vi.fn();
307+
Object.assign(engine as unknown as Record<string, unknown>, {
308+
_isClosed: closed,
309+
log: { error },
310+
pcManager: {
311+
publisher: { getConnectionState: () => publisherState },
312+
},
313+
});
314+
return error;
315+
}
316+
317+
function fireClose(engine: RTCEngine, kind: DataChannelKind) {
318+
(
319+
engine as unknown as {
320+
handleDataChannelClose: (kind: DataChannelKind) => () => void;
321+
}
322+
).handleDataChannelClose(kind)();
323+
}
324+
325+
it('logs an error when a publisher channel closes while connected', () => {
326+
const engine = new RTCEngine(roomOptionDefaults);
327+
const error = stubCloseEnv(engine, { closed: false, publisherState: 'connected' });
328+
329+
fireClose(engine, DataChannelKind.RELIABLE);
330+
331+
expect(error).toHaveBeenCalledOnce();
332+
expect(error.mock.calls[0][0]).toContain('RELIABLE');
333+
});
334+
335+
it('stays quiet when the engine is already closed', () => {
336+
const engine = new RTCEngine(roomOptionDefaults);
337+
const error = stubCloseEnv(engine, { closed: true, publisherState: 'connected' });
338+
339+
fireClose(engine, DataChannelKind.RELIABLE);
340+
341+
expect(error).not.toHaveBeenCalled();
342+
});
343+
344+
it('stays quiet when the publisher PC is no longer connected', () => {
345+
const engine = new RTCEngine(roomOptionDefaults);
346+
const error = stubCloseEnv(engine, { closed: false, publisherState: 'closed' });
347+
348+
fireClose(engine, DataChannelKind.RELIABLE);
349+
350+
expect(error).not.toHaveBeenCalled();
351+
});
352+
});
278353
});

src/room/RTCEngine.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ export enum DataChannelKind {
127127
DATA_TRACK_LOSSY = 2,
128128
}
129129

130+
// Default data-channel max message size (bytes), used when the remote SDP
131+
// answer does not advertise an `a=max-message-size` attribute (RFC 8841).
132+
// `0` means "no limit".
133+
const DEFAULT_MAX_MESSAGE_SIZE = 64_000;
134+
130135
/** @internal */
131136
export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter<EngineEventCallbacks>) {
132137
client: SignalClient;
@@ -864,14 +869,17 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
864869
if (this.lossyDC) {
865870
this.lossyDC.onmessage = null;
866871
this.lossyDC.onerror = null;
872+
this.lossyDC.onclose = null;
867873
}
868874
if (this.reliableDC) {
869875
this.reliableDC.onmessage = null;
870876
this.reliableDC.onerror = null;
877+
this.reliableDC.onclose = null;
871878
}
872879
if (this.dataTrackDC) {
873880
this.dataTrackDC.onmessage = null;
874881
this.dataTrackDC.onerror = null;
882+
this.dataTrackDC.onclose = null;
875883
}
876884

877885
// create data channels
@@ -897,6 +905,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
897905
this.reliableDC.onerror = this.handleDataError;
898906
this.dataTrackDC.onerror = this.handleDataError;
899907

908+
// detect unexpected publisher data channel closes
909+
this.lossyDC.onclose = this.handleDataChannelClose(DataChannelKind.LOSSY);
910+
this.reliableDC.onclose = this.handleDataChannelClose(DataChannelKind.RELIABLE);
911+
this.dataTrackDC.onclose = this.handleDataChannelClose(DataChannelKind.DATA_TRACK_LOSSY);
912+
900913
// set up dc buffer threshold, set to 64kB (otherwise 0 by default)
901914
this.lossyDC.bufferedAmountLowThreshold = 65535;
902915
this.reliableDC.bufferedAmountLowThreshold = 65535;
@@ -1036,6 +1049,18 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
10361049
}
10371050
};
10381051

1052+
private handleDataChannelClose = (kind: DataChannelKind) => () => {
1053+
// A publisher DC closing while the session is up and the publisher PC is still
1054+
// connected is the signature of an oversized message having aborted the channel
1055+
// (see livekit/rust-sdks#1137). Surface it; do not attempt renegotiation.
1056+
if (!this._isClosed && this.pcManager?.publisher.getConnectionState() === 'connected') {
1057+
this.log.error(
1058+
`publisher data channel '${DataChannelKind[kind]}' closed unexpectedly`,
1059+
this.logContext,
1060+
);
1061+
}
1062+
};
1063+
10391064
private handleBufferedAmountLow = (channelKind: DataChannelKind) => {
10401065
this.updateAndEmitDCBufferStatus(channelKind);
10411066
};
@@ -1537,9 +1562,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
15371562

15381563
const msg = packet.toBinary() as Uint8Array<ArrayBuffer>;
15391564

1540-
const maxPublisherMessageSizeBytes = this.pcManager?.getMaxPublisherMessageSize();
1565+
// Clamp to the SDK default - libwebrtc advertises larger (~256 KiB)
1566+
// than LiveKit/pion can deliver end-to-end (~64 KiB), so we trust
1567+
// the answer up untilthe built in ceiling.
1568+
const maxPublisherMessageSizeBytes = Math.min(
1569+
this.pcManager?.getMaxPublisherMessageSize() ?? DEFAULT_MAX_MESSAGE_SIZE,
1570+
DEFAULT_MAX_MESSAGE_SIZE,
1571+
);
15411572
if (
15421573
typeof maxPublisherMessageSizeBytes !== 'undefined' &&
1574+
maxPublisherMessageSizeBytes !== 0 /* 0 means "no limit" */ &&
15431575
msg.byteLength > maxPublisherMessageSizeBytes
15441576
) {
15451577
throw new PublishDataError(

0 commit comments

Comments
 (0)