Skip to content

Commit 3c1d525

Browse files
committed
fix: adding and propagating config values
[ci skip]
1 parent efd3d2a commit 3c1d525

7 files changed

Lines changed: 59 additions & 36 deletions

File tree

src/PolykeyAgent.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,13 @@ type NetworkConfig = {
5656
// RPCServer for client service
5757
clientHost?: Host;
5858
clientPort?: Port;
59-
maxReadBufferBytes?: number;
60-
idleTimeoutTime?: number;
59+
maxReadableStreamBytes?: number;
60+
connectionIdleTimeoutTime?: number;
6161
pingIntervalTime?: number;
6262
pingTimeoutTime?: number;
63+
handlerTimeoutTime?: number;
64+
handlerTimeoutGraceTime?: number;
65+
clientParserBufferByteLimit?: number;
6366
};
6467

6568
interface PolykeyAgent extends CreateDestroyStartStop {}
@@ -439,8 +442,11 @@ class PolykeyAgent {
439442
}),
440443
middlewareFactory: rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
441444
clientUtilsMiddleware.middlewareServer(sessionManager, keyRing),
445+
networkConfig_.clientParserBufferByteLimit,
442446
),
443447
sensitive: false,
448+
handlerTimeoutTime: networkConfig_.handlerTimeoutTime,
449+
handlerTimeoutGraceTime: networkConfig_.handlerTimeoutGraceTime,
444450
logger: logger.getChild('RPCServerClient'),
445451
});
446452
}
@@ -451,14 +457,14 @@ class PolykeyAgent {
451457
webSocketServerClient =
452458
webSocketServerClient ??
453459
(await WebSocketServer.createWebSocketServer({
454-
connectionCallback: (streamPair) =>
455-
rpcServerClient!.handleStream(streamPair),
460+
connectionCallback: (rpcStream) =>
461+
rpcServerClient!.handleStream(rpcStream),
456462
fs,
457463
host: networkConfig_.clientHost,
458464
port: networkConfig_.clientPort,
459465
tlsConfig,
460-
maxReadableStreamBytes: networkConfig_.maxReadBufferBytes,
461-
connectionIdleTimeoutTime: networkConfig_.idleTimeout,
466+
maxReadableStreamBytes: networkConfig_.maxReadableStreamBytes,
467+
connectionIdleTimeoutTime: networkConfig_.connectionIdleTimeoutTime,
462468
pingIntervalTime: networkConfig_.pingIntervalTime,
463469
pingTimeoutTime: networkConfig_.pingTimeoutTime,
464470
logger: logger.getChild('WebSocketServer'),

src/PolykeyClient.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class PolykeyClient<M extends ClientManifest> {
2929
session,
3030
manifest,
3131
streamFactory,
32+
streamKeepAliveTimeoutTime,
33+
parserBufferByteLimit,
3234
fs = require('fs'),
3335
logger = new Logger(this.name),
3436
fresh = false,
@@ -37,6 +39,8 @@ class PolykeyClient<M extends ClientManifest> {
3739
session?: Session;
3840
manifest: RPCClient<M> | M;
3941
streamFactory: StreamFactory;
42+
streamKeepAliveTimeoutTime?: number;
43+
parserBufferByteLimit?: number;
4044
fs?: FileSystem;
4145
logger?: Logger;
4246
fresh?: boolean;
@@ -63,7 +67,9 @@ class PolykeyClient<M extends ClientManifest> {
6367
middlewareFactory:
6468
rpcUtilsMiddleware.defaultClientMiddlewareWrapper(
6569
clientUtilsMiddleware.middlewareClient(session),
70+
parserBufferByteLimit,
6671
),
72+
streamKeepAliveTimeoutTime,
6773
logger: logger.getChild(RPCClient.name),
6874
});
6975
const pkClient = new this({

src/config.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,17 @@ const config = {
9898
// GRPCServer for agent service
9999
agentHost: '127.0.0.1' as Host,
100100
agentPort: 0 as Port,
101-
// GRPCServer for client service
101+
// RPCServer for client service
102102
clientHost: '127.0.0.1' as Host,
103103
clientPort: 0 as Port,
104-
maxReadBufferBytes: 1_000_000_000, // About 1 GB
104+
// Times and limits for client communication
105+
connectionIdleTimeoutTime: 120, // 2 minutes
105106
pingIntervalTime: 1_000, // 1 second
106107
pingTimeoutTime: 10_000, // 10 seconds
108+
handlerTimeoutTime: 60_000, // 1 minute
109+
handlerTimeoutGraceTime: 2_000, // 2 seconds
110+
maxReadableStreamBytes: 1_000_000_000, // About 1 GB
111+
clientParserBufferByteLimit: 1_000_000, // About 1MB
107112
},
108113
proxyConfig: {
109114
connConnectTime: 2000,

src/rpc/RPCServer.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ class RPCServer extends EventTarget {
7070
manifest,
7171
middlewareFactory = rpcUtilsMiddleware.defaultServerMiddlewareWrapper(),
7272
sensitive = false,
73-
streamKeepAliveTimeoutTime = 60_000, // 1 minute
74-
timeoutForceCloseTime = 2_000, // 2 seconds
73+
handlerTimeoutTime = 60_000, // 1 minute
74+
handlerTimeoutGraceTime = 2_000, // 2 seconds
7575
logger = new Logger(this.name),
7676
}: {
7777
manifest: ServerManifest;
@@ -82,17 +82,17 @@ class RPCServer extends EventTarget {
8282
JSONRPCResponse
8383
>;
8484
sensitive?: boolean;
85-
streamKeepAliveTimeoutTime?: number;
86-
timeoutForceCloseTime?: number;
85+
handlerTimeoutTime?: number;
86+
handlerTimeoutGraceTime?: number;
8787
logger?: Logger;
8888
}): Promise<RPCServer> {
8989
logger.info(`Creating ${this.name}`);
9090
const rpcServer = new this({
9191
manifest,
9292
middlewareFactory,
9393
sensitive,
94-
streamKeepAliveTimeoutTime: streamKeepAliveTimeoutTime,
95-
timeoutForceCloseTime: timeoutForceCloseTime,
94+
handlerTimeoutTime,
95+
handlerTimeoutGraceTime,
9696
logger,
9797
});
9898
logger.info(`Created ${this.name}`);
@@ -102,8 +102,8 @@ class RPCServer extends EventTarget {
102102
protected logger: Logger;
103103
protected handlerMap: Map<string, RawHandlerImplementation> = new Map();
104104
protected defaultTimeoutMap: Map<string, number | undefined> = new Map();
105-
protected streamKeepAliveTimeoutTime: number;
106-
protected timeoutForceCloseTime: number;
105+
protected handlerTimeoutTime: number;
106+
protected handlerTimeoutGraceTime: number;
107107
protected activeStreams: Set<PromiseCancellable<void>> = new Set();
108108
protected sensitive: boolean;
109109
protected middlewareFactory: MiddlewareFactory<
@@ -117,8 +117,8 @@ class RPCServer extends EventTarget {
117117
manifest,
118118
middlewareFactory,
119119
sensitive,
120-
streamKeepAliveTimeoutTime = 60_000, // 1 minuet
121-
timeoutForceCloseTime = 2_000, // 2 seconds
120+
handlerTimeoutTime = 60_000, // 1 minuet
121+
handlerTimeoutGraceTime = 2_000, // 2 seconds
122122
logger,
123123
}: {
124124
manifest: ServerManifest;
@@ -129,8 +129,8 @@ class RPCServer extends EventTarget {
129129
Uint8Array,
130130
JSONRPCResponseResult
131131
>;
132-
streamKeepAliveTimeoutTime?: number;
133-
timeoutForceCloseTime?: number;
132+
handlerTimeoutTime?: number;
133+
handlerTimeoutGraceTime?: number;
134134
sensitive: boolean;
135135
logger: Logger;
136136
}) {
@@ -188,8 +188,8 @@ class RPCServer extends EventTarget {
188188
}
189189
this.middlewareFactory = middlewareFactory;
190190
this.sensitive = sensitive;
191-
this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
192-
this.timeoutForceCloseTime = timeoutForceCloseTime;
191+
this.handlerTimeoutTime = handlerTimeoutTime;
192+
this.handlerTimeoutGraceTime = handlerTimeoutGraceTime;
193193
this.logger = logger;
194194
}
195195

@@ -413,7 +413,7 @@ class RPCServer extends EventTarget {
413413
const abortController = new AbortController();
414414
// Setting up timeout timer logic
415415
const timer = new Timer({
416-
delay: this.streamKeepAliveTimeoutTime,
416+
delay: this.handlerTimeoutTime,
417417
handler: () => {
418418
abortController.abort(new rpcErrors.ErrorRPCTimedOut());
419419
},
@@ -424,7 +424,7 @@ class RPCServer extends EventTarget {
424424
let graceTimer: Timer<void> | undefined;
425425
const handleAbort = () => {
426426
const graceTimer = new Timer({
427-
delay: this.timeoutForceCloseTime,
427+
delay: this.handlerTimeoutGraceTime,
428428
handler: () => {
429429
rpcStream.cancel(abortController.signal.reason);
430430
},
@@ -501,7 +501,7 @@ class RPCServer extends EventTarget {
501501
}
502502
// Setting up Timeout logic
503503
const timeout = this.defaultTimeoutMap.get(method);
504-
if (timeout != null && timeout < this.streamKeepAliveTimeoutTime) {
504+
if (timeout != null && timeout < this.handlerTimeoutTime) {
505505
// Reset timeout with new delay if it is less than the default
506506
timer.reset(timeout);
507507
} else {

src/rpc/utils/middleware.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ import { promise } from '../../utils/index';
1818
* also infers the type of the stream output.
1919
* @param messageParser - Validates the JSONRPC messages, so you can select for a
2020
* specific type of message
21-
* @param byteLimit - sets the number of bytes buffered before throwing an
21+
* @param bufferByteLimit - sets the number of bytes buffered before throwing an
2222
* error. This is used to avoid infinitely buffering the input.
2323
*/
2424
function binaryToJsonMessageStream<T extends JSONRPCMessage>(
2525
messageParser: (message: unknown) => T,
26-
byteLimit: number = 1024 * 1024,
26+
bufferByteLimit: number = 1024 * 1024,
2727
): TransformStream<Uint8Array, T> {
2828
const parser = new JSONParser({
2929
separator: '',
@@ -53,7 +53,7 @@ function binaryToJsonMessageStream<T extends JSONRPCMessage>(
5353
} catch (e) {
5454
throw new rpcErrors.ErrorRPCParse(undefined, { cause: e });
5555
}
56-
if (bytesWritten > byteLimit) {
56+
if (bytesWritten > bufferByteLimit) {
5757
throw new rpcErrors.ErrorRPCMessageLength();
5858
}
5959
},
@@ -95,6 +95,7 @@ function defaultMiddleware() {
9595
* The reverse path will pipe the output stream through the provided middleware
9696
* and then transform it back to a binary stream.
9797
* @param middlewareFactory - The provided middleware
98+
* @param parserBufferByteLimit
9899
*/
99100
function defaultServerMiddlewareWrapper(
100101
middlewareFactory: MiddlewareFactory<
@@ -103,10 +104,12 @@ function defaultServerMiddlewareWrapper(
103104
JSONRPCResponse,
104105
JSONRPCResponse
105106
> = defaultMiddleware,
107+
parserBufferByteLimit: number = 1024 * 1024,
106108
): MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponse> {
107109
return (ctx, cancel, meta) => {
108110
const inputTransformStream = binaryToJsonMessageStream(
109111
rpcUtils.parseJSONRPCRequest,
112+
parserBufferByteLimit,
110113
);
111114
const outputTransformStream = new TransformStream<
112115
JSONRPCResponseResult,
@@ -143,6 +146,8 @@ function defaultServerMiddlewareWrapper(
143146
* The reverse path will parse and validate the output and pipe it through the
144147
* provided middleware.
145148
* @param middleware - the provided middleware
149+
* @param parserBufferByteLimit - Max number of bytes to buffer when parsing the stream. Exceeding this results in an
150+
* `ErrorRPCMessageLength` error.
146151
*/
147152
const defaultClientMiddlewareWrapper = (
148153
middleware: MiddlewareFactory<
@@ -151,6 +156,7 @@ const defaultClientMiddlewareWrapper = (
151156
JSONRPCResponse,
152157
JSONRPCResponse
153158
> = defaultMiddleware,
159+
parserBufferByteLimit?: number,
154160
): MiddlewareFactory<
155161
Uint8Array,
156162
JSONRPCRequest,
@@ -160,7 +166,7 @@ const defaultClientMiddlewareWrapper = (
160166
return (ctx, cancel, meta) => {
161167
const outputTransformStream = binaryToJsonMessageStream(
162168
rpcUtils.parseJSONRPCResponse,
163-
// Undefined,
169+
parserBufferByteLimit,
164170
);
165171
const inputTransformStream = new TransformStream<
166172
JSONRPCRequest,

tests/client/timeoutMiddleware.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ describe('timeoutMiddleware', () => {
164164
middlewareFactory: rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
165165
timeoutMiddleware.timeoutMiddlewareServer,
166166
),
167-
streamKeepAliveTimeoutTime: 100,
167+
handlerTimeoutTime: 100,
168168
logger,
169169
});
170170
clientServer = await WebSocketServer.createWebSocketServer({

tests/rpc/RPCServer.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ describe(`${RPCServer.name}`, () => {
789789
manifest: {
790790
testMethod: new TestHandler({}),
791791
},
792-
streamKeepAliveTimeoutTime: 100,
792+
handlerTimeoutTime: 100,
793793
logger,
794794
});
795795
const [outputResult, outputStream] = rpcTestUtils.streamToArray();
@@ -821,7 +821,7 @@ describe(`${RPCServer.name}`, () => {
821821
test('timeout with default time before handler selected', async () => {
822822
const rpcServer = await RPCServer.createRPCServer({
823823
manifest: {},
824-
streamKeepAliveTimeoutTime: 100,
824+
handlerTimeoutTime: 100,
825825
logger,
826826
});
827827
const readWriteStream: RPCStream<Uint8Array, Uint8Array> = {
@@ -880,7 +880,7 @@ describe(`${RPCServer.name}`, () => {
880880
testShort: new TestMethodShortTimeout({}),
881881
testLong: new TestMethodLongTimeout({}),
882882
},
883-
streamKeepAliveTimeoutTime: 50,
883+
handlerTimeoutTime: 50,
884884
logger,
885885
});
886886
const streamShort = rpcTestUtils.messagesToReadableStream([
@@ -1045,8 +1045,8 @@ describe(`${RPCServer.name}`, () => {
10451045
manifest: {
10461046
testMethod: new TestHandler({}),
10471047
},
1048-
streamKeepAliveTimeoutTime: 50,
1049-
timeoutForceCloseTime: 100,
1048+
handlerTimeoutTime: 50,
1049+
handlerTimeoutGraceTime: 100,
10501050
logger,
10511051
});
10521052
const [, outputStream] = rpcTestUtils.streamToArray();
@@ -1136,7 +1136,7 @@ describe(`${RPCServer.name}`, () => {
11361136
manifest: {
11371137
testMethod: new TestHandler({}),
11381138
},
1139-
timeoutForceCloseTime: 0,
1139+
handlerTimeoutGraceTime: 0,
11401140
logger,
11411141
});
11421142
const [, outputStream] = rpcTestUtils.streamToArray();

0 commit comments

Comments
 (0)