Skip to content

Commit e627f9b

Browse files
committed
feat: migrate rpc to use data streams for sending rpc requests
1 parent b6c177e commit e627f9b

5 files changed

Lines changed: 69 additions & 212 deletions

File tree

src/room/Room.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1853,7 +1853,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
18531853
rpc.id,
18541854
rpc.method,
18551855
rpc.payload,
1856-
rpc.compressedPayload,
18571856
rpc.responseTimeoutMs,
18581857
rpc.version,
18591858
() => this.remoteParticipants.has(packet.participantIdentity),

src/room/rpc.test.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ describe('RpcClientManager', () => {
4848
expect(packet.value.value.id).toStrictEqual(requestId);
4949
expect(packet.value.value.method).toStrictEqual('test-method');
5050
expect(packet.value.value.payload).toStrictEqual('request-payload');
51-
expect(packet.value.value.compressedPayload).toStrictEqual(new Uint8Array());
5251

5352
rpcClientManager.handleIncomingRpcAck(requestId);
5453

@@ -192,7 +191,6 @@ describe('RpcServerManager', () => {
192191
requestId,
193192
'test-method',
194193
'request payload',
195-
new Uint8Array(),
196194
responseTimeoutMs,
197195
1,
198196
() => true,
@@ -225,7 +223,6 @@ describe('RpcServerManager', () => {
225223
'test-request-id',
226224
methodName,
227225
'test payload',
228-
new Uint8Array(),
229226
5000,
230227
1,
231228
() => true,
@@ -262,7 +259,6 @@ describe('RpcServerManager', () => {
262259
'test-error-request-id',
263260
methodName,
264261
'test payload',
265-
new Uint8Array(),
266262
5000,
267263
1,
268264
() => true,
@@ -300,7 +296,6 @@ describe('RpcServerManager', () => {
300296
'test-rpc-error-request-id',
301297
methodName,
302298
'test payload',
303-
new Uint8Array(),
304299
5000,
305300
1,
306301
() => true,

src/room/rpc/RpcClientManager.ts

Lines changed: 24 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import { EngineEvent } from '../events';
1111
import type Participant from '../participant/Participant';
1212
import { Future, compareVersions } from '../utils';
1313
import {
14-
DATA_STREAM_MIN_BYTES,
1514
MAX_LEGACY_PAYLOAD_BYTES,
1615
type PerformRpcParams,
1716
RPC_DATA_STREAM_TOPIC,
@@ -20,9 +19,7 @@ import {
2019
RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR,
2120
RpcError,
2221
byteLength,
23-
gzipCompress,
2422
gzipCompressToWriter,
25-
gzipDecompress,
2623
gzipDecompressFromReader,
2724
} from './utils';
2825

@@ -151,70 +148,38 @@ export default class RpcClientManager {
151148
responseTimeout: number,
152149
remoteClientProtocol: number,
153150
) {
154-
const payloadBytes = byteLength(payload);
155-
156-
let mode: 'regular' | 'compressed' | 'compressed-data-stream' = 'regular';
157151
if (remoteClientProtocol >= CLIENT_PROTOCOL_GZIP_RPC) {
158-
mode = 'compressed';
159-
}
160-
if (mode === 'compressed' && payloadBytes > DATA_STREAM_MIN_BYTES) {
161-
mode = 'compressed-data-stream';
152+
// Send payload as a compressed data stream
153+
const writer = await this.outgoingDataStreamManager.streamBytes({
154+
topic: RPC_DATA_STREAM_TOPIC,
155+
destinationIdentities: [destinationIdentity],
156+
mimeType: 'application/octet-stream',
157+
attributes: {
158+
[RPC_REQUEST_ID_ATTR]: requestId,
159+
[RPC_REQUEST_METHOD_ATTR]: method,
160+
[RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR]: `${responseTimeout}`,
161+
},
162+
});
163+
await gzipCompressToWriter(payload, writer);
164+
await writer.close();
165+
return;
162166
}
163167

164-
switch (mode) {
165-
case 'compressed-data-stream': {
166-
// Large payload: create the data stream tagged with the request ID,
167-
// send the RPC request with empty payload/compressedPayload, then
168-
// stream compressed chunks for lower TTFB
169-
const writer = await this.outgoingDataStreamManager.streamBytes({
170-
topic: RPC_DATA_STREAM_TOPIC,
171-
destinationIdentities: [destinationIdentity],
172-
mimeType: 'application/octet-stream',
173-
attributes: {
174-
[RPC_REQUEST_ID_ATTR]: requestId,
175-
[RPC_REQUEST_METHOD_ATTR]: method,
176-
[RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR]: `${responseTimeout}`,
177-
},
178-
});
179-
await gzipCompressToWriter(payload, writer);
180-
await writer.close();
181-
return;
182-
}
183-
184-
case 'compressed':
185-
// Medium payload: compress inline
186-
const compressedPayload = await gzipCompress(payload);
187-
await this.sendRpcRequestPacket(
188-
destinationIdentity,
189-
requestId,
190-
method,
191-
'',
192-
compressedPayload,
193-
responseTimeout,
194-
);
195-
break;
196-
197-
case 'regular':
198-
default:
199-
// Small payload: just include the payload directly, uncompressed
200-
await this.sendRpcRequestPacket(
201-
destinationIdentity,
202-
requestId,
203-
method,
204-
payload,
205-
undefined,
206-
responseTimeout,
207-
);
208-
break;
209-
}
168+
// Legacy client: send uncompressed payload inline
169+
await this.sendRpcRequestPacket(
170+
destinationIdentity,
171+
requestId,
172+
method,
173+
payload,
174+
responseTimeout,
175+
);
210176
}
211177

212178
private async sendRpcRequestPacket(
213179
destinationIdentity: string,
214180
requestId: string,
215181
method: string,
216-
payload: string | undefined,
217-
compressedPayload: Uint8Array | undefined,
182+
payload: string,
218183
responseTimeout: number,
219184
) {
220185
const packet = new DataPacket({
@@ -225,8 +190,7 @@ export default class RpcClientManager {
225190
value: new RpcRequest({
226191
id: requestId,
227192
method,
228-
payload: payload ?? '',
229-
compressedPayload: compressedPayload ?? new Uint8Array(),
193+
payload,
230194
responseTimeoutMs: responseTimeout,
231195
version: 1,
232196
}),
@@ -251,22 +215,6 @@ export default class RpcClientManager {
251215
return true;
252216
}
253217

254-
case 'compressedPayload': {
255-
let payload;
256-
try {
257-
payload = await gzipDecompress(rpcResponse.value.value);
258-
} catch (e) {
259-
this.log.error('Failed to decompress RPC response', e);
260-
this.handleIncomingRpcResponseFailure(
261-
rpcResponse.requestId,
262-
RpcError.builtIn('APPLICATION_ERROR'),
263-
);
264-
return true;
265-
}
266-
this.handleIncomingRpcResponseSuccess(rpcResponse.requestId, payload);
267-
return true;
268-
}
269-
270218
case 'error': {
271219
const error = RpcError.fromProto(rpcResponse.value.value);
272220
this.handleIncomingRpcResponseFailure(rpcResponse.requestId, error);

0 commit comments

Comments
 (0)