Skip to content

Commit 5b49e7e

Browse files
authored
feat(streams): Add support for remote iterators and generators between vats (#574)
Ferrying the `async function* ()` and `for await (...)` syntax between vats is complicated slightly by the use of the unserializable `Symbol.asyncIterator` to implement these features. This PR adds two functions to `@metamask/streams` which are intended to be used in conjunction. - `makeFarGenerator` - which makes a remotable reference from an async generator - `makeEventualIterator` - which makes a for-awaitable iterator from a remote generator presence
1 parent 0e841a8 commit 5b49e7e

16 files changed

Lines changed: 957 additions & 0 deletions

packages/kernel-test/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
"@agoric/store": "0.9.3-u19.0",
4848
"@endo/eventual-send": "^1.3.1",
4949
"@endo/exo": "^1.5.9",
50+
"@endo/far": "^1.1.11",
5051
"@endo/marshal": "^1.6.4",
5152
"@endo/patterns": "^1.5.0",
5253
"@endo/promise-kit": "^1.1.10",
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs';
2+
import { waitUntilQuiescent } from '@metamask/kernel-utils';
3+
import { Kernel } from '@metamask/ocap-kernel';
4+
import { describe, expect, it } from 'vitest';
5+
6+
import {
7+
getBundleSpec,
8+
makeKernel,
9+
makeTestLogger,
10+
runTestVats,
11+
extractTestLogs,
12+
} from './utils.ts';
13+
14+
const testSubcluster = {
15+
bootstrap: 'consumer',
16+
forceReset: true,
17+
vats: {
18+
consumer: {
19+
bundleSpec: getBundleSpec('async-generator-iterator-vat'),
20+
parameters: {
21+
name: 'alice',
22+
},
23+
},
24+
producer: {
25+
bundleSpec: getBundleSpec('async-generator-iterator-vat'),
26+
parameters: {
27+
name: 'bob',
28+
},
29+
},
30+
},
31+
};
32+
33+
describe(
34+
'Async generator consumption between vats',
35+
{
36+
timeout: 2000,
37+
},
38+
() => {
39+
let kernel: Kernel;
40+
41+
it('alice can consume async generator from Bob using for await', async () => {
42+
const kernelDatabase = await makeSQLKernelDatabase({
43+
dbFilename: ':memory:',
44+
});
45+
const { logger, entries } = makeTestLogger();
46+
kernel = await makeKernel(kernelDatabase, true, logger);
47+
48+
await runTestVats(kernel, testSubcluster);
49+
await waitUntilQuiescent(100);
50+
51+
const aliceLogs = extractTestLogs(entries, 'alice');
52+
const bobLogs = extractTestLogs(entries, 'bob');
53+
expect(aliceLogs).toStrictEqual([
54+
'alice buildRootObject',
55+
'alice is bootstrap',
56+
'alice iterating 0',
57+
'alice iterating 1',
58+
'alice iterating 2',
59+
'alice iterating 3',
60+
'alice iterating 4',
61+
]);
62+
63+
expect(bobLogs).toStrictEqual([
64+
'bob buildRootObject',
65+
'bob generating 0',
66+
'bob generating 1',
67+
'bob generating 2',
68+
'bob generating 3',
69+
'bob generating 4',
70+
]);
71+
});
72+
},
73+
);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { E, Far } from '@endo/far';
2+
// ESLint's import-x/no-unresolved rule with commonjs:false doesn't support subpath exports
3+
// eslint-disable-next-line import-x/no-unresolved
4+
import { makeEventualIterator, makeFarGenerator } from '@metamask/streams/vat';
5+
6+
/**
7+
* Build function for testing async generators.
8+
*
9+
* @param {object} vatPowers - The powers of the vat.
10+
* @param {object} vatPowers.logger - The logger to use.
11+
* @param {object} parameters - The parameters of the vat.
12+
* @param {string} parameters.name - The name of the vat.
13+
* @returns {object} The root object for the vat.
14+
*/
15+
export function buildRootObject({ logger }, { name }) {
16+
const tlogger = logger.subLogger({ tags: ['test', name] });
17+
const tlog = (...args) => tlogger.log(...args);
18+
19+
tlog(`${name} buildRootObject`);
20+
21+
return Far('root', {
22+
async bootstrap({ consumer, producer }, _services) {
23+
tlog(`${name} is bootstrap`);
24+
await E(consumer).iterate(producer);
25+
},
26+
27+
generate: async (stop) =>
28+
makeFarGenerator(
29+
(async function* () {
30+
for (let i = 0; i < stop; i++) {
31+
tlog(`${name} generating ${i}`);
32+
yield i;
33+
}
34+
// Note the IIFE.
35+
})(),
36+
),
37+
38+
iterate: async (producer) => {
39+
const remoteGenerator = await E(producer).generate(5);
40+
for await (const value of makeEventualIterator(remoteGenerator)) {
41+
tlog(`${name} iterating ${value}`);
42+
}
43+
},
44+
});
45+
}

packages/streams/package.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@
3939
"default": "./dist/browser/index.cjs"
4040
}
4141
},
42+
"./vat": {
43+
"import": {
44+
"types": "./dist/vat/index.d.mts",
45+
"default": "./dist/vat/index.mjs"
46+
},
47+
"require": {
48+
"types": "./dist/vat/index.d.cts",
49+
"default": "./dist/vat/index.cjs"
50+
}
51+
},
4252
"./package.json": "./package.json"
4353
},
4454
"main": "./dist/index.cjs",
@@ -68,6 +78,9 @@
6878
"test:watch": "vitest --config vitest.config.ts"
6979
},
7080
"dependencies": {
81+
"@endo/exo": "^1.5.9",
82+
"@endo/far": "^1.1.11",
83+
"@endo/patterns": "^1.5.0",
7184
"@endo/promise-kit": "^1.1.10",
7285
"@endo/stream": "^1.2.10",
7386
"@metamask/kernel-errors": "workspace:^",

packages/streams/src/index.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ describe('index', () => {
88
'NodeWorkerDuplexStream',
99
'NodeWorkerReader',
1010
'NodeWorkerWriter',
11+
'makeEventualIterator',
12+
'makeFarGenerator',
1113
'split',
1214
]);
1315
});

