Skip to content

Commit 60e3d13

Browse files
committed
fix: handler receives signal if output stream fails
Enabling CI.
1 parent a6f69f2 commit 60e3d13

3 files changed

Lines changed: 24 additions & 4 deletions

File tree

src/rpc/RPCServer.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,13 @@ class RPCServer extends EventTarget {
196196
connectionInfo,
197197
ctx,
198198
) => {
199+
// Setting up abort controller
200+
const abortController = new AbortController();
201+
if (ctx.signal.aborted) abortController.abort(ctx.signal.reason);
202+
ctx.signal.addEventListener('abort', () => {
203+
abortController.abort(ctx.signal.reason);
204+
});
205+
const signal = abortController.signal;
199206
// Setting up middleware
200207
const middleware = this.middlewareFactory();
201208
// Forward from the client to the server
@@ -214,14 +221,14 @@ class RPCServer extends EventTarget {
214221
const reverseStream = middleware.reverse.writable;
215222
// Generator derived from handler
216223
const outputGen = async function* (): AsyncGenerator<JSONRPCResponse> {
217-
if (ctx.signal.aborted) throw ctx.signal.reason;
224+
if (signal.aborted) throw signal.reason;
218225
// Input generator derived from the forward stream
219226
const inputGen = async function* (): AsyncIterable<I> {
220227
for await (const data of forwardStream) {
221228
yield data.params as I;
222229
}
223230
};
224-
const handlerG = handler(inputGen(), connectionInfo, ctx);
231+
const handlerG = handler(inputGen(), connectionInfo, { signal });
225232
for await (const response of handlerG) {
226233
const responseMessage: JSONRPCResponseResult = {
227234
jsonrpc: '2.0',
@@ -271,6 +278,8 @@ class RPCServer extends EventTarget {
271278
),
272279
}),
273280
);
281+
// Abort with the reason
282+
abortController.abort(reason);
274283
// If the output stream path fails then we need to end the generator
275284
// early.
276285
await outputGenerator.return(undefined);

src/rpc/handlers.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ abstract class DuplexHandler<
3131
Input extends JSONValue = JSONValue,
3232
Output extends JSONValue = JSONValue,
3333
> extends Handler<Container, Input, Output> {
34+
/**
35+
* Note that if the output has an error, the handler will not see this as an
36+
* error. If you need to handle any clean up it should be handled in a
37+
* `finally` block and check the abort signal for potential errors.
38+
*/
3439
abstract handle(
3540
input: AsyncIterable<Input>,
3641
connectionInfo: ConnectionInfo,

tests/rpc/RPCServer.test.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,13 +506,15 @@ describe(`${RPCServer.name}`, () => {
506506
},
507507
{ numRuns: 1 },
508508
);
509-
testProp.only(
509+
testProp(
510510
'should emit stream error if output stream fails',
511511
[specificMessageArb],
512512
async (messages) => {
513513
const handlerEndedProm = promise();
514+
let ctx: ContextCancellable | undefined;
514515
class TestMethod extends DuplexHandler {
515-
public async *handle(input): AsyncIterable<JSONValue> {
516+
public async *handle(input, _, _ctx): AsyncIterable<JSONValue> {
517+
ctx = _ctx;
516518
// Echo input
517519
try {
518520
yield* input;
@@ -564,6 +566,10 @@ describe(`${RPCServer.name}`, () => {
564566
expect(event.detail.cause).toBe(readerReason);
565567
// Check that the handler was cleaned up.
566568
await expect(handlerEndedProm.p).toResolve();
569+
// Check that an abort signal happened
570+
expect(ctx).toBeDefined();
571+
expect(ctx?.signal.aborted).toBeTrue();
572+
expect(ctx?.signal.reason).toBe(readerReason);
567573
await rpcServer.destroy();
568574
},
569575
{ numRuns: 1 },

0 commit comments

Comments
 (0)