Skip to content

Commit 0eb011e

Browse files
committed
refactor: put rpc request headers into an enum
1 parent 068325a commit 0eb011e

6 files changed

Lines changed: 41 additions & 58 deletions

File tree

src/room/rpc/client/RpcClientManager.test.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,7 @@ import { CLIENT_PROTOCOL_DATA_STREAM_RPC, CLIENT_PROTOCOL_DEFAULT } from '../../
55
import type RTCEngine from '../../RTCEngine';
66
import OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingDataStreamManager';
77
import { sleep } from '../../utils';
8-
import {
9-
RPC_REQUEST_DATA_STREAM_TOPIC,
10-
RPC_REQUEST_ID_ATTR,
11-
RPC_REQUEST_METHOD_ATTR,
12-
RPC_REQUEST_VERSION_ATTR,
13-
RpcError,
14-
} from '../utils';
8+
import { RPC_REQUEST_DATA_STREAM_TOPIC, RpcError, RpcRequestAttrs } from '../utils';
159
import RpcClientManager from './RpcClientManager';
1610
import type { RpcClientManagerCallbacks } from './events';
1711

@@ -183,9 +177,9 @@ describe('RpcClientManager', () => {
183177
topic: RPC_REQUEST_DATA_STREAM_TOPIC,
184178
destinationIdentities: ['destination-identity'],
185179
attributes: expect.objectContaining({
186-
[RPC_REQUEST_ID_ATTR]: requestId,
187-
[RPC_REQUEST_METHOD_ATTR]: 'test-method',
188-
[RPC_REQUEST_VERSION_ATTR]: '2',
180+
[RpcRequestAttrs.RPC_REQUEST_ID]: requestId,
181+
[RpcRequestAttrs.RPC_REQUEST_METHOD]: 'test-method',
182+
[RpcRequestAttrs.RPC_REQUEST_VERSION]: '2',
189183
}),
190184
}),
191185
);
@@ -223,9 +217,9 @@ describe('RpcClientManager', () => {
223217
topic: RPC_REQUEST_DATA_STREAM_TOPIC,
224218
destinationIdentities: ['destination-identity'],
225219
attributes: expect.objectContaining({
226-
[RPC_REQUEST_ID_ATTR]: requestId,
227-
[RPC_REQUEST_METHOD_ATTR]: 'test-method',
228-
[RPC_REQUEST_VERSION_ATTR]: '2',
220+
[RpcRequestAttrs.RPC_REQUEST_ID]: requestId,
221+
[RpcRequestAttrs.RPC_REQUEST_METHOD]: 'test-method',
222+
[RpcRequestAttrs.RPC_REQUEST_VERSION]: '2',
229223
}),
230224
}),
231225
);

src/room/rpc/client/RpcClientManager.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,10 @@ import {
1111
MAX_V1_PAYLOAD_BYTES,
1212
type PerformRpcParams,
1313
RPC_REQUEST_DATA_STREAM_TOPIC,
14-
RPC_REQUEST_ID_ATTR,
15-
RPC_REQUEST_METHOD_ATTR,
16-
RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR,
17-
RPC_REQUEST_VERSION_ATTR,
1814
RPC_VERSION_V1,
1915
RPC_VERSION_V2,
2016
RpcError,
17+
RpcRequestAttrs,
2118
byteLength,
2219
} from '../utils';
2320
import type { RpcClientManagerCallbacks } from './events';
@@ -149,10 +146,10 @@ export default class RpcClientManager extends (EventEmitter as new () => TypedEm
149146
topic: RPC_REQUEST_DATA_STREAM_TOPIC,
150147
destinationIdentities: [destinationIdentity],
151148
attributes: {
152-
[RPC_REQUEST_ID_ATTR]: requestId,
153-
[RPC_REQUEST_METHOD_ATTR]: method,
154-
[RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR]: `${responseTimeout}`,
155-
[RPC_REQUEST_VERSION_ATTR]: `${RPC_VERSION_V2}`,
149+
[RpcRequestAttrs.RPC_REQUEST_ID]: requestId,
150+
[RpcRequestAttrs.RPC_REQUEST_METHOD]: method,
151+
[RpcRequestAttrs.RPC_REQUEST_RESPONSE_TIMEOUT_MS]: `${responseTimeout}`,
152+
[RpcRequestAttrs.RPC_REQUEST_VERSION]: `${RPC_VERSION_V2}`,
156153
},
157154
});
158155

