Skip to content

Commit 1269570

Browse files
authored
Merge pull request #506 from MatrixAI/feature-websocket_client
Websocket for Client Service API
2 parents 36d518f + c6feceb commit 1269570

22 files changed

Lines changed: 2406 additions & 411 deletions

package-lock.json

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
"tslib": "^2.4.0",
123123
"tsyringe": "^4.7.0",
124124
"utp-native": "^2.5.3",
125+
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.19.0",
125126
"ws": "^8.12.0"
126127
},
127128
"devDependencies": {

src/RPC/RPCServer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import type {
1010
RawHandlerImplementation,
1111
ServerHandlerImplementation,
1212
UnaryHandlerImplementation,
13+
ConnectionInfo,
1314
} from './types';
1415
import type { ReadableWritablePair } from 'stream/web';
1516
import type { JSONValue } from '../types';
16-
import type { ConnectionInfo } from '../network/types';
1717
import type { RPCErrorEvent } from './utils';
1818
import type { MiddlewareFactory } from './types';
1919
import { ReadableStream } from 'stream/web';

src/RPC/handlers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { JSONValue } from 'types';
22
import type { ContainerType } from 'RPC/types';
33
import type { ReadableStream } from 'stream/web';
44
import type { JsonRpcRequest } from 'RPC/types';
5-
import type { ConnectionInfo } from '../network/types';
5+
import type { ConnectionInfo } from './types';
66
import type { ContextCancellable } from '../contexts/types';
77

88
abstract class Handler<

src/RPC/types.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import type { JSONValue } from '../types';
2-
import type { ConnectionInfo } from '../network/types';
32
import type { ContextCancellable } from '../contexts/types';
43
import type { ReadableStream, ReadableWritablePair } from 'stream/web';
54
import type { Handler } from './handlers';
@@ -11,6 +10,8 @@ import type {
1110
ClientCaller,
1211
UnaryCaller,
1312
} from './callers';
13+
import type { NodeId } from '../nodes/types';
14+
import type { Certificate } from '../keys/types';
1415

1516
/**
1617
* This is the JSON RPC request object. this is the generic message type used for the RPC.
@@ -108,6 +109,24 @@ type JsonRpcMessage<T extends JSONValue = JSONValue> =
108109
| JsonRpcRequest<T>
109110
| JsonRpcResponse<T>;
110111

112+
/**
113+
* Proxy connection information
114+
* @property remoteNodeId - NodeId of the remote connecting node
115+
* @property remoteCertificates - Certificate chain of the remote connecting node
116+
* @property localHost - Proxy host of the local connecting node
117+
* @property localPort - Proxy port of the local connecting node
118+
* @property remoteHost - Proxy host of the remote connecting node
119+
* @property remotePort - Proxy port of the remote connecting node
120+
*/
121+
type ConnectionInfo = Partial<{
122+
remoteNodeId: NodeId;
123+
remoteCertificates: Array<Certificate>;
124+
localHost: string;
125+
localPort: number;
126+
remoteHost: string;
127+
remotePort: number;
128+
}>;
129+
111130
// Handler types
112131
type HandlerImplementation<I, O> = (
113132
input: I,
@@ -218,6 +237,7 @@ export type {
218237
JsonRpcRequest,
219238
JsonRpcResponse,
220239
JsonRpcMessage,
240+
ConnectionInfo,
221241
HandlerImplementation,
222242
RawHandlerImplementation,
223243
DuplexHandlerImplementation,

src/clientRPC/errors.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { ErrorPolykey, sysexits } from '../errors';
2+
3+
class ErrorRPC<T> extends ErrorPolykey<T> {}
4+
5+
class ErrorRPCClient<T> extends ErrorRPC<T> {}
6+
7+
class ErrorClientAuthMissing<T> extends ErrorRPCClient<T> {
8+
static description = 'Authorisation metadata is required but missing';
9+
exitCode = sysexits.NOPERM;
10+
}
11+
12+
class ErrorClientAuthFormat<T> extends ErrorRPCClient<T> {
13+
static description = 'Authorisation metadata has invalid format';
14+
exitCode = sysexits.USAGE;
15+
}
16+
17+
class ErrorClientAuthDenied<T> extends ErrorRPCClient<T> {
18+
static description = 'Authorisation metadata is incorrect or expired';
19+
exitCode = sysexits.NOPERM;
20+
}
21+
22+
export {
23+
ErrorRPC,
24+
ErrorRPCClient,
25+
ErrorClientAuthMissing,
26+
ErrorClientAuthFormat,
27+
ErrorClientAuthDenied,
28+
};

src/clientRPC/utils.ts

Lines changed: 6 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,9 @@
1-
import type { SessionToken } from '../sessions/types';
2-
import type KeyRing from '../keys/KeyRing';
3-
import type SessionManager from '../sessions/SessionManager';
41
import type { RPCRequestParams } from './types';
5-
import type { JsonRpcRequest } from '../RPC/types';
6-
import type { ReadableWritablePair } from 'stream/web';
7-
import type Logger from '@matrixai/logger';
8-
import type { ConnectionInfo, Host, Port } from '../network/types';
9-
import type RPCServer from '../RPC/RPCServer';
10-
import type { TLSSocket } from 'tls';
11-
import type { Server } from 'https';
12-
import type net from 'net';
13-
import type https from 'https';
14-
import { ReadableStream, WritableStream } from 'stream/web';
15-
import WebSocket, { WebSocketServer } from 'ws';
16-
import * as clientErrors from '../client/errors';
17-
import { promise } from '../utils';
2+
import type SessionManager from 'sessions/SessionManager';
3+
import type KeyRing from 'keys/KeyRing';
4+
import type { JsonRpcRequest } from 'RPC/types';
5+
import type { SessionToken } from 'sessions/types';
6+
import * as clientErrors from './errors';
187

198
async function authenticate(
209
sessionManager: SessionManager,
@@ -65,201 +54,4 @@ function encodeAuthFromPassword(password: string): string {
6554
return `Basic ${encoded}`;
6655
}
6756

68-
function readableFromWebSocket(
69-
ws: WebSocket,
70-
logger: Logger,
71-
): ReadableStream<Uint8Array> {
72-
return new ReadableStream<Uint8Array>({
73-
start: (controller) => {
74-
logger.info('starting');
75-
const messageHandler = (data) => {
76-
logger.debug(`message: ${data.toString()}`);
77-
ws.pause();
78-
const message = data as Buffer;
79-
if (message.length === 0) {
80-
logger.info('ENDING');
81-
ws.removeAllListeners('message');
82-
try {
83-
controller.close();
84-
} catch {
85-
// Ignore already closed
86-
}
87-
return;
88-
}
89-
controller.enqueue(message);
90-
};
91-
ws.on('message', messageHandler);
92-
ws.once('close', () => {
93-
logger.info('closed');
94-
ws.removeListener('message', messageHandler);
95-
try {
96-
controller.close();
97-
} catch {
98-
// Ignore already closed
99-
}
100-
});
101-
ws.once('error', (e) => {
102-
controller.error(e);
103-
});
104-
},
105-
cancel: () => {
106-
logger.info('cancelled');
107-
ws.close();
108-
},
109-
pull: () => {
110-
logger.debug('resuming');
111-
ws.resume();
112-
},
113-
});
114-
}
115-
116-
function writeableFromWebSocket(
117-
ws: WebSocket,
118-
holdOpen: boolean,
119-
logger: Logger,
120-
): WritableStream<Uint8Array> {
121-
return new WritableStream<Uint8Array>({
122-
start: (controller) => {
123-
logger.info('starting');
124-
ws.once('error', (e) => {
125-
logger.error(`error: ${e}`);
126-
controller.error(e);
127-
});
128-
ws.once('close', (code, reason) => {
129-
logger.info(
130-
`ws closing early! with code: ${code} and reason: ${reason.toString()}`,
131-
);
132-
controller.error(Error('TMP WebSocket Closed early'));
133-
});
134-
},
135-
close: () => {
136-
logger.info('stream closing');
137-
ws.send(Buffer.from([]));
138-
if (!holdOpen) ws.terminate();
139-
},
140-
abort: () => {
141-
logger.info('aborting');
142-
ws.close();
143-
},
144-
write: async (chunk, controller) => {
145-
logger.debug(`writing: ${chunk?.toString()}`);
146-
const wait = promise<void>();
147-
ws.send(chunk, (e) => {
148-
if (e != null) {
149-
logger.error(`error: ${e}`);
150-
controller.error(e);
151-
}
152-
wait.resolveP();
153-
});
154-
await wait.p;
155-
},
156-
});
157-
}
158-
159-
function webSocketToWebStreamPair(
160-
ws: WebSocket,
161-
holdOpen: boolean,
162-
logger: Logger,
163-
): ReadableWritablePair<Uint8Array, Uint8Array> {
164-
return {
165-
readable: readableFromWebSocket(ws, logger.getChild('readable')),
166-
writable: writeableFromWebSocket(ws, holdOpen, logger.getChild('writable')),
167-
};
168-
}
169-
170-
function startConnection(
171-
host: string,
172-
port: number,
173-
logger: Logger,
174-
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
175-
const ws = new WebSocket(`wss://${host}:${port}`, {
176-
// CheckServerIdentity: (
177-
// servername: string,
178-
// cert: WebSocket.CertMeta,
179-
// ): boolean => {
180-
// console.log('CHECKING IDENTITY');
181-
// console.log(servername);
182-
// console.log(cert);
183-
// return false;
184-
// },
185-
rejectUnauthorized: false,
186-
// Ca: tlsConfig.certChainPem
187-
});
188-
ws.once('close', () => logger.info('CLOSED'));
189-
// Ws.once('upgrade', () => {
190-
// // Const tlsSocket = request.socket as TLSSocket;
191-
// // Console.log(tlsSocket.getPeerCertificate());
192-
// logger.info('Test early cancellation');
193-
// // Request.destroy(Error('some error'));
194-
// // tlsSocket.destroy(Error('some error'));
195-
// // ws.close(12345, 'some reason');
196-
// // TODO: Use the existing verify method from the GRPC implementation
197-
// // TODO: Have this emit an error on verification failure.
198-
// // It's fine for the server side to close abruptly without error
199-
// });
200-
const prom = promise<ReadableWritablePair<Uint8Array, Uint8Array>>();
201-
ws.once('open', () => {
202-
logger.info('starting connection');
203-
prom.resolveP(webSocketToWebStreamPair(ws, true, logger));
204-
});
205-
return prom.p;
206-
}
207-
208-
function handleConnection(ws: WebSocket, logger: Logger): void {
209-
ws.once('close', () => logger.info('CLOSED'));
210-
const readable = readableFromWebSocket(ws, logger.getChild('readable'));
211-
const writable = writeableFromWebSocket(
212-
ws,
213-
false,
214-
logger.getChild('writable'),
215-
);
216-
void readable.pipeTo(writable).catch((e) => logger.error(e));
217-
}
218-
219-
function createClientServer(
220-
server: Server,
221-
rpcServer: RPCServer,
222-
logger: Logger,
223-
) {
224-
logger.info('created server');
225-
const wss = new WebSocketServer({
226-
server,
227-
});
228-
wss.on('error', (e) => logger.error(e));
229-
logger.info('created wss');
230-
wss.on('connection', (ws, req) => {
231-
logger.info('connection!');
232-
const socket = req.socket as TLSSocket;
233-
const streamPair = webSocketToWebStreamPair(ws, false, logger);
234-
rpcServer.handleStream(streamPair, {
235-
localHost: socket.localAddress! as Host,
236-
localPort: socket.localPort! as Port,
237-
remoteCertificates: socket.getPeerCertificate(),
238-
remoteHost: socket.remoteAddress! as Host,
239-
remotePort: socket.remotePort! as Port,
240-
} as unknown as ConnectionInfo);
241-
});
242-
wss.once('close', () => {
243-
wss.removeAllListeners('error');
244-
wss.removeAllListeners('connection');
245-
});
246-
return wss;
247-
}
248-
249-
async function listen(server: https.Server, host?: string, port?: number) {
250-
await new Promise<void>((resolve) => {
251-
server.listen(port, host ?? '127.0.0.1', undefined, () => resolve());
252-
});
253-
const addressInfo = server.address() as net.AddressInfo;
254-
return addressInfo.port;
255-
}
256-
257-
export {
258-
authenticate,
259-
decodeAuth,
260-
encodeAuthFromPassword,
261-
startConnection,
262-
handleConnection,
263-
createClientServer,
264-
listen,
265-
};
57+
export { authenticate, decodeAuth, encodeAuthFromPassword };

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ interface FileSystem {
110110
readdir: typeof fs.promises.readdir;
111111
rename: typeof fs.promises.rename;
112112
open: typeof fs.promises.open;
113+
mkdtemp: typeof fs.promises.mkdtemp;
113114
};
114115
constants: typeof fs.constants;
115116
}

0 commit comments

Comments
 (0)