Skip to content

Commit 682dec1

Browse files
authored
Add 64k max data packet size (#1962)
1 parent 98fe36c commit 682dec1

5 files changed

Lines changed: 83 additions & 1 deletion

File tree

.changeset/large-pillows-grow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": patch
3+
---
4+
5+
Enforce a maximum size of the sdp defined maxMessageSize on individual data packets sent via `publishData`

src/room/PCTransport.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,10 @@ export default class PCTransport extends (EventEmitter as new () => TypedEmitter
498498
return this.pc.getStats();
499499
}
500500

501+
getMaxMessageSize() {
502+
return this._pc?.sctp?.maxMessageSize;
503+
}
504+
501505
async getConnectedAddress(): Promise<string | undefined> {
502506
if (!this._pc) {
503507
return;

src/room/PCTransportManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,10 @@ export class PCTransportManager {
310310
return matchingTransceiver?.mid;
311311
}
312312

313+
getMaxPublisherMessageSize() {
314+
return this.publisher.getMaxMessageSize();
315+
}
316+
313317
addPublisherTrack(track: MediaStreamTrack) {
314318
return this.publisher.addTrack(track);
315319
}

src/room/RTCEngine.test.ts

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import { DataPacket, DataPacket_Kind, UserPacket } from '@livekit/protocol';
12
import { afterEach, describe, expect, it, vi } from 'vitest';
2-
import RTCEngine from './RTCEngine';
3+
import RTCEngine, { DataChannelKind } from './RTCEngine';
34
import { roomOptionDefaults } from './defaults';
5+
import { PublishDataError } from './errors';
46

57
describe('RTCEngine', () => {
68
const originalRTCRtpSender = window.RTCRtpSender;
@@ -218,4 +220,59 @@ describe('RTCEngine', () => {
218220
expect((sender as unknown as { transform: unknown }).transform).toBe(transform);
219221
expect(createEncodedStreams).not.toHaveBeenCalled();
220222
});
223+
224+
describe('sendDataPacket', () => {
225+
const MAX_DATA_PACKET_SIZE = 64 * 1024 - 1; // 65535 bytes (64 KB - 1)
226+
function stubConnectedEngine(engine: RTCEngine) {
227+
const send = vi.fn();
228+
Object.assign(engine as unknown as Record<string, unknown>, {
229+
ensurePublisherConnected: vi.fn().mockResolvedValue(undefined),
230+
waitForBufferStatusLow: vi.fn().mockResolvedValue(undefined),
231+
updateAndEmitDCBufferStatus: vi.fn(),
232+
dataChannelForKind: vi.fn(() => ({ send })),
233+
pcManager: {
234+
getMaxPublisherMessageSize: vi.fn(() => MAX_DATA_PACKET_SIZE),
235+
},
236+
});
237+
return send;
238+
}
239+
240+
it('rejects packets larger than the max data packet size', async () => {
241+
const engine = new RTCEngine(roomOptionDefaults);
242+
const send = stubConnectedEngine(engine);
243+
244+
// The serialized packet includes protobuf framing on top of the payload, so a payload at the
245+
// limit is already guaranteed to exceed it once serialized.
246+
const packet = new DataPacket({
247+
kind: DataPacket_Kind.RELIABLE,
248+
value: {
249+
case: 'user',
250+
value: new UserPacket({ payload: new Uint8Array(MAX_DATA_PACKET_SIZE) }),
251+
},
252+
});
253+
254+
await expect(engine.sendDataPacket(packet, DataChannelKind.RELIABLE)).rejects.toBeInstanceOf(
255+
PublishDataError,
256+
);
257+
expect(send).not.toHaveBeenCalled();
258+
});
259+
260+
it('sends packets within the max data packet size', async () => {
261+
const engine = new RTCEngine(roomOptionDefaults);
262+
const send = stubConnectedEngine(engine);
263+
264+
const packet = new DataPacket({
265+
kind: DataPacket_Kind.RELIABLE,
266+
value: {
267+
case: 'user',
268+
value: new UserPacket({ payload: new Uint8Array(1024) }),
269+
},
270+
});
271+
272+
await expect(
273+
engine.sendDataPacket(packet, DataChannelKind.RELIABLE),
274+
).resolves.toBeUndefined();
275+
expect(send).toHaveBeenCalledTimes(1);
276+
});
277+
});
221278
});

src/room/RTCEngine.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ import {
7373
ConnectionError,
7474
ConnectionErrorReason,
7575
NegotiationError,
76+
PublishDataError,
7677
SignalReconnectError,
7778
TrackInvalidError,
7879
UnexpectedConnectionState,
@@ -108,6 +109,7 @@ const leaveReconnect = 'leave-reconnect';
108109
const reliabeReceiveStateTTL = 30_000;
109110
const lossyDataChannelBufferThresholdMin = 8 * 1024;
110111
const lossyDataChannelBufferThresholdMax = 256 * 1024;
112+
111113
const initialMediaSectionsAudio = 3;
112114
const initialMediaSectionsVideo = 3;
113115

@@ -1535,6 +1537,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
15351537

15361538
const msg = packet.toBinary() as Uint8Array<ArrayBuffer>;
15371539

1540+
const maxPublisherMessageSizeBytes = this.pcManager?.getMaxPublisherMessageSize();
1541+
if (
1542+
typeof maxPublisherMessageSizeBytes !== 'undefined' &&
1543+
msg.byteLength > maxPublisherMessageSizeBytes
1544+
) {
1545+
throw new PublishDataError(
1546+
`cannot publish data packet larger than ${maxPublisherMessageSizeBytes} bytes (got ${msg.byteLength})`,
1547+
);
1548+
}
1549+
15381550
switch (kind) {
15391551
case DataChannelKind.LOSSY:
15401552
case DataChannelKind.DATA_TRACK_LOSSY:

0 commit comments

Comments
 (0)