Skip to content

Commit 9ca5e30

Browse files
authored
refactor(ocap-kernel): Use JSON-RPC notifications for vat syscalls (#528)
Fixes #525 Replaces our janky use of JSON-RPC requests with notifications in the `VatSupervisor`. Also makes some changes with a view to recognizing that vat syscalls are handled synchronously by the kernel.
1 parent c94b22d commit 9ca5e30

8 files changed

Lines changed: 85 additions & 93 deletions

File tree

packages/kernel-rpc-methods/src/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ export type MethodSignature<
1212
export type MethodSpec<
1313
Method extends string,
1414
Params extends JsonRpcParams,
15-
Result extends Json | Promise<Json> | void,
16-
> = Result extends void
15+
Result extends Json | Promise<Json> | void | Promise<void>,
16+
> = Result extends void | Promise<void>
1717
? { method: Method; params: Struct<Params>; result?: undefined }
1818
: {
1919
method: Method;
@@ -74,7 +74,7 @@ export type ExtractResult<
7474

7575
export type HandlerFunction<
7676
Params extends JsonRpcParams,
77-
Result extends Json | Promise<Json> | void,
77+
Result extends Json | Promise<Json> | void | Promise<void>,
7878
Hooks extends Record<string, unknown>,
7979
> = (hooks: Hooks, params: Params) => Result;
8080

@@ -83,7 +83,7 @@ export type HandlerFunction<
8383
export type Handler<
8484
Method extends string,
8585
Params extends JsonRpcParams,
86-
Result extends Json | Promise<Json> | void,
86+
Result extends Json | Promise<Json> | void | Promise<void>,
8787
Hooks extends Record<string, unknown>,
8888
> = MethodSpec<Method, Params, Result> & {
8989
hooks: { [Key in keyof Hooks]: true };

packages/logger/src/stream.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import type { JsonRpcCall, JsonRpcMessage } from '@metamask/kernel-utils';
33
import { split } from '@metamask/streams';
44
import type { DuplexStream } from '@metamask/streams';
55
import { isJsonRpcNotification } from '@metamask/utils';
6-
import type { JsonRpcRequest } from '@metamask/utils';
76

87
import type { LogEntry, LogLevel } from './types.ts';
98

@@ -84,7 +83,7 @@ harden(isLoggerMessage);
8483
*/
8584
export const isKernelMessage = (
8685
message: JsonRpcMessage,
87-
): message is JsonRpcRequest => !isLoggerMessage(message);
86+
): message is JsonRpcMessage => !isLoggerMessage(message);
8887
harden(isKernelMessage);
8988

9089
/**

packages/ocap-kernel/src/VatHandle.test.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,30 @@ describe('VatHandle', () => {
9090
await delay(10);
9191
expect(logger.error).toHaveBeenCalledWith(
9292
'Unexpected read error',
93-
expect.any(Error),
93+
expect.objectContaining({
94+
message: expect.stringMatching(/Message failed type validation/u),
95+
}),
96+
);
97+
});
98+
99+
it('throws if handleMessage throws', async () => {
100+
const logger = {
101+
error: vi.fn(),
102+
subLogger: vi.fn(() => logger),
103+
} as unknown as Logger;
104+
const { stream } = await makeVat({ logger });
105+
await stream.receiveInput({
106+
id: 'v0:1',
107+
method: 'ping',
108+
params: [],
109+
jsonrpc: '2.0',
110+
});
111+
await delay(10);
112+
expect(logger.error).toHaveBeenCalledWith(
113+
'Unexpected read error',
114+
expect.objectContaining({
115+
message: expect.stringMatching(/^Received unexpected message/u),
116+
}),
94117
);
95118
});
96119
});

packages/ocap-kernel/src/VatHandle.ts

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import type {
1111
import type { VatStore } from '@metamask/kernel-store';
1212
import type { JsonRpcMessage } from '@metamask/kernel-utils';
1313
import { Logger } from '@metamask/logger';
14-
import { serializeError } from '@metamask/rpc-errors';
1514
import type { DuplexStream } from '@metamask/streams';
16-
import { isJsonRpcRequest, isJsonRpcResponse } from '@metamask/utils';
15+
import { isJsonRpcNotification, isJsonRpcResponse } from '@metamask/utils';
16+
import type { JsonRpcNotification, JsonRpcResponse } from '@metamask/utils';
1717

1818
import type { KernelQueue } from './KernelQueue.ts';
1919
import { vatMethodSpecs, vatSyscallHandlers } from './rpc/index.ts';
@@ -30,10 +30,14 @@ import type {
3030
} from './types.ts';
3131
import { VatSyscall } from './VatSyscall.ts';
3232

33+
type MessageFromVat = JsonRpcResponse | JsonRpcNotification;
34+
35+
type VatStream = DuplexStream<MessageFromVat, JsonRpcMessage>;
36+
3337
type VatConstructorProps = {
3438
vatId: VatId;
3539
vatConfig: VatConfig;
36-
vatStream: DuplexStream<JsonRpcMessage, JsonRpcMessage>;
40+
vatStream: VatStream;
3741
kernelStore: KernelStore;
3842
kernelQueue: KernelQueue;
3943
logger?: Logger | undefined;
@@ -44,7 +48,7 @@ export class VatHandle {
4448
readonly vatId: VatId;
4549

4650
/** Communications channel to and from the vat itself */
47-
readonly #vatStream: DuplexStream<JsonRpcMessage, JsonRpcMessage>;
51+
readonly #vatStream: VatStream;
4852

4953
/** The vat's configuration */
5054
readonly config: VatConfig;
@@ -110,8 +114,8 @@ export class VatHandle {
110114
`${this.vatId}:`,
111115
);
112116
this.#rpcService = new RpcService(vatSyscallHandlers, {
113-
handleSyscall: async (params) => {
114-
return this.#vatSyscall.handleSyscall(params as VatSyscallObject);
117+
handleSyscall: (params) => {
118+
this.#vatSyscall.handleSyscall(params as VatSyscallObject);
115119
},
116120
});
117121
}
@@ -181,25 +185,13 @@ export class VatHandle {
181185
async #handleMessage(message: JsonRpcMessage): Promise<void> {
182186
if (isJsonRpcResponse(message)) {
183187
this.#rpcClient.handleResponse(message.id as string, message);
184-
} else if (isJsonRpcRequest(message)) {
185-
try {
186-
this.#rpcService.assertHasMethod(message.method);
187-
const result = await this.#rpcService.execute(
188-
message.method,
189-
message.params,
190-
);
191-
await this.#vatStream.write({
192-
id: message.id,
193-
result,
194-
jsonrpc: '2.0',
195-
});
196-
} catch (error) {
197-
await this.#vatStream.write({
198-
id: message.id,
199-
error: serializeError(error),
200-
jsonrpc: '2.0',
201-
});
202-
}
188+
} else if (isJsonRpcNotification(message)) {
189+
this.#rpcService.assertHasMethod(message.method);
190+
await this.#rpcService.execute(message.method, message.params);
191+
} else {
192+
// We don't expect any JSON-RPC requests from the vat, but the stream may permit them
193+
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
194+
throw new Error(`Received unexpected message: ${message}`);
203195
}
204196
}
205197

