Skip to content

Commit f5030d6

Browse files
committed
fix: test fixes
1 parent 3c1d525 commit f5030d6

4 files changed

Lines changed: 27 additions & 60 deletions

File tree

src/rpc/RPCClient.ts

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import type {
1414
MapCallers,
1515
} from './types';
1616
import type { ContextTimed } from '../contexts/types';
17-
import { TransformStream } from 'stream/web';
1817
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
1918
import Logger from '@matrixai/logger';
2019
import { Timer } from '@matrixai/timer';
@@ -23,6 +22,8 @@ import * as rpcErrors from './errors';
2322
import * as rpcUtils from './utils/utils';
2423
import { never, promise } from '../utils';
2524

25+
const timerCleanupReasonSymbol = Symbol('timerCleanUpReasonSymbol');
26+
2627
// eslint-disable-next-line
2728
interface RPCClient<M extends ClientManifest> extends CreateDestroy {}
2829
@CreateDestroy()
@@ -276,7 +277,7 @@ class RPCClient<M extends ClientManifest> {
276277
});
277278
const cleanUp = () => {
278279
// Clean up the timer and signal
279-
if (ctx.timer == null) timer.cancel(Error('TMP Clean up reason'));
280+
if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
280281
signal.removeEventListener('abort', abortHandler);
281282
};
282283
// Setting up abort events for timeout
@@ -352,14 +353,14 @@ class RPCClient<M extends ClientManifest> {
352353
};
353354
}
354355

355-
// FIXME: the CTX timeout here is just for stream creation. We can't/wont do
356-
// keep alive timeout for raw streams.
357356
/**
358357
* Generic caller for raw RPC calls.
359358
* This returns a `ReadableWritablePair` of the raw RPC stream.
360359
* When finished the streams must be ended manually. Failing to do so will
361360
* hold the connection open and result in a resource leak until the
362361
* call times out.
362+
* Raw streams don't support the keep alive timeout. Timeout will only apply\
363+
* to the creation of the stream.
363364
* @param method - Method name of the RPC call
364365
* @param headerParams - Parameters for the header message. The header is a
365366
* single RPC message that is sent to specify the method for the RPC call.
@@ -376,7 +377,7 @@ class RPCClient<M extends ClientManifest> {
376377
const signal = abortController.signal;
377378
// A promise that will reject if there is an abort signal or timeout
378379
const abortRaceProm = promise<never>();
379-
// Prevent unhandled rejection when we're don with the promise
380+
// Prevent unhandled rejection when we're done with the promise
380381
abortRaceProm.p.catch(() => {});
381382
let abortHandler: () => void;
382383
if (ctx.signal != null) {
@@ -393,10 +394,9 @@ class RPCClient<M extends ClientManifest> {
393394
new Timer({
394395
delay: this.streamKeepAliveTimeoutTime,
395396
});
396-
// Ignore unhandled rejections
397397
const cleanUp = () => {
398398
// Clean up the timer and signal
399-
if (ctx.timer == null) timer.cancel(Error('TMP Clean up reason'));
399+
if (ctx.timer == null) timer.cancel(timerCleanupReasonSymbol);
400400
signal.removeEventListener('abort', abortHandler);
401401
};
402402
const timeoutError = new rpcErrors.ErrorRPCTimedOut();
@@ -423,27 +423,16 @@ class RPCClient<M extends ClientManifest> {
423423
};
424424
try {
425425
rpcStream = await Promise.race([setupStream(), abortRaceProm.p]);
426-
} catch (e) {
426+
} finally {
427427
cleanUp();
428-
throw e;
429428
}
430-
// Need to tell when a stream has ended to clean up the timer
431-
const forwardStream = new TransformStream<Uint8Array, Uint8Array>();
432-
const reverseStream = new TransformStream<Uint8Array, Uint8Array>();
433-
434-
void Promise.all([
435-
rpcStream.readable.pipeTo(reverseStream.writable),
436-
forwardStream.readable.pipeTo(rpcStream.writable),
437-
]).finally(() => {
438-
cleanUp();
439-
});
440429
const metadata = {
441430
...(rpcStream.meta ?? {}),
442431
command: method,
443432
};
444433
return {
445-
writable: forwardStream.writable,
446-
readable: reverseStream.readable,
434+
writable: rpcStream.writable,
435+
readable: rpcStream.readable,
447436
cancel: rpcStream.cancel,
448437
meta: metadata,
449438
};

src/rpc/RPCServer.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ class RPCServer extends EventTarget {
207207
this.logger.info(`Destroyed ${this.constructor.name}`);
208208
}
209209

210+
/**
211+
* Registers a raw stream handler. This is the basis for all handlers as
212+
* handling the streams is done with raw streams only.
213+
* The raw streams do not automatically refresh the timeout timer when
214+
* messages are sent or received.
215+
*/
210216
protected registerRawStreamHandler(
211217
method: string,
212218
handler: RawHandlerImplementation,

tests/rpc/RPCClient.test.ts

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -785,42 +785,6 @@ describe(`${RPCClient.name}`, () => {
785785
expect(ctx?.signal.aborted).toBeTrue();
786786
expect(ctx?.signal.reason).toBe(rejectReason);
787787
});
788-
test('raw caller uses default timeout awaiting stream', async () => {
789-
const forwardPassThroughStream = new TransformStream<
790-
Uint8Array,
791-
Uint8Array
792-
>();
793-
const reversePassThroughStream = new TransformStream<
794-
Uint8Array,
795-
Uint8Array
796-
>();
797-
const streamPair: RPCStream<Uint8Array, Uint8Array> = {
798-
cancel: () => {},
799-
meta: undefined,
800-
writable: forwardPassThroughStream.writable,
801-
readable: reversePassThroughStream.readable,
802-
};
803-
let ctx: ContextTimed | undefined;
804-
const rpcClient = await RPCClient.createRPCClient({
805-
manifest: {},
806-
streamFactory: async (ctx_) => {
807-
ctx = ctx_;
808-
return streamPair;
809-
},
810-
streamKeepAliveTimeoutTime: 200,
811-
logger,
812-
});
813-
814-
// Timing out on stream.
815-
// Stream creation needs to read the header to complete.
816-
await Promise.all([
817-
rpcClient.rawStreamCaller('testMethod', {}),
818-
forwardPassThroughStream.readable.getReader().read(),
819-
]);
820-
await ctx?.timer;
821-
expect(ctx?.signal.aborted).toBeTrue();
822-
expect(ctx?.signal.reason).toBeInstanceOf(rpcErrors.ErrorRPCTimedOut);
823-
});
824788
test('raw caller times out awaiting stream', async () => {
825789
const forwardPassThroughStream = new TransformStream<
826790
Uint8Array,

tests/rpc/utils/utils.test.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { testProp } from '@fast-check/jest';
1+
import { testProp, fc } from '@fast-check/jest';
22
import * as rpcUtils from '@/rpc/utils';
33
import 'ix/add/asynciterable-operators/toarray';
44
import * as rpcTestUtils from '../utils';
@@ -12,6 +12,14 @@ describe('utils tests', () => {
1212
},
1313
{ numRuns: 1000 },
1414
);
15-
// TODO:
16-
// - Test for badly structured data
15+
testProp(
16+
'malformed data cases parsing errors',
17+
[fc.json()],
18+
async (message) => {
19+
expect(() =>
20+
rpcUtils.parseJSONRPCMessage(Buffer.from(JSON.stringify(message))),
21+
).toThrow();
22+
},
23+
{ numRuns: 1000 },
24+
);
1725
});

0 commit comments

Comments
 (0)