packages/streams/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ export {
66
NodeWorkerDuplexStream,
77
} from './node/NodeWorkerStream.ts';
88
export { split } from './split.ts';
9+
export { makeEventualIterator } from './vat/eventual-iterator.ts';
10+
export { makeFarGenerator } from './vat/far-generator.ts';
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// This type is used in the docs.
2+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
3+
import type { makeFarGenerator } from './far-generator.ts';
4+
import { makeRefIterator } from './ref-reader.ts';
5+
6+
/**
7+
* Make an iterator from a remote generator. Intended to be used in conjunction
8+
* with {@link makeFarGenerator}. This is the consuming end of the pair.
9+
*
10+
* Enables async iterator syntax like below.
11+
* ```ts
12+
* const eventualIterator = makeEventualIterator(remoteGeneratorRef);
13+
* for await (const value of eventualIterator) {
14+
* console.log(`A faraway vat yielded: ${value}`);
15+
* }
16+
* ```
17+
*
18+
* @param iteratorRef - The remotable presence to make an iterator from.
19+
* @returns An iterator that wraps the remotable presence, enabling async iterator syntax.
20+
*/
21+
export const makeEventualIterator = makeRefIterator;
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { E } from '@endo/far';
2+
import { makePromiseKit } from '@endo/promise-kit';
3+
import type { Reader, Writer } from '@endo/stream';
4+
import { makePipe } from '@endo/stream';
5+
import { describe, it, expect, vi, beforeEach } from 'vitest';
6+
import type { Mocked } from 'vitest';
7+
8+
import { makeFarGenerator } from './far-generator.ts';
9+
10+
vi.mock('@endo/far', () => ({
11+
E: vi.fn((obj) => obj),
12+
}));
13+
14+
vi.mock('@endo/stream', () => ({
15+
makePipe: vi.fn(),
16+
}));
17+
18+
vi.mock('./reader-ref.ts', () => ({
19+
makeIteratorRef: vi.fn((reader) => reader),
20+
}));
21+
22+
describe('far-generator', () => {
23+
const mockWriter: Mocked<Writer<unknown>> = {
24+
next: vi.fn(),
25+
return: vi.fn(),
26+
throw: vi.fn(),
27+
[Symbol.asyncIterator]: vi.fn(),
28+
};
29+
const mockReader: Mocked<Reader<unknown>> = {
30+
next: vi.fn(),
31+
return: vi.fn(),
32+
throw: vi.fn(),
33+
[Symbol.asyncIterator]: vi.fn(() => mockReader),
34+
};
35+
beforeEach(() => {
36+
vi.clearAllMocks();
37+
vi.mocked(makePipe).mockReturnValue([mockWriter, mockReader]);
38+
});
39+
40+
describe('makeFarGenerator', () => {
41+
it('should wrap a generator', async () => {
42+
const result = makeFarGenerator(
43+
(async function* () {
44+
// Empty generator
45+
})(),
46+
);
47+
48+
// Verify that makeIteratorRef was called with the reader
49+
const { makeIteratorRef } = await import('./reader-ref.ts');
50+
expect(makeIteratorRef).toHaveBeenCalledTimes(1);
51+
expect(result).toBeDefined();
52+
});
53+
54+
it('should pipe values from the generator to the writer', async () => {
55+
const generated = makePromiseKit<string>();
56+
const result = makeFarGenerator(
57+
(async function* () {
58+
yield 'test';
59+
generated.resolve('yielded');
60+
})(),
61+
);
62+
63+
await E(result).next();
64+
65+
expect(await generated.promise).toBe('yielded');
66+
67+
expect(mockWriter.next).toHaveBeenCalledOnce();
68+
expect(mockWriter.next).toHaveBeenCalledWith('test');
69+
});
70+
71+
it('calls writer.throw if the generator throws', async () => {
72+
const generated = makePromiseKit<string>();
73+
const error = new Error('test');
74+
const result = makeFarGenerator(
75+
// eslint-disable-next-line require-yield
76+
(async function* () {
77+
generated.resolve('threw');
78+
throw error;
79+
})(),
80+
);
81+
82+
await E(result).next();
83+
84+
expect(await generated.promise).toBe('threw');
85+
86+
expect(mockWriter.throw).toHaveBeenCalledOnce();
87+
expect(mockWriter.throw).toHaveBeenCalledWith(error);
88+
});
89+
});
90+
});
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { E } from '@endo/far';
2+
import type { FarRef } from '@endo/far';
3+
import { makePipe } from '@endo/stream';
4+
import type { Writer, Reader } from '@endo/stream';
5+
6+
// This type is used in the docs.
7+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
8+
import type { makeEventualIterator } from './eventual-iterator.ts';
9+
import { makeIteratorRef } from './reader-ref.ts';
10+
11+
/**
12+
* Make a remotable generator. Intended to be used in conjunction with
13+
* {@link makeEventualIterator}. This is the producing end of the pair.
14+
*
15+
* @param generator - The generator to make remotable.
16+
* @returns A remotable reference to the generator.
17+
*/
18+
export const makeFarGenerator = <Item>(
19+
generator: AsyncGenerator<Item>,
20+
): FarRef<AsyncIterator<Item>> => {
21+
const [writer, reader] = makePipe<Item>() as unknown as [
22+
Writer<Item>,
23+
Reader<Item>,
24+
];
25+
(async () => {
26+
for await (const value of generator) {
27+
await E(writer).next(value);
28+
}
29+
await E(writer).return(undefined);
30+
})().catch(async (error) => await E(writer).throw(error));
31+
return makeIteratorRef<Item>(reader);
32+
};
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { describe, it, expect } from 'vitest';
2+
3+
import * as indexModule from './index.ts';
4+
5+
describe('index', () => {
6+
it('has the expected exports', () => {
7+
expect(Object.keys(indexModule).sort()).toStrictEqual([
8+
'makeEventualIterator',
9+
'makeFarGenerator',
10+
]);
11+
});
12+
});

0 commit comments

Comments
 (0)