packages/ocap-kernel/src/VatSupervisor.ts

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ const makeLiveSlots: MakeLiveSlotsFn = localMakeLiveSlots;
3131
// eslint-disable-next-line n/no-unsupported-features/node-builtins
3232
export type FetchBlob = (bundleURL: string) => Promise<Response>;
3333

34+
type SupervisorRpcClient = Pick<
35+
RpcClient<typeof vatSyscallMethodSpecs>,
36+
'notify' | 'handleResponse'
37+
>;
38+
3439
type SupervisorConstructorProps = {
3540
id: VatId;
3641
kernelStream: DuplexStream<JsonRpcMessage, JsonRpcMessage>;
@@ -54,7 +59,7 @@ export class VatSupervisor {
5459
readonly #logger: Logger;
5560

5661
/** RPC client for sending syscall requests to the kernel */
57-
readonly #rpcClient: RpcClient<typeof vatSyscallMethodSpecs>;
62+
readonly #rpcClient: SupervisorRpcClient;
5863

5964
/** RPC service for handling requests from the kernel */
6065
readonly #rpcService: RpcService<typeof vatHandlers>;
@@ -74,9 +79,6 @@ export class VatSupervisor {
7479
/** Capability to fetch the bundle of code to run in this vat. */
7580
readonly #fetchBlob: FetchBlob;
7681

77-
/** Result promises from all syscalls sent to the kernel in the current crank */
78-
readonly #syscallsInFlight: Promise<unknown>[] = [];
79-
8082
/**
8183
* Construct a new VatSupervisor instance.
8284
*
@@ -177,17 +179,15 @@ export class VatSupervisor {
177179
* @returns a syscall success result.
178180
*/
179181
executeSyscall(vso: VatSyscallObject): VatSyscallResult {
180-
this.#syscallsInFlight.push(
181-
// IMPORTANT: Syscall architecture design explanation:
182-
// - Vats operate on an "optimistic execution" model - they send syscalls and continue execution
183-
// without waiting for responses, assuming success.
184-
// - The Kernel processes syscalls asynchronously and failures are catched in VatHandle.
185-
this.#rpcClient
186-
.call('syscall', coerceVatSyscallObject(vso))
187-
// We catch these rejections here to prevent unhandled promise rejections,
188-
// as they're an expected part of the architecture, not errors
189-
.catch(() => undefined),
190-
);
182+
// IMPORTANT: Syscall architecture design explanation:
183+
// - Vats operate on an "optimistic execution" model - they send syscalls and continue execution
184+
// without waiting for responses, assuming success.
185+
// - The Kernel processes syscalls synchronously on receipt and failures are caught in VatHandle.
186+
// - The vat is terminated and the crank is rolled back if a syscall fails.
187+
this.#rpcClient
188+
.notify('syscall', coerceVatSyscallObject(vso))
189+
// Just to please the linter (notifications never reject)
190+
.catch(() => undefined);
191191
return ['ok', null];
192192
}
193193

@@ -204,13 +204,6 @@ export class VatSupervisor {
204204
// Handle delivery errors
205205
deliveryError = error instanceof Error ? error.message : String(error);
206206
this.#logger.error(`Delivery error in vat ${this.id}:`, deliveryError);
207-
} finally {
208-
// Clean up at the end of a crank
209-
this.#syscallsInFlight.length = 0;
210-
// Reject all pending RPC requests to maintain the optimistic execution model
211-
// This prevents late responses from affecting the vat in unexpected ways
212-
// between cranks.
213-
this.#rpcClient.rejectAll(new Error('end of crank'));
214207
}
215208

216209
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,35 @@
11
import { describe, it, expect, vi } from 'vitest';
22

33
import { vatSyscallHandler } from './vat-syscall.ts';
4-
import type { HandleSyscall } from './vat-syscall.ts';
54

65
describe('vatSyscallHandler', () => {
7-
it('should initialize a vat', async () => {
8-
const handleSyscall = vi.fn(async () =>
9-
Promise.resolve({
10-
checkpoint: 'fake',
11-
}),
12-
) as unknown as HandleSyscall;
13-
const result = await vatSyscallHandler.implementation({ handleSyscall }, [
6+
it('should initialize a vat', () => {
7+
const handleSyscall = vi.fn();
8+
vatSyscallHandler.implementation({ handleSyscall }, [
9+
'send',
10+
'test',
11+
{
12+
methargs: { body: 'test', slots: [] },
13+
result: null,
14+
},
15+
]);
16+
17+
expect(handleSyscall).toHaveBeenCalledTimes(1);
18+
expect(handleSyscall).toHaveBeenCalledWith([
1419
'send',
1520
'test',
1621
{
1722
methargs: { body: 'test', slots: [] },
1823
result: null,
1924
},
2025
]);
21-
expect(result).toStrictEqual({
22-
checkpoint: 'fake',
23-
});
2426
});
2527

26-
it('should propagate errors from hooks', async () => {
28+
it('should propagate errors from hooks', () => {
2729
const handleSyscall = vi.fn(() => {
2830
throw new Error('fake');
2931
});
30-
await expect(
32+
expect(() =>
3133
vatSyscallHandler.implementation({ handleSyscall }, [
3234
'send',
3335
'test',
@@ -36,6 +38,6 @@ describe('vatSyscallHandler', () => {
3638
result: null,
3739
},
3840
]),
39-
).rejects.toThrow('fake');
41+
).toThrow('fake');
4042
});
4143
});

packages/ocap-kernel/src/rpc/vat-syscall/vat-syscall.ts

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import type { VatSyscallResult } from '@agoric/swingset-liveslots';
21
import type { Handler, MethodSpec } from '@metamask/kernel-rpc-methods';
32
import {
43
tuple,
@@ -7,7 +6,6 @@ import {
76
string,
87
union,
98
boolean,
10-
Struct,
119
} from '@metamask/superstruct';
1210
import type { Infer } from '@metamask/superstruct';
1311

@@ -65,27 +63,12 @@ const VatSyscallParamsStruct = union([
6563

6664
type VatSyscallParams = Infer<typeof VatSyscallParamsStruct>;
6765

68-
const VatSyscallResultStruct: Struct<VatSyscallResult> = union([
69-
tuple([
70-
literal('ok'),
71-
union([CapDataStruct, string(), array(string()), literal(null)]),
72-
]),
73-
tuple([literal('error'), string()]),
74-
]);
75-
76-
export const vatSyscallSpec: MethodSpec<
77-
'syscall',
78-
VatSyscallParams,
79-
Promise<VatSyscallResult>
80-
> = {
66+
export const vatSyscallSpec: MethodSpec<'syscall', VatSyscallParams, void> = {
8167
method: 'syscall',
8268
params: VatSyscallParamsStruct,
83-
result: VatSyscallResultStruct,
8469
} as const;
8570

86-
export type HandleSyscall = (
87-
params: VatSyscallParams,
88-
) => Promise<VatSyscallResult>;
71+
export type HandleSyscall = (params: VatSyscallParams) => void;
8972

9073
type SyscallHooks = {
9174
handleSyscall: HandleSyscall;
@@ -94,12 +77,12 @@ type SyscallHooks = {
9477
export const vatSyscallHandler: Handler<
9578
'syscall',
9679
VatSyscallParams,
97-
Promise<VatSyscallResult>,
80+
void,
9881
SyscallHooks
9982
> = {
10083
...vatSyscallSpec,
10184
hooks: { handleSyscall: true },
102-
implementation: async ({ handleSyscall }, params) => {
103-
return await handleSyscall(params);
85+
implementation: ({ handleSyscall }, params) => {
86+
handleSyscall(params);
10487
},
10588
} as const;

vitest.config.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,10 @@ export default defineConfig({
134134
lines: 73.58,
135135
},
136136
'packages/ocap-kernel/**': {
137-
statements: 91.36,
137+
statements: 91.39,
138138
functions: 95.07,
139-
branches: 81.99,
140-
lines: 91.33,
139+
branches: 82.16,
140+
lines: 91.37,
141141
},
142142
'packages/streams/**': {
143143
statements: 100,

0 commit comments

Comments
 (0)