@@ -185,9 +182,9 @@ export default class RpcClientManager extends (EventEmitter as new () => TypedEm
185182
* @internal
186183
*/
187184
async handleIncomingDataStream(reader: TextStreamReader, attributes: Record<string, string>) {
188-
const associatedRequestId = attributes[RPC_REQUEST_ID_ATTR];
185+
const associatedRequestId = attributes[RpcRequestAttrs.RPC_REQUEST_ID];
189186
if (!associatedRequestId) {
190-
this.log.warn(`RPC data stream malformed: ${RPC_REQUEST_ID_ATTR} not set.`);
187+
this.log.warn(`RPC data stream malformed: ${RpcRequestAttrs.RPC_REQUEST_ID} not set.`);
191188
// NOTE: no response can be sent here, because there's no request id so associate
192189
// so logging is the best we can do here.
193190
return;

src/room/rpc/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export {
66
type PerformRpcParams,
77
RPC_REQUEST_DATA_STREAM_TOPIC,
88
RPC_RESPONSE_DATA_STREAM_TOPIC,
9-
RPC_REQUEST_ID_ATTR,
9+
RpcRequestAttrs,
1010
RpcError,
1111
type RpcInvocationData,
1212
byteLength,

src/room/rpc/server/RpcServerManager.test.ts

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,7 @@ import { subscribeToEvents } from '../../../utils/subscribeToEvents';
55
import { CLIENT_PROTOCOL_DATA_STREAM_RPC, CLIENT_PROTOCOL_DEFAULT } from '../../../version';
66
import type RTCEngine from '../../RTCEngine';
77
import OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingDataStreamManager';
8-
import {
9-
RPC_REQUEST_ID_ATTR,
10-
RPC_REQUEST_METHOD_ATTR,
11-
RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR,
12-
RPC_REQUEST_VERSION_ATTR,
13-
RPC_RESPONSE_DATA_STREAM_TOPIC,
14-
RpcError,
15-
} from '../utils';
8+
import { RPC_RESPONSE_DATA_STREAM_TOPIC, RpcError, RpcRequestAttrs } from '../utils';
169
import RpcServerManager from './RpcServerManager';
1710
import type { RpcServerManagerCallbacks } from './events';
1811

@@ -216,10 +209,10 @@ describe('RpcServerManager', () => {
216209

217210
function makeDataStreamAttrs(requestId: string, method: string, responseTimeout: number) {
218211
return {
219-
[RPC_REQUEST_ID_ATTR]: requestId,
220-
[RPC_REQUEST_METHOD_ATTR]: method,
221-
[RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR]: `${responseTimeout}`,
222-
[RPC_REQUEST_VERSION_ATTR]: '2',
212+
[RpcRequestAttrs.RPC_REQUEST_ID]: requestId,
213+
[RpcRequestAttrs.RPC_REQUEST_METHOD]: method,
214+
[RpcRequestAttrs.RPC_REQUEST_RESPONSE_TIMEOUT_MS]: `${responseTimeout}`,
215+
[RpcRequestAttrs.RPC_REQUEST_VERSION]: '2',
223216
};
224217
}
225218

@@ -254,7 +247,7 @@ describe('RpcServerManager', () => {
254247
expect.objectContaining({
255248
topic: RPC_RESPONSE_DATA_STREAM_TOPIC,
256249
destinationIdentities: ['caller-identity'],
257-
attributes: { [RPC_REQUEST_ID_ATTR]: requestId },
250+
attributes: { [RpcRequestAttrs.RPC_REQUEST_ID]: requestId },
258251
}),
259252
);
260253
expect(mockStreamTextWriter.write).toHaveBeenCalledWith('response payload');

src/room/rpc/server/RpcServerManager.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@ import type OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingD
88
import type Participant from '../../participant/Participant';
99
import {
1010
MAX_V1_PAYLOAD_BYTES,
11-
RPC_REQUEST_ID_ATTR,
12-
RPC_REQUEST_METHOD_ATTR,
13-
RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR,
14-
RPC_REQUEST_VERSION_ATTR,
1511
RPC_RESPONSE_DATA_STREAM_TOPIC,
1612
RPC_VERSION_V2,
1713
RpcError,
1814
type RpcInvocationData,
15+
RpcRequestAttrs,
1916
byteLength,
2017
} from '../utils';
2118
import type { RpcServerManagerCallbacks } from './events';
@@ -105,7 +102,10 @@ export default class RpcServerManager extends (EventEmitter as new () => TypedEm
105102
`Uncaught error returned by RPC handler for ${rpcRequest.method}. Returning APPLICATION_ERROR instead.`,
106103
error,
107104
);
108-
responseError = RpcError.builtIn('APPLICATION_ERROR');
105+
responseError = RpcError.builtIn(
106+
'APPLICATION_ERROR',
107+
`Uncaught error: ${(error as Error)?.message ?? error}`,
108+
);
109109
}
110110

111111
this.publishRpcResponsePacket(callerIdentity, rpcRequest.id, null, responseError);
@@ -125,14 +125,17 @@ export default class RpcServerManager extends (EventEmitter as new () => TypedEm
125125
callerIdentity: Participant['identity'],
126126
dataStreamAttrs: Record<string, string>,
127127
) {
128-
const requestId = dataStreamAttrs[RPC_REQUEST_ID_ATTR];
129-
const method = dataStreamAttrs[RPC_REQUEST_METHOD_ATTR];
130-
const responseTimeout = parseInt(dataStreamAttrs[RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR], 10);
131-
const version = parseInt(dataStreamAttrs[RPC_REQUEST_VERSION_ATTR], 10);
128+
const requestId = dataStreamAttrs[RpcRequestAttrs.RPC_REQUEST_ID];
129+
const method = dataStreamAttrs[RpcRequestAttrs.RPC_REQUEST_METHOD];
130+
const responseTimeout = parseInt(
131+
dataStreamAttrs[RpcRequestAttrs.RPC_REQUEST_RESPONSE_TIMEOUT_MS],
132+
10,
133+
);
134+
const version = parseInt(dataStreamAttrs[RpcRequestAttrs.RPC_REQUEST_VERSION], 10);
132135

133136
if (!requestId || !method || Number.isNaN(responseTimeout) || Number.isNaN(version)) {
134137
this.log.warn(
135-
`RPC data stream malformed: ${RPC_REQUEST_ID_ATTR} / ${RPC_REQUEST_METHOD_ATTR} / ${RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR} / ${RPC_REQUEST_VERSION_ATTR} not set.`,
138+
`RPC data stream malformed: ${RpcRequestAttrs.RPC_REQUEST_ID} / ${RpcRequestAttrs.RPC_REQUEST_METHOD} / ${RpcRequestAttrs.RPC_REQUEST_RESPONSE_TIMEOUT_MS} / ${RpcRequestAttrs.RPC_REQUEST_VERSION} not set.`,
136139
);
137140
this.publishRpcResponsePacket(
138141
callerIdentity,
@@ -262,7 +265,7 @@ export default class RpcServerManager extends (EventEmitter as new () => TypedEm
262265
const writer = await this.outgoingDataStreamManager.streamText({
263266
topic: RPC_RESPONSE_DATA_STREAM_TOPIC,
264267
destinationIdentities: [destinationIdentity],
265-
attributes: { [RPC_REQUEST_ID_ATTR]: requestId },
268+
attributes: { [RpcRequestAttrs.RPC_REQUEST_ID]: requestId },
266269
});
267270
await writer.write(payload);
268271
await writer.close();

src/room/rpc/utils.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,12 @@ export const RPC_REQUEST_DATA_STREAM_TOPIC = 'lk.rpc_request';
157157
export const RPC_RESPONSE_DATA_STREAM_TOPIC = 'lk.rpc_response';
158158

159159
/** @internal */
160-
export const RPC_REQUEST_ID_ATTR = 'lk.rpc_request_id';
161-
162-
/** @internal */
163-
export const RPC_REQUEST_METHOD_ATTR = 'lk.rpc_request_method';
164-
165-
/** @internal */
166-
export const RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR = 'lk.rpc_request_response_timeout_ms';
167-
168-
/** @internal */
169-
export const RPC_REQUEST_VERSION_ATTR = 'lk.rpc_request_version';
160+
export enum RpcRequestAttrs {
161+
RPC_REQUEST_ID = 'lk.rpc_request_id',
162+
RPC_REQUEST_METHOD = 'lk.rpc_request_method',
163+
RPC_REQUEST_RESPONSE_TIMEOUT_MS = 'lk.rpc_request_response_timeout_ms',
164+
RPC_REQUEST_VERSION = 'lk.rpc_request_version',
165+
}
170166

171167
/** Initial version of rpc which uses RpcRequest / RpcResponse messages.
172168
* @internal

0 commit comments

Comments
 (0)