Skip to content

Commit 70c20e0

Browse files
committed
feat: streams are now using a RPCStream interface that extends ReadableWritablePair and includes a cancel(reason?: any) method and generic metadata
- created `RPCStream` interface, updating code - fixing tests - client now includes metadata in remote errors - `RPCServer` includes metadata in the `RPCStream` and propagates cancel/meta to middleware - improved cancellation feature of the server `RPCStream` [ci skip]
1 parent 21e2a6a commit 70c20e0

85 files changed

Lines changed: 726 additions & 516 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/PolykeyAgent.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ class PolykeyAgent {
452452
webSocketServerClient ??
453453
(await WebSocketServer.createWebSocketServer({
454454
connectionCallback: (streamPair) =>
455-
rpcServerClient!.handleStream(streamPair, {}),
455+
rpcServerClient!.handleStream(streamPair),
456456
fs,
457457
host: networkConfig_.clientHost,
458458
port: networkConfig_.clientPort,
@@ -775,7 +775,7 @@ class PolykeyAgent {
775775
host: _networkConfig.clientHost,
776776
port: _networkConfig.clientPort,
777777
connectionCallback: (streamPair) =>
778-
this.rpcServerClient.handleStream(streamPair, {}),
778+
this.rpcServerClient.handleStream(streamPair),
779779
});
780780
// Agent server
781781
await this.grpcServerAgent.start({

src/bin/utils/utils.ts

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,7 @@ function outputFormatter(msg: OutputObject): string | Uint8Array {
112112
let currError = msg.data;
113113
let indent = ' ';
114114
while (currError != null) {
115-
if (
116-
currError instanceof grpcErrors.ErrorPolykeyRemoteOLD ||
117-
currError instanceof rpcErrors.ErrorPolykeyRemote
118-
) {
115+
if (currError instanceof grpcErrors.ErrorPolykeyRemoteOLD) {
119116
output += `${currError.name}: ${currError.description}`;
120117
if (currError.message && currError.message !== '') {
121118
output += ` - ${currError.message}`;
@@ -130,6 +127,20 @@ function outputFormatter(msg: OutputObject): string | Uint8Array {
130127
output += `${indent}timestamp\t${currError.timestamp}\n`;
131128
output += `${indent}cause: `;
132129
currError = currError.cause;
130+
} else if (currError instanceof rpcErrors.ErrorPolykeyRemote) {
131+
output += `${currError.name}: ${currError.description}`;
132+
if (currError.message && currError.message !== '') {
133+
output += ` - ${currError.message}`;
134+
}
135+
if (currError.metadata != null) {
136+
output += '\n';
137+
for (const [key, value] of Object.entries(currError.metadata)) {
138+
output += `${indent}${key}\t${value}\n`;
139+
}
140+
}
141+
output += `${indent}timestamp\t${currError.timestamp}\n`;
142+
output += `${indent}cause: `;
143+
currError = currError.cause;
133144
} else if (currError instanceof ErrorPolykey) {
134145
output += `${currError.name}: ${currError.description}`;
135146
if (currError.message && currError.message !== '') {

src/client/handlers/gestaltsGestaltList.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ class GestaltsGestaltListHandler extends ServerHandler<
1515
> {
1616
public async *handle(
1717
_input,
18-
_connectionInfo,
18+
_cancel,
19+
_meta,
1920
ctx,
2021
): AsyncGenerator<ClientRPCResponseResult<GestaltMessage>> {
2122
const { db, gestaltGraph } = this.container;

src/client/handlers/identitiesAuthenticate.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ class IdentitiesAuthenticateHandler extends ServerHandler<
2121
input: ClientRPCRequestParams<{
2222
providerId: string;
2323
}>,
24-
_,
24+
_cancel,
25+
_meta,
2526
ctx,
2627
): AsyncGenerator<ClientRPCResponseResult<AuthProcessMessage>> {
2728
if (ctx.signal.aborted) throw ctx.signal.reason;

src/client/handlers/identitiesAuthenticatedGet.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ class IdentitiesAuthenticatedGetHandler extends ServerHandler<
2020
input: ClientRPCRequestParams<{
2121
providerId?: string;
2222
}>,
23-
_,
23+
_cancel,
24+
_meta,
2425
ctx,
2526
): AsyncGenerator<ClientRPCResponseResult<IdentityMessage>> {
2627
if (ctx.signal.aborted) throw ctx.signal.reason;

src/client/handlers/identitiesInfoConnectedGet.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ class IdentitiesInfoConnectedGetHandler extends ServerHandler<
1818
> {
1919
public async *handle(
2020
input: ClientRPCRequestParams<ProviderSearchMessage>,
21-
_,
21+
_cancel,
22+
_meta,
2223
ctx,
2324
): AsyncGenerator<ClientRPCResponseResult<IdentityInfoMessage>> {
2425
if (ctx.signal.aborted) throw ctx.signal.reason;

src/client/handlers/identitiesInfoGet.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ class IdentitiesInfoGetHandler extends ServerHandler<
1919
> {
2020
public async *handle(
2121
input: ClientRPCRequestParams<ProviderSearchMessage>,
22-
_,
22+
_cancel,
23+
_meta,
2324
ctx,
2425
): AsyncGenerator<ClientRPCResponseResult<IdentityInfoMessage>> {
2526
if (ctx.signal.aborted) throw ctx.signal.reason;

src/client/handlers/keysCertsChainGet.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ class KeysCertsChainGetHandler extends ServerHandler<
1212
> {
1313
public async *handle(
1414
_input,
15-
_connectionInfo,
15+
_cancel,
16+
_meta,
1617
ctx,
1718
): AsyncGenerator<ClientRPCResponseResult<CertMessage>> {
1819
const { certManager } = this.container;

src/client/handlers/nodesGetAll.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ class NodesGetAllHandler extends ServerHandler<
1717
> {
1818
public async *handle(
1919
_input,
20-
_connectionUInfo,
20+
_cancel,
21+
_meta,
2122
ctx,
2223
): AsyncGenerator<ClientRPCResponseResult<NodesGetMessage>> {
2324
if (ctx.signal.aborted) throw ctx.signal.reason;

src/client/handlers/nodesListConnections.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ class NodesListConnectionsHandler extends ServerHandler<
1313
> {
1414
public async *handle(
1515
_input,
16-
_connectionInfo,
16+
_cancel,
17+
_meta,
1718
ctx,
1819
): AsyncGenerator<ClientRPCResponseResult<NodeConnectionMessage>> {
1920
const { nodeConnectionManager } = this.container;

0 commit comments

Comments
 (0)