Skip to content

Commit b0876c8

Browse files
committed
fix: RPCServer handling output stream cancellation
[ci skip]
1 parent 8e19b76 commit b0876c8

3 files changed

Lines changed: 89 additions & 40 deletions

File tree

src/rpc/RPCServer.ts

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,7 @@ class RPCServer extends EventTarget {
228228
result: response,
229229
id: null,
230230
};
231-
try {
232-
yield responseMessage;
233-
} catch(e) {
234-
// This catches any exceptions thrown into the reverse stream
235-
await handlerG.throw(e);
236-
}
231+
yield responseMessage;
237232
}
238233
};
239234
const outputGenerator = outputGen();
@@ -258,35 +253,27 @@ class RPCServer extends EventTarget {
258253
id: null,
259254
};
260255
controller.enqueue(rpcErrorMessage);
261-
await forwardStream.cancel(
262-
new rpcErrors.ErrorRPCHandlerFailed('Error clean up'),
263-
);
256+
// Clean up the input stream here, ignore error if already ended
257+
await forwardStream
258+
.cancel(new rpcErrors.ErrorRPCHandlerFailed('Error clean up'))
259+
.catch(() => {});
264260
controller.close();
265261
}
266262
},
267263
cancel: async (reason) => {
268-
try {
269-
// Throw the reason into the reverse stream
270-
await outputGenerator.throw(reason);
271-
} catch (e) {
272-
// If the e is the same as the reason
273-
// then the handler did not care about the reason
274-
// and we just discard it
275-
if (e !== reason) {
276-
this.dispatchEvent(
277-
new rpcEvents.RPCErrorEvent({
278-
detail: new rpcErrors.ErrorRPCSendErrorFailed(
279-
'Stream has been cancelled',
280-
{
281-
cause: e,
282-
}
283-
),
284-
}),
285-
);
286-
}
287-
}
288-
// await outputGenerator.nexj
289-
// handlerAbortController.abort(reason);
264+
this.dispatchEvent(
265+
new rpcEvents.RPCErrorEvent({
266+
detail: new rpcErrors.ErrorRPCOutputStreamError(
267+
'Stream has been cancelled',
268+
{
269+
cause: reason,
270+
},
271+
),
272+
}),
273+
);
274+
// If the output stream path fails then we need to end the generator
275+
// early.
276+
await outputGenerator.return(undefined);
290277
},
291278
});
292279
void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});

src/rpc/errors.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ class ErrorRPCMissingResponse<T> extends ErrorRPC<T> {
3838
exitCode = sysexits.UNAVAILABLE;
3939
}
4040

41-
class ErrorRPCSendErrorFailed<T> extends ErrorRPC<T> {
42-
static description = 'Failed to send error message';
41+
class ErrorRPCOutputStreamError<T> extends ErrorRPC<T> {
42+
static description = 'Output stream failed, unable to send data';
4343
exitCode = sysexits.UNAVAILABLE;
4444
}
4545

@@ -102,6 +102,6 @@ export {
102102
ErrorRPCHandlerFailed,
103103
ErrorRPCMessageLength,
104104
ErrorRPCMissingResponse,
105-
ErrorRPCSendErrorFailed,
105+
ErrorRPCOutputStreamError,
106106
ErrorPolykeyRemote,
107107
};

tests/rpc/RPCServer.test.ts

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
UnaryHandler,
2525
} from '@/rpc/handlers';
2626
import * as middlewareUtils from '@/rpc/utils/middleware';
27+
import { promise } from '@/utils';
2728
import * as rpcTestUtils from './utils';
2829

2930
describe(`${RPCServer.name}`, () => {
@@ -454,13 +455,70 @@ describe(`${RPCServer.name}`, () => {
454455
},
455456
);
456457
testProp(
457-
'should emit stream error',
458+
'should emit stream error if input stream fails',
458459
[specificMessageArb],
459460
async (messages) => {
461+
const handlerEndedProm = promise();
462+
class TestMethod extends DuplexHandler {
463+
public async *handle(input): AsyncIterable<JSONValue> {
464+
try {
465+
for await (const _ of input) {
466+
// Consume but don't yield anything
467+
}
468+
} finally {
469+
handlerEndedProm.resolveP();
470+
}
471+
}
472+
}
473+
const rpcServer = await RPCServer.createRPCServer({
474+
manifest: {
475+
testMethod: new TestMethod({}),
476+
},
477+
logger,
478+
});
479+
let resolve;
480+
rpcServer.addEventListener('error', (thing: RPCErrorEvent) => {
481+
resolve(thing);
482+
});
483+
const passThroughStreamIn = new TransformStream<Uint8Array, Uint8Array>();
484+
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
485+
const readWriteStream: ReadableWritablePair<Uint8Array, Uint8Array> = {
486+
readable: passThroughStreamIn.readable,
487+
writable: outputStream,
488+
};
489+
rpcServer.handleStream(readWriteStream, {} as ConnectionInfo);
490+
const writer = passThroughStreamIn.writable.getWriter();
491+
// Write messages
492+
for (const message of messages) {
493+
await writer.write(Buffer.from(JSON.stringify(message)));
494+
}
495+
// Abort stream
496+
const writerReason = Symbol('writerAbort');
497+
await writer.abort(writerReason);
498+
// We should get an error RPC message
499+
await expect(outputResult).toResolve();
500+
const errorMessage = JSON.parse((await outputResult)[0].toString());
501+
// Parse without error
502+
rpcUtils.parseJSONRPCResponseError(errorMessage);
503+
// Check that the handler was cleaned up.
504+
await expect(handlerEndedProm.p).toResolve();
505+
await rpcServer.destroy();
506+
},
507+
{ numRuns: 1 },
508+
);
509+
testProp.only(
510+
'should emit stream error if output stream fails',
511+
[specificMessageArb],
512+
async (messages) => {
513+
const handlerEndedProm = promise();
460514
class TestMethod extends DuplexHandler {
461515
public async *handle(input): AsyncIterable<JSONValue> {
462516
// Echo input
463-
yield* input;
517+
try {
518+
yield* input;
519+
} finally {
520+
handlerEndedProm.resolveP();
521+
}
464522
}
465523
}
466524
const rpcServer = await RPCServer.createRPCServer({
@@ -494,14 +552,18 @@ describe(`${RPCServer.name}`, () => {
494552
await reader.read();
495553
}
496554
// Abort stream
497-
const writerReason = Symbol('writerAbort');
555+
// const writerReason = Symbol('writerAbort');
498556
const readerReason = Symbol('readerAbort');
499-
await writer.abort(writerReason);
557+
// Await writer.abort(writerReason);
500558
await reader.cancel(readerReason);
501559
// We should get an error event
502560
const event = await errorProm;
503-
expect(event.detail.cause).toContain(writerReason);
504-
expect(event.detail.cause).toContain(readerReason);
561+
await writer.close();
562+
// Expect(event.detail.cause).toContain(writerReason);
563+
expect(event.detail).toBeInstanceOf(rpcErrors.ErrorRPCOutputStreamError);
564+
expect(event.detail.cause).toBe(readerReason);
565+
// Check that the handler was cleaned up.
566+
await expect(handlerEndedProm.p).toResolve();
505567
await rpcServer.destroy();
506568
},
507569
{ numRuns: 1 },

0 commit comments

Comments
 (0)