Commit eeae1ee

sirtimid and claude authored
fix(ocap-kernel): peer-incarnation restart detection across receiver state loss (#948)
## What

Closes [#944](#944): a remote peer restarting with a fresh incarnation (e.g. plugin kernel reload) caused the receiving kernel to silently drop the new connection's seq=1 messages.

## Why

The receiver's persisted `RemoteHandle.#highestReceivedSeq` outlived the in-memory PeerStateManager that was supposed to detect the restart. The handshake then misclassified a real restart as a first connection, and dedup ate the messages.

## How

- Persist the peer's last-observed incarnation in a new `peerIncarnation.{peerId}` KV namespace, so the comparison survives receiver state loss.
- Fire `onIncarnationChange` on every successful handshake; the kernel compares observed against persisted and returns whether a restart was detected. The transport uses that verdict to close the channel and re-dial, symmetric on both inbound and outbound paths.
- On detected restart, clear the peer's c-list contributions (`forgetEndpointImports`) and reject promises it was deciding, atomically with the persisted incarnation update via a savepoint. In-memory state changes and run-queue notifications are deferred until after commit so a KV rollback can't leave the views inconsistent.
- Add `PeerRestartedError`, `IntentionalCloseError`, `NetworkStoppedError` to `@metamask/kernel-errors` (with `isTerminalSendError`) so the kernel-side abort-retransmit predicate stops relying on string matching.

## Test plan

- [x] Unit tests: `yarn workspace @metamask/kernel-errors test:dev:quiet`, `yarn workspace @metamask/ocap-kernel test:dev:quiet`, `kernel-browser-runtime`, `kernel-node-runtime`
- [x] `yarn build`, `yarn workspace @metamask/ocap-kernel lint:fix`
- [x] E2E `packages/extension/test/e2e/remote-comms.test.ts` (timeout budget raised to fit the 40s URL redemption ceiling)
- [x] E2E `packages/kernel-node-runtime/test/e2e/remote-comms.test.ts` #944 regression

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---

> [!NOTE]
> **Medium Risk**
> Touches remote-communications handshake, kernel persistence, and retransmit logic; mistakes could cause dropped messages, excessive reconnects, or incorrect promise rejection. Mitigated by extensive new unit/e2e coverage and fail-closed defaults.
>
> **Overview**
> Fixes a remote-comms restart edge case by **persisting the last-seen peer incarnation** and consulting it on *every* handshake, allowing restart detection even after receiver-side in-memory state loss.
>
> Updates the transport and platform-services RPC plumbing so `onIncarnationChange(peerId, observedIncarnation)` returns a boolean verdict; the transport uses this verdict to **drop/close channels and suppress potentially stale outbound messages** (with explicit fail-closed behavior on RPC/handler errors).
>
> Hardens kernel restart handling by splitting peer-restart cleanup into persisted vs in-memory phases (savepoint-protected), adding `forgetEndpointImports` c-list teardown, improving retransmit to send sequentially and abort on terminal send errors, and introducing new sentinel errors (`PeerRestartedError`, `IntentionalCloseError`, `NetworkStoppedError`) plus `isTerminalSendError`. E2E timeouts/tests are adjusted/added to cover the regression scenario.
>
> Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 6aae9ea. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 306d874 commit eeae1ee
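
The first two bullets under "How" describe comparing the incarnation observed in a handshake against a persisted value. A minimal sketch of that comparison follows. The `KVStore` interface, `checkIncarnation`, and `makeMemoryStore` are hypothetical names for illustration, not the kernel's actual API, and the rule that a first contact (nothing persisted yet) is not a restart is an assumption inferred from the description. The savepoint, c-list teardown, and promise rejection are deliberately left out.

```typescript
// Hypothetical key-value store interface; the real kernel store API may differ.
type KVStore = {
  get(key: string): string | undefined;
  set(key: string, value: string): void;
};

// Key layout taken from the PR description: `peerIncarnation.{peerId}`.
const incarnationKey = (peerId: string): string => `peerIncarnation.${peerId}`;

/**
 * Compare the incarnation observed during a handshake with the persisted one.
 * Returns true only when a different incarnation was recorded earlier.
 * Assumption: a peer with no persisted incarnation is a first contact.
 */
function checkIncarnation(
  kv: KVStore,
  peerId: string,
  observedIncarnation: string,
): boolean {
  const key = incarnationKey(peerId);
  const persisted = kv.get(key);
  if (persisted === observedIncarnation) {
    return false; // same incarnation: an ordinary reconnect
  }
  kv.set(key, observedIncarnation);
  return persisted !== undefined; // changed value means the peer restarted
}

// In-memory stand-in for a persistent store, for demonstration only.
function makeMemoryStore(): KVStore {
  const data = new Map<string, string>();
  return {
    get: (key) => data.get(key),
    set: (key, value) => {
      data.set(key, value);
    },
  };
}

const demoStore = makeMemoryStore();
const firstContact = checkIncarnation(demoStore, 'peer-456', 'incarnation-A');
const reconnect = checkIncarnation(demoStore, 'peer-456', 'incarnation-A');
const restart = checkIncarnation(demoStore, 'peer-456', 'incarnation-B');
```

Because the verdict depends only on the store, it stays correct even if every in-memory structure on the receiver is rebuilt between handshakes, which is the failure mode described in #944.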

34 files changed

Lines changed: 2315 additions & 219 deletions

packages/extension/test/e2e/remote-comms.test.ts

Lines changed: 6 additions & 2 deletions
@@ -5,7 +5,7 @@ import { rm } from 'node:fs/promises';

 import { loadExtension, sessionPath } from '../helpers.ts';

-test.describe.configure({ mode: 'serial', timeout: 60_000 });
+test.describe.configure({ mode: 'serial', timeout: 90_000 });

 /**
  * End-to-end tests for remote communications functionality.
@@ -140,7 +140,11 @@ test.describe('Remote Communications', () => {
     const messageResponse = popupPage1.locator(
       '[data-testid="message-response"]',
     );
-    await expect(messageResponse).toBeVisible({ timeout: 30_000 });
+    // Budget: redemption timeout is `ackTimeoutMs * (MAX_RETRIES + 1)` =
+    // 40s with prod defaults; the response (success or rejection) is
+    // guaranteed to render before then. 50s gives 10s headroom for the
+    // post-redemption render path under CI load.
+    await expect(messageResponse).toBeVisible({ timeout: 50_000 });
     await expect(messageResponse).toContainText(
       // eslint-disable-next-line no-useless-escape
       `Response:{\"body\":\"#\\\"vat Bob got \\\\\\\"hello\\\\\\\" from remote Alice\\\"\",\"slots\":[]}`,
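
The budget comment in the hunk above is plain arithmetic, sketched here for reference. Only the 40s product and the 10s headroom come from the diff; the individual values `assumedAckTimeoutMs = 10_000` and `assumedMaxRetries = 3` are assumptions chosen to reproduce that product, and `visibilityBudgetMs` is a hypothetical helper.

```typescript
/**
 * Worst-case wait for the response element: the redemption ceiling
 * `ackTimeoutMs * (maxRetries + 1)` plus headroom for rendering.
 */
function visibilityBudgetMs(
  ackTimeoutMs: number,
  maxRetries: number,
  headroomMs: number,
): number {
  const redemptionCeilingMs = ackTimeoutMs * (maxRetries + 1);
  return redemptionCeilingMs + headroomMs;
}

// Assumed prod defaults; the source states only that the ceiling is 40s.
const assumedAckTimeoutMs = 10_000;
const assumedMaxRetries = 3;

// 10_000 * (3 + 1) + 10_000 = 50_000, the new `toBeVisible` timeout.
const visibilityBudget = visibilityBudgetMs(
  assumedAckTimeoutMs,
  assumedMaxRetries,
  10_000,
);

// The suite-level timeout (90_000 in the diff) must exceed this budget.
const fitsSuiteTimeout = visibilityBudget < 90_000;
```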

packages/kernel-browser-runtime/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Fixed
+
+- Process platform-services RPC request handlers in the background so a request handler that fires a reentrant outbound RPC (e.g. transport handshake calling back into the kernel) cannot deadlock waiting for its response ([#948](https://github.com/MetaMask/ocap-kernel/pull/948))
+
 ## [0.6.0]

 ### Added

packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts

Lines changed: 191 additions & 0 deletions
@@ -513,6 +513,197 @@ describe('PlatformServicesClient', () => {
           expect(successResponse).toBeDefined();
         });
       });
+
+      describe('remoteIncarnationChange', () => {
+        it('forwards args to handler and returns its boolean verdict', async () => {
+          const outputs: MessageEventWithPayload[] = [];
+          const testStream = await TestDuplexStream.make((message) => {
+            outputs.push(message as unknown as MessageEventWithPayload);
+          });
+          const testClient = new PlatformServicesClient(
+            testStream as unknown as PlatformServicesClientStream,
+            clientLogger,
+          );
+          await delay(10);
+
+          const remoteHandler = vi.fn(async () => 'response');
+          const giveUpHandler = vi.fn();
+          const incarnationHandler = vi.fn(async () => true);
+          const initP = testClient.initializeRemoteComms(
+            '0xabcd',
+            {},
+            remoteHandler,
+            giveUpHandler,
+            'local-incarnation',
+            incarnationHandler,
+          );
+          await testStream.receiveInput(makeNullReply('m1'));
+          await initP;
+
+          await testStream.receiveInput(
+            new MessageEvent('message', {
+              data: {
+                id: 'm2',
+                jsonrpc: '2.0',
+                method: 'remoteIncarnationChange',
+                params: {
+                  peerId: 'peer-456',
+                  observedIncarnation: 'incarnation-X',
+                },
+              },
+            }),
+          );
+          await delay(50);
+
+          expect(incarnationHandler).toHaveBeenCalledExactlyOnceWith(
+            'peer-456',
+            'incarnation-X',
+          );
+          const successResponse = outputs.find(
+            (message) =>
+              message.payload?.id === 'm2' &&
+              'result' in message.payload &&
+              message.payload.result === true,
+          );
+          expect(successResponse).toBeDefined();
+        });
+
+        it('returns false when no handler is registered', async () => {
+          const outputs: MessageEventWithPayload[] = [];
+          const newStream = await TestDuplexStream.make((message) => {
+            outputs.push(message as unknown as MessageEventWithPayload);
+          });
+          // eslint-disable-next-line no-new -- test setup
+          new PlatformServicesClient(
+            newStream as unknown as PlatformServicesClientStream,
+          );
+
+          await newStream.receiveInput(
+            new MessageEvent('message', {
+              data: {
+                id: 'm1',
+                jsonrpc: '2.0',
+                method: 'remoteIncarnationChange',
+                params: {
+                  peerId: 'peer-789',
+                  observedIncarnation: 'incarnation-Y',
+                },
+              },
+            }),
+          );
+          await delay(10);
+
+          const response = outputs.find(
+            (message) =>
+              message.payload?.id === 'm1' &&
+              'result' in message.payload &&
+              message.payload.result === false,
+          );
+          expect(response).toBeDefined();
+        });
+
+        it('coerces non-boolean handler return to true (fail closed)', async () => {
+          const outputs: MessageEventWithPayload[] = [];
+          const testStream = await TestDuplexStream.make((message) => {
+            outputs.push(message as unknown as MessageEventWithPayload);
+          });
+          const testClient = new PlatformServicesClient(
+            testStream as unknown as PlatformServicesClientStream,
+            clientLogger,
+          );
+          await delay(10);
+
+          const remoteHandler = vi.fn(async () => 'response');
+          // Handler returns a non-boolean truthy value (a buggy caller).
+          const incarnationHandler = vi.fn(
+            async () => 'oops' as unknown as boolean,
+          );
+          const initP = testClient.initializeRemoteComms(
+            '0xabcd',
+            {},
+            remoteHandler,
+            undefined,
+            'local-incarnation',
+            incarnationHandler,
+          );
+          await testStream.receiveInput(makeNullReply('m1'));
+          await initP;
+
+          await testStream.receiveInput(
+            new MessageEvent('message', {
+              data: {
+                id: 'm2',
+                jsonrpc: '2.0',
+                method: 'remoteIncarnationChange',
+                params: {
+                  peerId: 'peer-789',
+                  observedIncarnation: 'incarnation-Z',
+                },
+              },
+            }),
+          );
+          await delay(50);
+
+          // Fail closed → resolve to true so the transport drops the outbound.
+          const response = outputs.find(
+            (message) =>
+              message.payload?.id === 'm2' &&
+              'result' in message.payload &&
+              message.payload.result === true,
+          );
+          expect(response).toBeDefined();
+        });
+
+        it('coerces a throwing handler to true (fail closed)', async () => {
+          const outputs: MessageEventWithPayload[] = [];
+          const testStream = await TestDuplexStream.make((message) => {
+            outputs.push(message as unknown as MessageEventWithPayload);
+          });
+          const testClient = new PlatformServicesClient(
+            testStream as unknown as PlatformServicesClientStream,
+            clientLogger,
+          );
+          await delay(10);
+
+          const remoteHandler = vi.fn(async () => 'response');
+          const incarnationHandler = vi.fn(async () => {
+            throw new Error('handler exploded');
+          });
+          const initP = testClient.initializeRemoteComms(
+            '0xabcd',
+            {},
+            remoteHandler,
+            undefined,
+            'local-incarnation',
+            incarnationHandler,
+          );
+          await testStream.receiveInput(makeNullReply('m1'));
+          await initP;
+
+          await testStream.receiveInput(
+            new MessageEvent('message', {
+              data: {
+                id: 'm2',
+                jsonrpc: '2.0',
+                method: 'remoteIncarnationChange',
+                params: {
+                  peerId: 'peer-789',
+                  observedIncarnation: 'incarnation-Z',
+                },
+              },
+            }),
+          );
+          await delay(50);
+
+          const response = outputs.find(
+            (message) =>
+              message.payload?.id === 'm2' &&
+              'result' in message.payload &&
+              message.payload.result === true,
+          );
+          expect(response).toBeDefined();
+        });
+      });
     });
   });
 });
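
The four tests above pin down one small decision table: no handler yields `false`, a boolean verdict passes through, and anything else (a non-boolean return or a thrown error) is treated as a restart. The standalone sketch below distills that table so it can be run outside the test harness. `failClosedVerdict` and `IncarnationHandler` are hypothetical names, and logging is omitted.

```typescript
// Hypothetical handler shape mirroring `(peerId, observedIncarnation) => boolean`.
type IncarnationHandler = (
  peerId: string,
  observedIncarnation: string,
) => Promise<boolean>;

/**
 * Resolve a restart verdict, failing closed: when the handler misbehaves,
 * report `true` so the caller drops potentially stale outbound messages.
 */
async function failClosedVerdict(
  handler: IncarnationHandler | undefined,
  peerId: string,
  observedIncarnation: string,
): Promise<boolean> {
  if (!handler) {
    return false; // nobody to consult, so no restart is reported
  }
  try {
    const verdict: unknown = await handler(peerId, observedIncarnation);
    // A non-boolean return is a caller bug; treat it as a restart.
    return typeof verdict === 'boolean' ? verdict : true;
  } catch (_error) {
    return true; // a throwing handler is also treated as a restart
  }
}

// The four cases covered by the tests, as standalone calls.
const noHandlerCase = failClosedVerdict(undefined, 'peer-789', 'incarnation-Y');
const booleanCase = failClosedVerdict(async () => false, 'peer-456', 'incarnation-X');
const nonBooleanCase = failClosedVerdict(
  async () => 'oops' as unknown as boolean,
  'peer-789',
  'incarnation-Z',
);
const throwingCase = failClosedVerdict(
  async () => {
    throw new Error('handler exploded');
  },
  'peer-789',
  'incarnation-Z',
);
```

Failing closed trades an occasional unnecessary reconnect for never delivering a message under a stale sequence number, which matches the "fail-closed defaults" noted in the commit's risk summary.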

packages/kernel-browser-runtime/src/PlatformServicesClient.ts

Lines changed: 68 additions & 23 deletions
@@ -25,6 +25,7 @@ import type {
   PostMessageTarget,
 } from '@metamask/streams/browser';
 import { isJsonRpcResponse, isJsonRpcRequest } from '@metamask/utils';
+import type { JsonRpcRequest } from '@metamask/utils';
 import type { JsonRpcId } from '@metamask/utils';

 // Appears in the docs.
@@ -329,16 +330,48 @@ export class PlatformServicesClient implements PlatformServices {
   }

   /**
-   * Handle a remote incarnation change notification from the server.
+   * Forward an incarnationId observed during a peer handshake to the kernel
+   * layer. Fires on every successful handshake; the kernel decides whether
+   * it represents a peer restart and returns the verdict so the transport
+   * can suppress stale outbound messages.
    *
-   * @param peerId - The peer ID of the remote that restarted.
-   * @returns A promise that resolves when handling is complete.
+   * Fails closed: handler exceptions and non-boolean returns coerce to
+   * `true` so the transport drops the outbound rather than letting a
+   * potentially stale message through. Returns `false` only when no handler
+   * is registered (transport has no kernel to consult, so the verdict is
+   * effectively unknown — the receiver-side persisted check still gates
+   * correctness).
+   *
+   * @param peerId - The peer that completed the handshake.
+   * @param observedIncarnation - The incarnationId reported by the peer.
+   * @returns Whether the kernel detected a peer restart.
    */
-  async #remoteIncarnationChange(peerId: string): Promise<null> {
-    if (this.#remoteIncarnationChangeHandler) {
-      this.#remoteIncarnationChangeHandler(peerId);
+  async #remoteIncarnationChange(
+    peerId: string,
+    observedIncarnation: string,
+  ): Promise<boolean> {
+    if (!this.#remoteIncarnationChangeHandler) {
+      return false;
+    }
+    try {
+      const verdict = await this.#remoteIncarnationChangeHandler(
+        peerId,
+        observedIncarnation,
+      );
+      if (typeof verdict !== 'boolean') {
+        this.#logger.error(
+          `incarnation handler returned non-boolean ${typeof verdict} for ${peerId.slice(0, 8)}; treating as restart`,
+        );
+        return true;
+      }
+      return verdict;
+    } catch (error) {
+      this.#logger.error(
+        `incarnation handler threw for ${peerId.slice(0, 8)}; treating as restart:`,
+        error,
+      );
+      return true;
     }
-    return null;
   }

   /**
@@ -383,22 +416,34 @@ export class PlatformServicesClient implements PlatformServices {

       this.#rpcClient.handleResponse(id, event.data);
     } else if (isJsonRpcRequest(event.data)) {
-      const { id, method, params } = event.data;
-      try {
-        this.#rpcServer.assertHasMethod(method);
-        const result = await this.#rpcServer.execute(method, params);
-        await this.#sendMessage({
-          id,
-          result,
-          jsonrpc: '2.0',
-        });
-      } catch (error) {
-        await this.#sendMessage({
-          id,
-          error: serializeError(error),
-          jsonrpc: '2.0',
-        });
-      }
+      // Run the request handler in the background instead of awaiting it
+      // inside the drain. The drain processes responses too, and a request
+      // handler that fires an outbound RPC back to the other side would
+      // deadlock waiting for its response — the drain can't get to that
+      // response until the request handler returns.
+      this.#executeRequest(event.data).catch(() => undefined);
+    }
+  }
+
+  /**
+   * Execute a JSON-RPC request and write the response back. Errors during
+   * execution are serialized into a JSON-RPC error response; errors during
+   * response delivery are swallowed.
+   *
+   * @param request - The JSON-RPC request to execute.
+   */
+  async #executeRequest(request: JsonRpcRequest): Promise<void> {
+    const { id, method, params } = request;
+    try {
+      this.#rpcServer.assertHasMethod(method);
+      const result = await this.#rpcServer.execute(method, params);
+      await this.#sendMessage({ id, result, jsonrpc: '2.0' });
+    } catch (error) {
+      await this.#sendMessage({
+        id,
+        error: serializeError(error),
+        jsonrpc: '2.0',
+      }).catch(() => undefined);
     }
   }
 }
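
The comment in the last hunk explains why awaiting a request handler inside the drain can deadlock. The toy model below reproduces that situation with a single in-order message queue. `DemoEndpoint`, `QueuedMessage`, and `demoQueue` are hypothetical names for illustration only; the real client reads from a duplex stream and uses JSON-RPC envelopes rather than this simplified shape.

```typescript
type QueuedMessage =
  | { kind: 'request'; id: string }
  | { kind: 'response'; id: string; result: string };

class DemoEndpoint {
  private readonly pending = new Map<string, (result: string) => void>();

  readonly log: string[] = [];

  // Outbound call: settles only when the drain reaches the matching response.
  call(id: string): Promise<string> {
    return new Promise((resolve) => {
      this.pending.set(id, resolve);
    });
  }

  // Request handler that makes a reentrant outbound call before finishing.
  async handleRequest(id: string): Promise<void> {
    const reply = await this.call(`${id}-inner`);
    this.log.push(`request ${id} done after ${reply}`);
  }

  // Process messages in order, like a stream drain.
  async drain(messages: QueuedMessage[], background: boolean): Promise<void> {
    for (const message of messages) {
      if (message.kind === 'response') {
        this.pending.get(message.id)?.(message.result);
        this.pending.delete(message.id);
      } else if (background) {
        // Fire and forget: the loop moves on to the queued response.
        this.handleRequest(message.id).catch(() => undefined);
      } else {
        // Awaiting here stalls the loop, so the response is never reached.
        await this.handleRequest(message.id);
      }
    }
  }
}

// The response the handler waits for sits behind the request in the queue.
const demoQueue: QueuedMessage[] = [
  { kind: 'request', id: 'r1' },
  { kind: 'response', id: 'r1-inner', result: 'pong' },
];
```

With `background` set to `true`, the drain reaches the queued response and the handler completes. With `false`, the drain never settles, which is the behavior the diff removes.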
