Skip to content

Commit fb47068

Browse files
sirtimidclaude
andauthored
feat(ocap-kernel): leverage libp2p v3 features in remote communications (#915)
Closes #903 ## Summary Adopts several libp2p v3 features to improve error handling, observability, and robustness of the remote communications layer: - **Async protocol handlers with auto-abort** — inbound connection handler is now `async`, so handler rejections automatically abort the stream instead of silently dropping errors - **Fine-grained stream close events** — v3 streams emit close events with `{ error, local }` properties for better diagnostics distinguishing local vs remote and clean vs error closes - **Stream inactivity timeout** — configurable `streamInactivityTimeoutMs` (default 120s) detects dead streams via bidirectional silence, with a 5s minimum floor to prevent self-DoS - **`peer:disconnect` safety net** — listens for peer-level disconnect events as a fallback when `readChannel` cleanup misses a disconnection, with `signal.aborted` guard to avoid spurious reconnection during shutdown - **`StreamResetError` detection** — recognizes remote stream resets without permanently marking the peer as intentionally closed (prevents malicious peers from suppressing reconnection) - **Graceful `closeChannel` rewrite** — uses `channel.stream.close()` directly instead of the fragile `unwrap()` pattern, with abort fallback - **Graceful shutdown** — `stop()` sends FIN to remote peers so they see clean stream ends rather than `StreamResetError` - **`Channel` type gains `stream`** — direct access to stream lifecycle APIs, avoiding `unwrap()` - **`TooManyOutboundProtocolStreamsError` handling** — re-throws immediately since trying other addresses won't help - **`writeWithTimeout` stream status guard** — fast-fails on already-closed streams before attempting a write - **`RemoteHandle.giveUp()` method** — transport-level give-up now properly cleans up (stops ACK retransmission + rejects pending messages + rejects pending URL redemptions) - **Flaky multi-peer reconnection E2E test** — fixed by adding `maxConnectionAttemptsPerMinute: 500` (rate limiter was the root cause), parallel restarts, and `ackTimeoutMs: 5000` ## Test plan New unit tests cover stream status fast-fail (`it.each` over 4 statuses), `peer:disconnect` handler (registration, forwarding, relay filtering, shutdown guard), stream close event diagnostics (4 scenarios via `it.each`), `TooManyOutboundProtocolStreamsError` rethrow, `StreamResetError` reconnection behavior, `closeChannel` graceful/abort/double-fail paths, `stop()` stream closing, `RemoteHandle.giveUp()`, and `streamInactivityTimeoutMs` query string round-trip. New E2E test file (`libp2p-v3-features.test.ts`) covers stream inactivity timeout recovery end-to-end. ## Suggested review order 1. `packages/ocap-kernel/src/remotes/types.ts` — `Channel` type gains `stream`, new `PeerDisconnectHandler` and `streamInactivityTimeoutMs` option. Read this first to understand the type changes everything else builds on. 2. `packages/ocap-kernel/src/remotes/platform/constants.ts` — new `STREAM_INACTIVITY_TIMEOUT_MS` and `MIN_STREAM_INACTIVITY_TIMEOUT_MS` constants. 3. `packages/ocap-kernel/src/remotes/platform/channel-utils.ts` + test — `writeWithTimeout` stream status guard (small, self-contained). 4. `packages/ocap-kernel/src/remotes/platform/connection-factory.ts` + test — `peer:disconnect` listener, async inbound handler, `closeChannel` rewrite, `TooManyOutboundProtocolStreamsError`. Core plumbing. 5. `packages/ocap-kernel/src/remotes/platform/transport.ts` + test — biggest file: `StreamResetError` handling, `registerChannel` stream events/timeout, `peer:disconnect` safety net, graceful `stop()`. Builds on 3 and 4. 6. `packages/ocap-kernel/src/remotes/kernel/RemoteHandle.ts` + test — new `giveUp()` method and `#handleAckTimeout` cleanup path. 7. `packages/ocap-kernel/src/remotes/kernel/RemoteManager.ts` + test — `#handleRemoteGiveUp` wiring to `giveUp()`. 8. `packages/kernel-node-runtime/test/e2e/remote-comms.test.ts` — flaky multi-peer reconnection fix (rate limit + parallel restarts). 9. `packages/kernel-node-runtime/test/e2e/libp2p-v3-features.test.ts` — new E2E tests for the v3 features. 10. `packages/kernel-browser-runtime/src/utils/comms-query-string.ts` + test — `streamInactivityTimeoutMs` query string support (trivial). 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Touches core remote-communications transport/retry logic (stream lifecycle, reconnection triggers, and give-up cleanup), which can affect message delivery and connection stability; changes are well-covered by new/updated unit and E2E tests. > > **Overview** > Improves remote comms robustness by adopting libp2p v3 stream/connection signals: `Channel` now carries the underlying `stream`, inbound protocol handlers can be async, and the transport configures per-stream inactivity timeouts and logs detailed stream close diagnostics. > > Reworks failure handling and cleanup paths: fast-fail writes on non-`open` streams, treat `StreamResetError`/`peer:disconnect` as reconnection signals (with relay filtering and shutdown guards), handle `TooManyOutboundProtocolStreamsError` as a non-retryable dial error, and make shutdown/`closeChannel` prefer graceful `stream.close()` with abort fallback. > > Fixes give-up semantics so pending messages and URL redemptions are always rejected via new `RemoteHandle.giveUp()` and updated `RemoteManager` wiring, adds `streamInactivityTimeoutMs` support to comms query strings, and expands/adjusts unit + E2E coverage (including a new `libp2p-v3-features` E2E suite and less-flaky multi-peer reconnection timing/rate-limit settings). > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 52fb402. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent da4f95c commit fb47068

18 files changed

Lines changed: 965 additions & 135 deletions

packages/kernel-browser-runtime/src/utils/comms-query-string.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ describe('comms-query-string', () => {
6868
allowedWsHosts: ['relay.example.com'],
6969
maxRetryAttempts: 5,
7070
maxQueue: 200,
71+
streamInactivityTimeoutMs: 60000,
7172
};
7273
const params = createCommsQueryString(options);
7374
expect(parseCommsQueryString(`?${params.toString()}`)).toStrictEqual(

packages/kernel-browser-runtime/src/utils/comms-query-string.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ const NUMBER_PARAM_NAMES = [
5757
'handshakeTimeoutMs',
5858
'writeTimeoutMs',
5959
'ackTimeoutMs',
60+
'streamInactivityTimeoutMs',
6061
] as const satisfies readonly NumberParamKey[];
6162

6263
const NonNegativeInteger = min(integer(), 0);

packages/kernel-cli/src/commands/relay.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ vi.mock('../utils.ts', () => ({
2727
waitFor: vi.fn(),
2828
}));
2929

30-
const mockLogger = { info: vi.fn(), error: vi.fn() } as unknown as Logger;
30+
const mockLogger: Logger = {
31+
info: vi.fn(),
32+
error: vi.fn(),
33+
} as unknown as Logger;
3134

3235
const makeLibp2pMock = (
3336
addrs: string[] = ['/ip4/127.0.0.1/tcp/9001/ws/p2p/QmFoo'],
Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
import type { Libp2p } from '@libp2p/interface';
2+
import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs';
3+
import { startRelay } from '@metamask/kernel-utils/libp2p';
4+
import { Kernel, kunser, makeKernelStore } from '@metamask/ocap-kernel';
5+
import { delay } from '@ocap/repo-tools/test-utils';
6+
import { mkdtemp, rm } from 'node:fs/promises';
7+
import { tmpdir } from 'node:os';
8+
import { join } from 'node:path';
9+
import {
10+
describe,
11+
it,
12+
expect,
13+
beforeAll,
14+
afterAll,
15+
beforeEach,
16+
afterEach,
17+
} from 'vitest';
18+
19+
import { makeTestKernel } from '../helpers/kernel.ts';
20+
import {
21+
makeRemoteVatConfig,
22+
restartKernelAndReloadVat,
23+
sendRemoteMessage,
24+
setupAliceAndBob,
25+
} from '../helpers/remote-comms.ts';
26+
import { stopWithTimeout } from '../helpers/stop-with-timeout.ts';
27+
28+
const NETWORK_TIMEOUT = 30_000;
29+
const relayPeerId = '12D3KooWJBDqsyHQF2MWiCdU4kdqx4zTsSTLRdShg7Ui6CRWB4uc';
30+
const testRelays = [`/ip4/127.0.0.1/tcp/9001/ws/p2p/${relayPeerId}`];
31+
32+
const testBackoffOptions = {
33+
reconnectionBaseDelayMs: 10,
34+
reconnectionMaxDelayMs: 50,
35+
handshakeTimeoutMs: 3_000,
36+
writeTimeoutMs: 3_000,
37+
ackTimeoutMs: 2_000,
38+
};
39+
40+
describe.sequential('libp2p v3 Features E2E', () => {
41+
let relay: Libp2p;
42+
let kernel1: Kernel;
43+
let kernel2: Kernel;
44+
let dbFilename1: string;
45+
let dbFilename2: string;
46+
let tempDir: string;
47+
let kernelStore1: ReturnType<typeof makeKernelStore>;
48+
let kernelStore2: ReturnType<typeof makeKernelStore>;
49+
50+
beforeAll(async () => {
51+
relay = await startRelay(console);
52+
});
53+
54+
afterAll(async () => {
55+
if (relay) {
56+
await relay.stop();
57+
}
58+
});
59+
60+
beforeEach(async () => {
61+
tempDir = await mkdtemp(join(tmpdir(), 'ocap-v3-'));
62+
dbFilename1 = join(tempDir, 'kernel1.db');
63+
dbFilename2 = join(tempDir, 'kernel2.db');
64+
65+
const kernelDatabase1 = await makeSQLKernelDatabase({
66+
dbFilename: dbFilename1,
67+
});
68+
kernelStore1 = makeKernelStore(kernelDatabase1);
69+
70+
const kernelDatabase2 = await makeSQLKernelDatabase({
71+
dbFilename: dbFilename2,
72+
});
73+
kernelStore2 = makeKernelStore(kernelDatabase2);
74+
75+
kernel1 = await makeTestKernel(kernelDatabase1);
76+
kernel2 = await makeTestKernel(kernelDatabase2);
77+
});
78+
79+
afterEach(async () => {
80+
const STOP_TIMEOUT = 3000;
81+
await Promise.all([
82+
kernel1 &&
83+
stopWithTimeout(
84+
async () => kernel1.stop(),
85+
STOP_TIMEOUT,
86+
'kernel1.stop',
87+
),
88+
kernel2 &&
89+
stopWithTimeout(
90+
async () => kernel2.stop(),
91+
STOP_TIMEOUT,
92+
'kernel2.stop',
93+
),
94+
]);
95+
if (tempDir) {
96+
await rm(tempDir, { recursive: true, force: true });
97+
}
98+
});
99+
100+
describe('peer:disconnect Reconnection', () => {
101+
it(
102+
'recovers queued message after peer:disconnect triggers reconnection',
103+
async () => {
104+
const { aliceRef, bobURL } = await setupAliceAndBob(
105+
kernel1,
106+
kernel2,
107+
kernelStore1,
108+
kernelStore2,
109+
testRelays,
110+
testBackoffOptions,
111+
);
112+
113+
// Establish initial communication
114+
const initial = await sendRemoteMessage(
115+
kernel1,
116+
aliceRef,
117+
bobURL,
118+
'hello',
119+
['Alice'],
120+
);
121+
expect(initial).toContain('vat Bob got "hello" from Alice');
122+
123+
// Stop kernel2 — triggers both readChannel error and peer:disconnect.
124+
// The peer:disconnect event acts as a safety net ensuring reconnection
125+
// is attempted even after readChannel clears the channel.
126+
await kernel2.stop();
127+
128+
// Queue a message while kernel2 is down — this triggers reconnection
129+
const recoveryPromise = kernel1.queueMessage(
130+
aliceRef,
131+
'sendRemoteMessage',
132+
[bobURL, 'hello', ['Alice']],
133+
);
134+
135+
// Restart kernel2 — reconnection loop delivers the queued message
136+
const bobConfig = makeRemoteVatConfig('Bob');
137+
const restartResult = await restartKernelAndReloadVat(
138+
dbFilename2,
139+
false,
140+
testRelays,
141+
bobConfig,
142+
testBackoffOptions,
143+
);
144+
// eslint-disable-next-line require-atomic-updates
145+
kernel2 = restartResult.kernel;
146+
147+
const result = kunser(await recoveryPromise) as string;
148+
expect(result).toContain('vat Bob got "hello" from Alice');
149+
150+
// Verify ongoing connectivity after peer:disconnect recovery
151+
const followUp = await sendRemoteMessage(
152+
kernel1,
153+
aliceRef,
154+
bobURL,
155+
'hello',
156+
['Alice'],
157+
);
158+
expect(followUp).toContain('vat Bob got "hello" from Alice');
159+
},
160+
NETWORK_TIMEOUT * 2,
161+
);
162+
});
163+
164+
describe('Stream Inactivity Timeout', () => {
165+
it(
166+
'recovers communication after idle period exceeds inactivity timeout',
167+
async () => {
168+
// Use a short inactivity timeout to test the auto-abort behavior.
169+
// Must be >= MIN_STREAM_INACTIVITY_TIMEOUT_MS (5 s) since the
170+
// transport clamps lower values.
171+
const shortTimeoutOptions = {
172+
...testBackoffOptions,
173+
streamInactivityTimeoutMs: 6_000,
174+
};
175+
176+
const { aliceRef, bobURL } = await setupAliceAndBob(
177+
kernel1,
178+
kernel2,
179+
kernelStore1,
180+
kernelStore2,
181+
testRelays,
182+
shortTimeoutOptions,
183+
);
184+
185+
// Establish initial communication
186+
const initial = await sendRemoteMessage(
187+
kernel1,
188+
aliceRef,
189+
bobURL,
190+
'hello',
191+
['Alice'],
192+
);
193+
expect(initial).toContain('vat Bob got "hello" from Alice');
194+
195+
// Wait longer than the inactivity timeout (6s + buffer).
196+
// The stream should auto-abort due to inactivityTimeout,
197+
// triggering connection loss handling and reconnection.
198+
await delay(8_000);
199+
200+
// Send another message — the transport layer should
201+
// reconnect since the previous stream was aborted by inactivity.
202+
const afterIdle = await sendRemoteMessage(
203+
kernel1,
204+
aliceRef,
205+
bobURL,
206+
'hello',
207+
['Alice'],
208+
);
209+
expect(afterIdle).toContain('vat Bob got "hello" from Alice');
210+
},
211+
NETWORK_TIMEOUT * 2,
212+
);
213+
});
214+
215+
describe('Fast Failure on Closed Streams', () => {
216+
it(
217+
'handles rapid sends during disconnect without hanging',
218+
async () => {
219+
const { aliceRef, bobURL, peerId2 } = await setupAliceAndBob(
220+
kernel1,
221+
kernel2,
222+
kernelStore1,
223+
kernelStore2,
224+
testRelays,
225+
testBackoffOptions,
226+
);
227+
228+
// Establish initial connectivity
229+
await sendRemoteMessage(kernel1, aliceRef, bobURL, 'hello', ['Alice']);
230+
231+
// Intentionally close the connection — writes should fail fast
232+
// via stream.status check rather than waiting for timeout
233+
await kernel1.closeConnection(peerId2);
234+
235+
const start = Date.now();
236+
// This should fail quickly (stream.status !== 'open') rather than
237+
// waiting for the full write timeout (3000ms)
238+
await expect(
239+
kernel1.queueMessage(aliceRef, 'sendRemoteMessage', [
240+
bobURL,
241+
'hello',
242+
['Alice'],
243+
]),
244+
).rejects.toMatchObject({
245+
body: expect.stringContaining(
246+
'Message delivery failed after intentional close',
247+
),
248+
});
249+
const elapsed = Date.now() - start;
250+
251+
// Should fail well under the write timeout
252+
expect(elapsed).toBeLessThan(testBackoffOptions.writeTimeoutMs);
253+
},
254+
NETWORK_TIMEOUT,
255+
);
256+
});
257+
258+
describe('Connection Type Awareness', () => {
259+
it(
260+
'establishes relayed connections and communicates successfully',
261+
async () => {
262+
// Both kernels connect via relay — the connection.direct property
263+
// should be false (relayed), and the log should include "relayed".
264+
// This validates that the connection type detection works with real
265+
// libp2p connections.
266+
const { aliceRef, bobURL, peerId1, peerId2 } = await setupAliceAndBob(
267+
kernel1,
268+
kernel2,
269+
kernelStore1,
270+
kernelStore2,
271+
testRelays,
272+
testBackoffOptions,
273+
);
274+
275+
// Verify distinct peer IDs (confirms real libp2p nodes)
276+
expect(peerId1).not.toBe(peerId2);
277+
278+
// Bidirectional communication through relay
279+
const response = await sendRemoteMessage(
280+
kernel1,
281+
aliceRef,
282+
bobURL,
283+
'hello',
284+
['Alice'],
285+
);
286+
expect(response).toContain('vat Bob got "hello" from Alice');
287+
},
288+
NETWORK_TIMEOUT,
289+
);
290+
});
291+
});

0 commit comments

Comments
 (0)