From 71e1c55d2f15ddd1e9cecaa5413f0f73d4d9274c Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Fri, 29 May 2026 07:47:13 +0000 Subject: [PATCH 1/3] Test ipc --- packages/kernel/package.json | 1 + packages/kernel/source/ipc/emit.test.ts | 27 ++ packages/kernel/source/ipc/handler.test.ts | 113 ++++++ packages/kernel/source/ipc/subprocess.test.ts | 331 ++++++++++++++++++ pnpm-lock.yaml | 3 + 5 files changed, 475 insertions(+) create mode 100644 packages/kernel/source/ipc/emit.test.ts create mode 100644 packages/kernel/source/ipc/handler.test.ts create mode 100644 packages/kernel/source/ipc/subprocess.test.ts diff --git a/packages/kernel/package.json b/packages/kernel/package.json index 86d848cec..1aa17b062 100644 --- a/packages/kernel/package.json +++ b/packages/kernel/package.json @@ -56,6 +56,7 @@ "@types/split2": "4.2.3", "@types/tmp": "0.2.6", "capture-console": "1.0.2", + "esmock": "2.7.5", "moment-timezone": "0.6.2", "tmp": "0.2.5", "uvu": "0.5.6" diff --git a/packages/kernel/source/ipc/emit.test.ts b/packages/kernel/source/ipc/emit.test.ts new file mode 100644 index 000000000..ada293f11 --- /dev/null +++ b/packages/kernel/source/ipc/emit.test.ts @@ -0,0 +1,27 @@ +import { describe } from "@mainsail/test-runner"; +import esmock from "esmock"; + +// `emit` talks to the worker_threads parentPort, which is null on the main thread (where +// tests run). esmock lets us substitute a fake parentPort so the postMessage call is observable. +describe<{ + load: (parentPort: unknown) => Promise<{ emit: (event: string, data: unknown) => void }>; +}>("Emit", ({ assert, beforeEach, it }) => { + beforeEach((context) => { + context.load = (parentPort) => esmock("./emit", { worker_threads: { parentPort } }); + }); + + it("posts a {data, event} message to the parent port", async (context) => { + const posted: unknown[] = []; + const { emit } = await context.load({ postMessage: (message: unknown) => posted.push(message) }); + + emit("block.applied", { height: 10 }); + + assert.equal(posted, [{ data: { height: 10 }, event: "block.applied" }]); + }); + + it("is a no-op when there is no parent port", async (context) => { + const { emit } = await context.load(null); + + assert.not.throws(() => emit("block.applied", { height: 10 })); + }); +}); diff --git a/packages/kernel/source/ipc/handler.test.ts b/packages/kernel/source/ipc/handler.test.ts new file mode 100644 index 000000000..2472adeb0 --- /dev/null +++ b/packages/kernel/source/ipc/handler.test.ts @@ -0,0 +1,113 @@ +import { describe } from "@mainsail/test-runner"; +import { EventEmitter } from "events"; +import esmock from "esmock"; + +// The Handler listens on the worker_threads parentPort, which is null on the main thread +// where tests run. A fake EventEmitter parentPort lets us drive incoming messages and capture +// the replies the handler posts back. +class FakeParentPort extends EventEmitter { + public readonly posted: any[] = []; + + public postMessage(message: unknown): void { + this.posted.push(message); + } +} + +const flush = (): Promise => new Promise((resolve) => setImmediate(resolve)); + +describe<{ + parentPort: FakeParentPort; + makeHandler: (handler: T) => Promise; +}>("Handler", ({ assert, beforeEach, it }) => { + beforeEach((context) => { + context.parentPort = new FakeParentPort(); + context.makeHandler = async (handler) => { + const { Handler } = await esmock("./handler", { worker_threads: { parentPort: context.parentPort } }); + return new Handler(handler); + }; + }); + + it("registers a single message listener on the parent port", async (context) => { + await context.makeHandler({}); + + assert.equal(context.parentPort.listenerCount("message"), 1); + }); + + it("invokes the requested method and posts the result keyed by id", async (context) => { + await context.makeHandler({ add: (a: number, b: number) => a + b }); + + context.parentPort.emit("message", { args: [2, 3], id: "req-1", method: "add" }); + await flush(); + + assert.equal(context.parentPort.posted, [{ id: "req-1", result: 5 }]); + }); + + it("awaits async handler methods before replying", async (context) => { + await context.makeHandler({ slow: async () => "done" }); + + context.parentPort.emit("message", { args: [], id: "req-2", method: "slow" }); + await flush(); + + assert.equal(context.parentPort.posted, [{ id: "req-2", result: "done" }]); + }); + + it("forwards the handler's arguments in order", async (context) => { + let received: unknown[] = []; + await context.makeHandler({ record: (...arguments_: unknown[]) => (received = arguments_) }); + + context.parentPort.emit("message", { args: ["a", 1, { x: true }], id: "req-3", method: "record" }); + await flush(); + + assert.equal(received, ["a", 1, { x: true }]); + }); + + it("posts an error reply when the method is not defined on the handler", async (context) => { + await context.makeHandler({}); + + context.parentPort.emit("message", { args: [], id: "req-4", method: "missing" }); + await flush(); + + assert.equal(context.parentPort.posted, [ + { error: "Method missing is not defined on the handler", id: "req-4" }, + ]); + }); + + it("posts an error reply when the handler throws", async (context) => { + await context.makeHandler({ + boom: () => { + throw new Error("custom-error"); + }, + }); + + context.parentPort.emit("message", { args: [], id: "req-5", method: "boom" }); + await flush(); + + assert.equal(context.parentPort.posted, [{ error: "custom-error", id: "req-5" }]); + }); + + it("posts an error reply when an async handler rejects", async (context) => { + await context.makeHandler({ + boom: async () => { + throw new Error("custom-async-error"); + }, + }); + + context.parentPort.emit("message", { args: [], id: "req-6", method: "boom" }); + await flush(); + + assert.equal(context.parentPort.posted, [{ error: "custom-async-error", id: "req-6" }]); + }); + + it("normalizes a non-Error thrown value into an error reply", async (context) => { + await context.makeHandler({ + boom: () => { + throw "just a string"; + }, + }); + + context.parentPort.emit("message", { args: [], id: "req-7", method: "boom" }); + await flush(); + + assert.equal(context.parentPort.posted, [{ error: "just a string", id: "req-7" }]); + }); +}); diff --git a/packages/kernel/source/ipc/subprocess.test.ts b/packages/kernel/source/ipc/subprocess.test.ts new file mode 100644 index 000000000..e16900b86 --- /dev/null +++ b/packages/kernel/source/ipc/subprocess.test.ts @@ -0,0 +1,331 @@ +import { Identifiers, LogLevels } from "@mainsail/constants"; +import type { Contracts } from "@mainsail/contracts"; + +import { EventEmitter } from "events"; +import { PassThrough } from "stream"; + +import { describe } from "@mainsail/test-runner"; +import { Application } from "../application"; +import { Subprocess } from "./subprocess"; + +// Minimal stand-in for a worker_threads Worker. It is an EventEmitter (matching the real +// Worker's event surface), records every postMessage / terminate call, and exposes writable +// stdout/stderr streams so the split2 line piping can be exercised. +class FakeWorker extends EventEmitter { + public threadId = 42; + public readonly stdout = new PassThrough(); + public readonly stderr = new PassThrough(); + public readonly posted: any[] = []; + public terminateCalls = 0; + public terminateResult = 7; + + public postMessage(message: unknown): void { + this.posted.push(message); + } + + public async terminate(): Promise { + this.terminateCalls++; + return this.terminateResult; + } +} + +const flush = (): Promise => new Promise((resolve) => setImmediate(resolve)); + +type LogCalls = Record; + +describe<{ + app: Application; + worker: FakeWorker; + logCalls: LogCalls; + loggerContext: Contracts.Kernel.LoggerContext; + create: () => Subprocess; +}>("Subprocess", ({ assert, beforeEach, it, spy }) => { + beforeEach((context) => { + context.logCalls = {}; + const logger: Record void> = {}; + for (const level of [...LogLevels]) { + context.logCalls[level] = []; + logger[level] = (...arguments_: unknown[]) => context.logCalls[level].push(arguments_); + } + + context.app = new Application(); + context.app.bind(Identifiers.Services.Log.Service).toConstantValue(logger); + + context.worker = new FakeWorker(); + context.loggerContext = "evm"; + + context.create = () => new Subprocess(context.app, "crypto", context.loggerContext, context.worker as any); + }); + + it("constructor logs the spawned worker name including the thread id", (context) => { + context.create(); + + assert.equal(context.logCalls.debug[0], ["Spawning worker crypto-42"]); + }); + + it("captures the worker name at construction time so a later threadId reset is ignored", (context) => { + const subprocess = context.create(); + // Node resets threadId to -1 once a worker exits; the captured name must not change. + context.worker.threadId = -1; + + const promise = subprocess.sendRequest("noop"); + context.worker.emit("exit", 0); + + assert.equal(context.logCalls.debug.at(-1), ["Worker crypto-42 stopped with exit code 0"]); + + return assert.rejects(() => promise); + }); + + it("sendRequest posts a {args, id, method} message and resolves on the matching reply", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("add", 1, 2); + + assert.equal(context.worker.posted[0], { args: [1, 2], id: 1, method: "add" }); + + context.worker.emit("message", { id: 1, result: 42 }); + + assert.equal(await promise, 42); + }); + + it("sendRequest increments the request id for each call", (context) => { + const subprocess = context.create(); + + void subprocess.sendRequest("a"); + void subprocess.sendRequest("b"); + void subprocess.sendRequest("c"); + + assert.equal( + context.worker.posted.map((message) => message.id), + [1, 2, 3], + ); + }); + + it("sendRequest rejects with an Error built from an error reply", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("boom"); + context.worker.emit("message", { error: new Error("it failed"), id: 1 }); + + await assert.rejects(() => promise, "it failed"); + }); + + it("sendRequest deletes the callback once settled", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("once"); + assert.equal(subprocess.getQueueSize(), 1); + + context.worker.emit("message", { id: 1, result: "ok" }); + await promise; + + assert.equal(subprocess.getQueueSize(), 0); + }); + + it("sendRequest rejects immediately once the worker is stopped", async (context) => { + const subprocess = context.create(); + await subprocess.kill(); + + await assert.rejects(() => subprocess.sendRequest("late"), "was killed"); + // Nothing should have been posted to a dead worker. + assert.equal(context.worker.posted, []); + }); + + it("onMessage ignores a reply whose id has no pending callback", (context) => { + const subprocess = context.create(); + + // No callback registered for id 99; the optional chaining must keep this from throwing. + assert.not.throws(() => context.worker.emit("message", { id: 99, result: "stray" })); + assert.equal(subprocess.getQueueSize(), 0); + }); + + it("getQueueSize reflects the number of in-flight requests", (context) => { + const subprocess = context.create(); + + assert.equal(subprocess.getQueueSize(), 0); + void subprocess.sendRequest("a"); + void subprocess.sendRequest("b"); + assert.equal(subprocess.getQueueSize(), 2); + }); + + it("kill marks the worker stopped and terminates the thread", async (context) => { + const subprocess = context.create(); + + assert.false(subprocess.isStopped()); + + const result = await subprocess.kill(); + + assert.true(subprocess.isStopped()); + assert.equal(result, 7); + assert.equal(context.worker.terminateCalls, 1); + }); + + it("dispose marks the worker stopped and terminates the thread", async (context) => { + const subprocess = context.create(); + + const result = await subprocess.dispose(); + + assert.true(subprocess.isStopped()); + assert.equal(result, 7); + assert.equal(context.worker.terminateCalls, 1); + }); + + it("a request issued after dispose rejects with the dispose reason", async (context) => { + const subprocess = context.create(); + await subprocess.dispose(); + + await assert.rejects(() => subprocess.sendRequest("late"), "is being disposed"); + }); + + it("drain resolves immediately when there are no in-flight requests", async (context) => { + const subprocess = context.create(); + + await assert.resolves(() => subprocess.drain()); + }); + + it("drain resolves once the last in-flight request is replied to", async (context) => { + const subprocess = context.create(); + + void subprocess.sendRequest("a"); + void subprocess.sendRequest("b"); + + let drained = false; + const drainPromise = subprocess.drain().then(() => (drained = true)); + + context.worker.emit("message", { id: 1, result: "ok" }); + await flush(); + assert.false(drained); + + context.worker.emit("message", { id: 2, result: "ok" }); + await drainPromise; + assert.true(drained); + }); + + it("drain resolves when pending requests are rejected by a stop", async (context) => { + const subprocess = context.create(); + + void subprocess.sendRequest("a").catch(() => {}); + const drainPromise = subprocess.drain(); + + context.worker.emit("exit", 1); + + await assert.resolves(() => drainPromise); + }); + + it("returns the same drain promise while a drain is pending", (context) => { + const subprocess = context.create(); + void subprocess.sendRequest("a"); + + const first = subprocess.drain(); + const second = subprocess.drain(); + + assert.equal(first, second); + }); + + it("registerEventHandler dispatches event messages to the handler", (context) => { + const subprocess = context.create(); + + const received: unknown[] = []; + subprocess.registerEventHandler("tick", (data) => received.push(data)); + + context.worker.emit("message", { data: { value: 1 }, event: "tick" }); + + assert.equal(received, [{ value: 1 }]); + }); + + it("ignores event messages with no registered handler", (context) => { + const subprocess = context.create(); + + assert.not.throws(() => context.worker.emit("message", { data: {}, event: "unknown" })); + }); + + it("a worker error logs, stops the worker, and rejects pending requests", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("inflight"); + context.worker.emit("error", new Error("worker crashed")); + + assert.true(subprocess.isStopped()); + assert.equal(context.logCalls.error.at(-1), ["Worker crypto-42 error: worker crashed"]); + + await assert.rejects(() => promise, "worker crashed"); + }); + + it("a worker exit logs and stops the worker", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("inflight"); + context.worker.emit("exit", 3); + + assert.true(subprocess.isStopped()); + assert.equal(context.logCalls.debug.at(-1), ["Worker crypto-42 stopped with exit code 3"]); + + await assert.rejects(() => promise, "stopped with exit code 3"); + }); + + it("keeps the first stop reason when a crash is followed by an exit", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("inflight"); + context.worker.emit("error", new Error("crash first")); + context.worker.emit("exit", 1); + + // The crash reason is more informative than the exit that follows it. + await assert.rejects(() => promise, "crash first"); + }); + + it("messageerror rejects pending requests without marking the worker stopped", async (context) => { + const subprocess = context.create(); + + const promise = subprocess.sendRequest("inflight"); + context.worker.emit("messageerror", new Error("bad payload")); + + // The worker stays alive after an undeserializable reply. + assert.false(subprocess.isStopped()); + assert.equal(context.logCalls.error.at(-1), [ + "Worker crypto-42 message could not be deserialized: bad payload", + ]); + assert.equal(subprocess.getQueueSize(), 0); + + await assert.rejects(() => promise, "bad payload"); + }); + + it("routes a recognised log line to the matching logger level with the logger context", async (context) => { + context.create(); + + context.worker.stdout.write("[debug] hello world\n"); + await flush(); + + assert.equal(context.logCalls.debug.at(-1), ["hello world", "evm"]); + }); + + it("routes an unknown log level to a warning", async (context) => { + context.create(); + + context.worker.stdout.write("[trace] something\n"); + await flush(); + + assert.equal(context.logCalls.warn.at(-1), ["[unknown:trace] something"]); + }); + + it("falls back to console.log for stdout that does not match the log format", async (context) => { + context.create(); + + const consoleSpy = spy(console, "log"); + + context.worker.stdout.write("plain output without level\n"); + await flush(); + + consoleSpy.calledWith("plain output without level"); + consoleSpy.restore(); + }); + + it("routes stderr lines to the logger error level", async (context) => { + context.create(); + + context.worker.stderr.write("a fatal stderr line\n"); + await flush(); + + assert.equal(context.logCalls.error.at(-1), ["a fatal stderr line"]); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 07fc34c97..51f8b8679 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2411,6 +2411,9 @@ importers: capture-console: specifier: 1.0.2 version: 1.0.2 + esmock: + specifier: 2.7.5 + version: 2.7.5 moment-timezone: specifier: 0.6.2 version: 0.6.2 From 89b9f0165956bfb4a0034cf2346236a5e0f52a56 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Fri, 29 May 2026 07:50:10 +0000 Subject: [PATCH 2/3] Remove handleRequest form handler --- packages/contracts/source/contracts/kernel/ipc.ts | 4 ---- packages/kernel/source/ipc/handler.ts | 8 +------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index 3ae8e58aa..8550ad379 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -35,10 +35,6 @@ export type RequestCallbacks = RequestCallback; export type EventCallback = (data: T) => void; -export interface Handler { - handleRequest>(method: K): void; -} - export interface Subprocess { dispose(): Promise; drain(): Promise; diff --git a/packages/kernel/source/ipc/handler.ts b/packages/kernel/source/ipc/handler.ts index a7f21311e..a5eef9af8 100644 --- a/packages/kernel/source/ipc/handler.ts +++ b/packages/kernel/source/ipc/handler.ts @@ -1,18 +1,12 @@ -import type { Contracts } from "@mainsail/contracts"; - import { ensureError } from "@mainsail/utils"; import { parentPort } from "worker_threads"; -export class Handler implements Contracts.Kernel.IPC.Handler { +export class Handler { private readonly handler: T; public constructor(handler: T) { this.handler = handler; - this.handleRequest(); - } - - public handleRequest(): void { parentPort?.on("message", (message) => { void this.#onMessage(message); }); From 62b6045a9aceccc3bb80cd628c2c3f97a1e1933c Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Fri, 29 May 2026 08:01:02 +0000 Subject: [PATCH 3/3] Remove defaults --- packages/transaction-pool-worker/source/defaults.ts | 1 - 1 file changed, 1 deletion(-) delete mode 100644 packages/transaction-pool-worker/source/defaults.ts diff --git a/packages/transaction-pool-worker/source/defaults.ts b/packages/transaction-pool-worker/source/defaults.ts deleted file mode 100644 index 0e40a8a2f..000000000 --- a/packages/transaction-pool-worker/source/defaults.ts +++ /dev/null @@ -1 +0,0 @@ -export const defaults = {};