Skip to content

Commit 8e19b76

Browse files
CMCDragonkaitegefaulkes
authored andcommitted
fix: change of head, tail destructuring for raw handlers, head and tail reconstruction via transform stream, and reverse pair propagation of cancellation event
1 parent a2ae510 commit 8e19b76

6 files changed

Lines changed: 89 additions & 63 deletions

File tree

src/rpc/RPCServer.ts

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import type {
1515
import type { ReadableWritablePair } from 'stream/web';
1616
import type { JSONValue } from '../types';
1717
import type { MiddlewareFactory } from './types';
18+
import { TransformStream } from 'stream/web';
1819
import { ReadableStream } from 'stream/web';
1920
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
2021
import Logger from '@matrixai/logger';
@@ -191,14 +192,24 @@ class RPCServer extends EventTarget {
191192
O extends JSONValue,
192193
>(method: string, handler: DuplexHandlerImplementation<I, O>): void {
193194
const rawSteamHandler: RawHandlerImplementation = (
194-
[input, header],
195+
[header, input],
195196
connectionInfo,
196197
ctx,
197198
) => {
198199
// Setting up middleware
199-
const middleware = this.middlewareFactory(header);
200+
const middleware = this.middlewareFactory();
200201
// Forward from the client to the server
201-
const forwardStream = input.pipeThrough(middleware.forward);
202+
const headerStream = new TransformStream({
203+
start(controller) {
204+
controller.enqueue(Buffer.from(JSON.stringify(header)));
205+
},
206+
transform(chunk, controller) {
207+
controller.enqueue(chunk);
208+
},
209+
});
210+
const forwardStream = input
211+
.pipeThrough(headerStream)
212+
.pipeThrough(middleware.forward);
202213
// Reverse from the server to the client
203214
const reverseStream = middleware.reverse.writable;
204215
// Generator derived from handler
@@ -210,17 +221,22 @@ class RPCServer extends EventTarget {
210221
yield data.params as I;
211222
}
212223
};
213-
for await (const response of handler(inputGen(), connectionInfo, ctx)) {
224+
const handlerG = handler(inputGen(), connectionInfo, ctx);
225+
for await (const response of handlerG) {
214226
const responseMessage: JSONRPCResponseResult = {
215227
jsonrpc: '2.0',
216228
result: response,
217229
id: null,
218230
};
219-
yield responseMessage;
231+
try {
232+
yield responseMessage;
233+
} catch(e) {
234+
// This catches any exceptions thrown into the reverse stream
235+
await handlerG.throw(e);
236+
}
220237
}
221238
};
222239
const outputGenerator = outputGen();
223-
let reason: any | undefined = undefined;
224240
const reverseMiddlewareStream = new ReadableStream<JSONRPCResponse>({
225241
pull: async (controller) => {
226242
try {
@@ -231,39 +247,46 @@ class RPCServer extends EventTarget {
231247
}
232248
controller.enqueue(value);
233249
} catch (e) {
234-
if (reason == null) {
235-
// We want to convert this error to an error message and pass it along
236-
const rpcError: JSONRPCError = {
237-
code: e.exitCode ?? sysexits.UNKNOWN,
238-
message: e.description ?? '',
239-
data: rpcUtils.fromError(e, this.sensitive),
240-
};
241-
const rpcErrorMessage: JSONRPCResponseError = {
242-
jsonrpc: '2.0',
243-
error: rpcError,
244-
id: null,
245-
};
246-
controller.enqueue(rpcErrorMessage);
247-
} else {
248-
// These errors are emitted to the event system
249-
// This contains the original error from enqueuing
250-
this.dispatchEvent(
251-
new rpcEvents.RPCErrorEvent({
252-
detail: new rpcErrors.ErrorRPCSendErrorFailed(undefined, {
253-
cause: [e, reason],
254-
}),
255-
}),
256-
);
257-
}
250+
const rpcError: JSONRPCError = {
251+
code: e.exitCode ?? sysexits.UNKNOWN,
252+
message: e.description ?? '',
253+
data: rpcUtils.fromError(e, this.sensitive),
254+
};
255+
const rpcErrorMessage: JSONRPCResponseError = {
256+
jsonrpc: '2.0',
257+
error: rpcError,
258+
id: null,
259+
};
260+
controller.enqueue(rpcErrorMessage);
258261
await forwardStream.cancel(
259262
new rpcErrors.ErrorRPCHandlerFailed('Error clean up'),
260263
);
261264
controller.close();
262265
}
263266
},
264-
cancel: async (_reason) => {
265-
reason = _reason;
266-
await outputGenerator.throw(_reason);
267+
cancel: async (reason) => {
268+
try {
269+
// Throw the reason into the reverse stream
270+
await outputGenerator.throw(reason);
271+
} catch (e) {
272+
// If the e is the same as the reason
273+
// then the handler did not care about the reason
274+
// and we just discard it
275+
if (e !== reason) {
276+
this.dispatchEvent(
277+
new rpcEvents.RPCErrorEvent({
278+
detail: new rpcErrors.ErrorRPCSendErrorFailed(
279+
'Stream has been cancelled',
280+
{
281+
cause: e,
282+
}
283+
),
284+
}),
285+
);
286+
}
287+
}
288+
// await outputGenerator.nexj
289+
// handlerAbortController.abort(reason);
267290
},
268291
});
269292
void reverseMiddlewareStream.pipeTo(reverseStream).catch(() => {});
@@ -281,6 +304,9 @@ class RPCServer extends EventTarget {
281304
connectionInfo,
282305
ctx,
283306
) {
307+
// The `input` is expected to be an async iterable with only 1 value.
308+
// Unlike generators, there is no `next()` method.
309+
// So we use `break` after the first iteration.
284310
for await (const inputVal of input) {
285311
yield await handler(inputVal, connectionInfo, ctx);
286312
break;
@@ -369,7 +395,7 @@ class RPCServer extends EventTarget {
369395
return;
370396
}
371397
const outputStream = handler(
372-
[inputStream, leadingMetadataMessage],
398+
[leadingMetadataMessage, inputStream],
373399
connectionInfo,
374400
{ signal: abortController.signal },
375401
);

src/rpc/handlers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ abstract class RawHandler<
2020
Container extends ContainerType = ContainerType,
2121
> extends Handler<Container> {
2222
abstract handle(
23-
input: [ReadableStream<Uint8Array>, JSONRPCRequest],
23+
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
2424
connectionInfo: ConnectionInfo,
2525
ctx: ContextCancellable,
2626
): ReadableStream<Uint8Array>;

src/rpc/types.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ type HandlerImplementation<I, O> = (
182182
) => O;
183183

184184
type RawHandlerImplementation = HandlerImplementation<
185-
[ReadableStream<Uint8Array>, JSONRPCRequest],
185+
[JSONRPCRequest, ReadableStream<Uint8Array>],
186186
ReadableStream<Uint8Array>
187187
>;
188188

@@ -212,7 +212,18 @@ type StreamFactory = () => Promise<
212212
ReadableWritablePair<Uint8Array, Uint8Array>
213213
>;
214214

215-
type MiddlewareFactory<FR, FW, RR, RW> = (header?: JSONRPCRequest) => {
215+
/**
216+
* Middleware factory creates middlewares.
217+
* Each middleware is a pair of forward and reverse.
218+
* Each forward and reverse is a `ReadableWritablePair`.
219+
* The forward pair is used transform input from client to server.
220+
* The reverse pair is used to transform output from server to client.
221+
* FR, FW is the readable and writable types of the forward pair.
222+
* RR, RW is the readable and writable types of the reverse pair.
223+
* FW -> FR is the direction of data flow from client to server.
224+
* RW -> RR is the direction of data flow from server to client.
225+
*/
226+
type MiddlewareFactory<FR, FW, RR, RW> = () => {
216227
forward: ReadableWritablePair<FR, FW>;
217228
reverse: ReadableWritablePair<RR, RW>;
218229
};

src/rpc/utils/middleware.ts

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,10 @@ const jsonStreamParsers = require('@streamparser/json');
2020
* specific type of message
2121
* @param byteLimit - sets the number of bytes buffered before throwing an
2222
* error. This is used to avoid infinitely buffering the input.
23-
* @param firstMessage - This is a single message that is inserted into the
24-
* front of the stream.
2523
*/
2624
function binaryToJsonMessageStream<T extends JSONRPCMessage>(
2725
messageParser: (message: unknown) => T,
2826
byteLimit: number = 1024 * 1024,
29-
firstMessage?: T,
3027
): TransformStream<Uint8Array, T> {
3128
const parser = new jsonStreamParsers.JSONParser({
3229
separator: '',
@@ -43,7 +40,6 @@ function binaryToJsonMessageStream<T extends JSONRPCMessage>(
4340
await waitP.p;
4441
},
4542
start: (controller) => {
46-
if (firstMessage != null) controller.enqueue(firstMessage);
4743
parser.onValue = (value) => {
4844
const jsonMessage = messageParser(value.value);
4945
controller.enqueue(jsonMessage);
@@ -84,17 +80,12 @@ function jsonMessageToBinaryStream(): TransformStream<
8480
* This function is a factory for creating a pass-through streamPair. It is used
8581
* as the default middleware for the middleware wrappers.
8682
*/
87-
const defaultMiddleware: MiddlewareFactory<
88-
JSONRPCRequest,
89-
JSONRPCRequest,
90-
JSONRPCResponse,
91-
JSONRPCResponse
92-
> = () => {
83+
function defaultMiddleware() {
9384
return {
94-
forward: new TransformStream(),
95-
reverse: new TransformStream(),
85+
forward: new TransformStream<JSONRPCRequest, JSONRPCRequest>(),
86+
reverse: new TransformStream<JSONRPCResponse, JSONRPCResponse>(),
9687
};
97-
};
88+
}
9889

9990
/**
10091
* This convenience factory for creating wrapping middleware with the basic
@@ -103,28 +94,26 @@ const defaultMiddleware: MiddlewareFactory<
10394
* JsonRPCMessages and pipe it through the provided middleware.
10495
* The reverse path will pipe the output stream through the provided middleware
10596
* and then transform it back to a binary stream.
106-
* @param middleware - The provided middleware
97+
* @param middlewareFactory - The provided middleware
10798
*/
108-
const defaultServerMiddlewareWrapper = (
109-
middleware: MiddlewareFactory<
99+
function defaultServerMiddlewareWrapper(
100+
middlewareFactory: MiddlewareFactory<
110101
JSONRPCRequest,
111102
JSONRPCRequest,
112103
JSONRPCResponse,
113104
JSONRPCResponse
114105
> = defaultMiddleware,
115-
) => {
116-
return (header: JSONRPCRequest) => {
106+
): MiddlewareFactory<JSONRPCRequest, Uint8Array, Uint8Array, JSONRPCResponse> {
107+
return () => {
117108
const inputTransformStream = binaryToJsonMessageStream(
118109
rpcUtils.parseJSONRPCRequest,
119-
undefined,
120-
header,
121110
);
122111
const outputTransformStream = new TransformStream<
123112
JSONRPCResponseResult,
124113
JSONRPCResponseResult
125114
>();
126115

127-
const middleMiddleware = middleware(header);
116+
const middleMiddleware = middlewareFactory();
128117

129118
const forwardReadable = inputTransformStream.readable.pipeThrough(
130119
middleMiddleware.forward,
@@ -144,7 +133,7 @@ const defaultServerMiddlewareWrapper = (
144133
},
145134
};
146135
};
147-
};
136+
}
148137

149138
/**
150139
* This convenience factory for creating wrapping middleware with the basic
@@ -171,7 +160,7 @@ const defaultClientMiddlewareWrapper = (
171160
return () => {
172161
const outputTransformStream = binaryToJsonMessageStream(
173162
rpcUtils.parseJSONRPCResponse,
174-
undefined,
163+
// Undefined,
175164
);
176165
const inputTransformStream = new TransformStream<
177166
JSONRPCRequest,

tests/rpc/RPC.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ describe('RPC', () => {
4040
let header: JSONRPCRequest | undefined;
4141
class TestMethod extends RawHandler {
4242
public handle(
43-
input: [ReadableStream<Uint8Array>, JSONRPCRequest],
43+
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
4444
): ReadableStream<Uint8Array> {
45-
const [stream, header_] = input;
45+
const [header_, stream] = input;
4646
header = header_;
4747
return stream;
4848
}

tests/rpc/RPCServer.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ describe(`${RPCServer.name}`, () => {
6767
rpcTestUtils.binaryStreamToSnippedStream([4, 7, 13, 2, 6]),
6868
);
6969
class TestHandler extends RawHandler {
70-
public handle([input, _header]): ReadableStream<Uint8Array> {
70+
public handle([_header, input]): ReadableStream<Uint8Array> {
7171
void (async () => {
7272
for await (const _ of input) {
7373
// No touch, only consume

0 commit comments

Comments
 (0)