From 73568a17523d35bc3a00606bf23858c354c6e77d Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:32:40 +0000 Subject: [PATCH 01/37] Fix boot --- packages/crypto-worker/source/worker.ts | 22 +++++-------------- packages/evm-api-worker/source/worker.ts | 9 ++++---- .../transaction-pool-worker/source/worker.ts | 9 ++++---- 3 files changed, 13 insertions(+), 27 deletions(-) diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 71a65b879..3c7d8cee7 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -2,7 +2,6 @@ import type { Contracts } from "@mainsail/contracts"; import { Identifiers } from "@mainsail/constants"; import { inject, injectable } from "@mainsail/container"; -import { sleep } from "@mainsail/utils"; @injectable() export class Worker implements Contracts.Crypto.Worker { @@ -11,26 +10,15 @@ export class Worker implements Contracts.Crypto.Worker { private ipcSubprocess!: Contracts.Crypto.WorkerSubprocess; - #booted = false; - #booting = false; + #bootPromise?: Promise; public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - this.ipcSubprocess = this.createWorkerSubprocess(); - - while (this.#booting) { - await sleep(50); - } - - if (this.#booted) { - return; + if (!this.#bootPromise) { + this.ipcSubprocess = this.createWorkerSubprocess(); + this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } - this.#booting = true; - - await this.ipcSubprocess.sendRequest("boot", flags); - - this.#booting = false; - this.#booted = true; + await this.#bootPromise; } public async kill(): Promise { diff --git a/packages/evm-api-worker/source/worker.ts b/packages/evm-api-worker/source/worker.ts index b7b8e581b..941b731f7 100644 --- a/packages/evm-api-worker/source/worker.ts +++ b/packages/evm-api-worker/source/worker.ts @@ -16,7 +16,7 @@ export class Worker implements Contracts.Evm.Worker { private ipcSubprocess!: Contracts.Evm.WorkerSubprocess; - #booted = false; + #bootPromise?: Promise; @postConstruct() public initialize(): void { @@ -35,12 +35,11 @@ export class Worker implements Contracts.Evm.Worker { } public async boot(flags: Contracts.Evm.WorkerFlags): Promise { - if (this.#booted) { - return; + if (!this.#bootPromise) { + this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } - this.#booted = true; - await this.ipcSubprocess.sendRequest("boot", flags); + await this.#bootPromise; } public async kill(): Promise { diff --git a/packages/transaction-pool-worker/source/worker.ts b/packages/transaction-pool-worker/source/worker.ts index a7da9e1a7..f9c03313d 100644 --- a/packages/transaction-pool-worker/source/worker.ts +++ b/packages/transaction-pool-worker/source/worker.ts @@ -17,7 +17,7 @@ export class Worker implements Contracts.TransactionPool.Worker { private ipcSubprocess!: Contracts.TransactionPool.WorkerSubprocess; - #booted = false; + #bootPromise?: Promise; @postConstruct() public initialize(): void { @@ -37,12 +37,11 @@ export class Worker implements Contracts.TransactionPool.Worker { } public async boot(flags: Contracts.TransactionPool.WorkerFlags): Promise { - if (this.#booted) { - return; + if (!this.#bootPromise) { + this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } - this.#booted = true; - await this.ipcSubprocess.sendRequest("boot", flags); + await this.#bootPromise; } public async kill(): Promise { From 8d394d7ae0070e1cb095fbc9de903477362a45ba Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:33:53 +0000 Subject: [PATCH 02/37] Use post construct --- packages/crypto-worker/source/worker.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 3c7d8cee7..0a30efd55 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -1,7 +1,7 @@ import type { Contracts } from "@mainsail/contracts"; import { Identifiers } from "@mainsail/constants"; -import { inject, injectable } from "@mainsail/container"; +import { inject, injectable, postConstruct } from "@mainsail/container"; @injectable() export class Worker implements Contracts.Crypto.Worker { @@ -12,9 +12,13 @@ export class Worker implements Contracts.Crypto.Worker { #bootPromise?: Promise; + @postConstruct() + public initialize(): void { + this.ipcSubprocess = this.createWorkerSubprocess(); + } + public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { if (!this.#bootPromise) { - this.ipcSubprocess = this.createWorkerSubprocess(); this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } From a7537efc769a6eb7f84626a14b9770a2840afeff Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:36:43 +0000 Subject: [PATCH 03/37] Limit default workers to 4 --- packages/core/bin/config/devnet/core/.env | 1 - packages/crypto-worker/source/defaults.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/core/bin/config/devnet/core/.env b/packages/core/bin/config/devnet/core/.env index dca0a0bcc..51afd0874 100644 --- a/packages/core/bin/config/devnet/core/.env +++ b/packages/core/bin/config/devnet/core/.env @@ -28,5 +28,4 @@ MAINSAIL_API_TRANSACTION_POOL_DISABLED= MAINSAIL_API_TRANSACTION_POOL_HOST=0.0.0.0 MAINSAIL_API_TRANSACTION_POOL_PORT=4007 -MAINSAIL_CRYPTO_WORKER_COUNT=2 MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED= diff --git a/packages/crypto-worker/source/defaults.ts b/packages/crypto-worker/source/defaults.ts index 94fd83a5c..57497d178 100644 --- a/packages/crypto-worker/source/defaults.ts +++ b/packages/crypto-worker/source/defaults.ts @@ -3,6 +3,6 @@ import { Environment } from "@mainsail/kernel"; import { cpus } from "os"; export const defaults = { - workerCount: Environment.get(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT, cpus().length), + workerCount: Environment.get(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT, Math.min(cpus().length, 4)), workerLoggingEnabled: Environment.isTrue(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED), }; From 3e29db4bd8d86bdb569046cbad85056b444e6146 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:42:20 +0000 Subject: [PATCH 04/37] Move method is not defined to try catch --- packages/kernel/source/ipc/handler.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/kernel/source/ipc/handler.ts b/packages/kernel/source/ipc/handler.ts index 3c29d8963..b3fcafcbe 100644 --- a/packages/kernel/source/ipc/handler.ts +++ b/packages/kernel/source/ipc/handler.ts @@ -18,11 +18,11 @@ export class Handler implements Contracts.Kernel.IPC.Handler Date: Wed, 27 May 2026 12:48:02 +0000 Subject: [PATCH 05/37] Handle errors --- packages/kernel/source/ipc/subprocess.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index a3840dd55..6f7bbc72e 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -21,6 +21,10 @@ export class Subprocess = Record this.rejectPending(error)); + this.subprocess.on("exit", (code) => + this.rejectPending(new Error(`Worker stopped with exit code ${code}`)), + ); const logger = app.get(Identifiers.Services.Log.Service); @@ -69,6 +73,13 @@ export class Subprocess = Record); } + private rejectPending(error: Error): void { + for (const { reject } of this.callbacks.values()) { + reject(error); + } + this.callbacks.clear(); + } + private onEmit(message: Contracts.Kernel.IPC.Event): void { if (!("event" in message)) { return; From b0497f50c69eac341ff8e526f31ab47d84e31b1b Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:48:46 +0000 Subject: [PATCH 06/37] messageerror --- packages/kernel/source/ipc/subprocess.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 6f7bbc72e..6108ed200 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -25,6 +25,13 @@ export class Subprocess = Record this.rejectPending(new Error(`Worker stopped with exit code ${code}`)), ); + // A reply that fails to deserialize cannot be matched back to its request id, + // so the pending callback can never be settled. Reject everything in flight to + // avoid a silent hang rather than leaking the stuck request. + this.subprocess.on("messageerror", (error: Error) => { + logger.error(`Worker message could not be deserialized: ${error.message}`); + this.rejectPending(error); + }); const logger = app.get(Identifiers.Services.Log.Service); From 828c2a36e6697b78df33b189b7386eb0ebb52338 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:52:34 +0000 Subject: [PATCH 07/37] Log on error --- packages/kernel/source/ipc/subprocess.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 6108ed200..88cd85742 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -21,7 +21,10 @@ export class Subprocess = Record this.rejectPending(error)); + this.subprocess.on("error", (error: Error) => { + logger.error(`Worker error: ${error.message}`); + this.rejectPending(error); + }); this.subprocess.on("exit", (code) => this.rejectPending(new Error(`Worker stopped with exit code ${code}`)), ); From 66cd5ad9d4d8d599ed392be87bc99e2ea3851431 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 12:54:56 +0000 Subject: [PATCH 08/37] Add thread id --- packages/kernel/source/ipc/subprocess.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 88cd85742..b1657572a 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -19,20 +19,25 @@ export class Subprocess = Record { - logger.error(`Worker error: ${error.message}`); + logger.error(`Worker ${threadId} error: ${error.message}`); this.rejectPending(error); }); - this.subprocess.on("exit", (code) => - this.rejectPending(new Error(`Worker stopped with exit code ${code}`)), - ); + this.subprocess.on("exit", (code) => { + logger.debug(`Worker ${threadId} stopped with exit code ${code}`); + this.rejectPending(new Error(`Worker stopped with exit code ${code}`)); + }); // A reply that fails to deserialize cannot be matched back to its request id, // so the pending callback can never be settled. Reject everything in flight to // avoid a silent hang rather than leaking the stuck request. this.subprocess.on("messageerror", (error: Error) => { - logger.error(`Worker message could not be deserialized: ${error.message}`); + logger.error(`Worker ${threadId} message could not be deserialized: ${error.message}`); this.rejectPending(error); }); From 63d658b8cc90a852159a7f3d60b732381277e698 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:02:57 +0000 Subject: [PATCH 09/37] Low worker names --- packages/crypto-worker/source/service-provider.ts | 2 +- packages/evm-api-worker/source/service-provider.ts | 2 +- packages/kernel/source/ipc/subprocess.ts | 11 ++++++----- .../source/service-provider.ts | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/crypto-worker/source/service-provider.ts b/packages/crypto-worker/source/service-provider.ts index f65f76123..f0f3fb3d8 100644 --- a/packages/crypto-worker/source/service-provider.ts +++ b/packages/crypto-worker/source/service-provider.ts @@ -29,7 +29,7 @@ export class ServiceProvider extends Providers.ServiceProvider { stderr: true, stdout: true, }); - return new Ipc.Subprocess(this.app, "system", subprocess); + return new Ipc.Subprocess(this.app, "crypto", "system", subprocess); }); } diff --git a/packages/evm-api-worker/source/service-provider.ts b/packages/evm-api-worker/source/service-provider.ts index 00d8aff66..6885d5e5c 100644 --- a/packages/evm-api-worker/source/service-provider.ts +++ b/packages/evm-api-worker/source/service-provider.ts @@ -19,7 +19,7 @@ export class ServiceProvider extends Providers.ServiceProvider { stderr: true, stdout: true, }); - return new Ipc.Subprocess(this.app, "api", subprocess); + return new Ipc.Subprocess(this.app, "evm-api", "api", subprocess); }); this.app.bind(Identifiers.Evm.Worker).toConstantValue(this.app.resolve(WorkerInstance)); diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index b1657572a..eb282e2bf 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -15,29 +15,30 @@ export class Subprocess = Record { - logger.error(`Worker ${threadId} error: ${error.message}`); + logger.error(`Worker ${workerName} error: ${error.message}`); this.rejectPending(error); }); this.subprocess.on("exit", (code) => { - logger.debug(`Worker ${threadId} stopped with exit code ${code}`); - this.rejectPending(new Error(`Worker stopped with exit code ${code}`)); + logger.debug(`Worker ${workerName} stopped with exit code ${code}`); + this.rejectPending(new Error(`Worker ${workerName} stopped with exit code ${code}`)); }); // A reply that fails to deserialize cannot be matched back to its request id, // so the pending callback can never be settled. Reject everything in flight to // avoid a silent hang rather than leaking the stuck request. this.subprocess.on("messageerror", (error: Error) => { - logger.error(`Worker ${threadId} message could not be deserialized: ${error.message}`); + logger.error(`Worker ${workerName} message could not be deserialized: ${error.message}`); this.rejectPending(error); }); diff --git a/packages/transaction-pool-worker/source/service-provider.ts b/packages/transaction-pool-worker/source/service-provider.ts index ac9db042e..aad0a0efb 100644 --- a/packages/transaction-pool-worker/source/service-provider.ts +++ b/packages/transaction-pool-worker/source/service-provider.ts @@ -21,7 +21,7 @@ export class ServiceProvider extends Providers.ServiceProvider { stderr: true, stdout: true, }); - return new Ipc.Subprocess(this.app, "tx-pool", subprocess); + return new Ipc.Subprocess(this.app, "transaction-pool", "tx-pool", subprocess); }); this.app.bind(Identifiers.TransactionPool.Worker).toConstantValue(this.app.resolve(WorkerInstance)); From 508b436b89c50cb0bfcdc458494cea214ef960f0 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:06:24 +0000 Subject: [PATCH 10/37] Add spawning log --- packages/kernel/source/ipc/subprocess.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index eb282e2bf..f9e622e51 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -21,8 +21,11 @@ export class Subprocess = Record(Identifiers.Services.Log.Service); + // Capture the thread id up front: Node resets it to -1 once the worker exits. const workerName = `${name}-${this.subprocess.threadId}`; + logger.debug(`Spawning worker ${workerName}`); this.subprocess.on("message", this.onSubprocessMessage.bind(this)); this.subprocess.on("message", this.onEmit.bind(this)); @@ -42,7 +45,6 @@ export class Subprocess = Record(Identifiers.Services.Log.Service); this.subprocess.stdout.pipe(split()).on("data", (line) => { // [LEVEL] MESSAGE From d8cbfeb201596fcc8c17e912f4d994ff82299e42 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:07:22 +0000 Subject: [PATCH 11/37] Change log position --- packages/crypto-worker/source/worker-pool.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index c637ac9c3..d076c37c0 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -25,13 +25,13 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { public async boot(): Promise { const workerCount = this.configuration.getRequired("workerCount"); + this.logger.info(`Booting up ${workerCount} crypto workers`); + for (let index = 0; index < workerCount; index++) { const worker = this.createWorker(); this.workers.push(worker); } - this.logger.info(`Booting up ${this.workers.length} crypto workers`); - await Promise.all( this.workers.map((worker) => worker.boot({ From 03577a884d0ff34909bab1ab6d2694b0a8dfc3f0 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:22:40 +0000 Subject: [PATCH 12/37] Improve worker selection --- packages/crypto-worker/source/worker-pool.ts | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index d076c37c0..026fa9e5c 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -48,9 +48,24 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { } public async getWorker(): Promise { - const worker = this.workers[this.#currentWorkerIndex]; + // Pick the worker with the fewest in-flight requests. Scanning starts at a + // rotating cursor and only replaces the pick on a strictly smaller queue, so + // ties (e.g. all workers idle) fall back to round-robin and spread evenly. + let selected = this.workers[this.#currentWorkerIndex]; + let smallestQueueSize = selected.getQueueSize(); + + for (let offset = 1; offset < this.workers.length; offset++) { + const worker = this.workers[(this.#currentWorkerIndex + offset) % this.workers.length]; + const queueSize = worker.getQueueSize(); + + if (queueSize < smallestQueueSize) { + selected = worker; + smallestQueueSize = queueSize; + } + } + this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % this.workers.length; - return worker; + return selected; } } From c32ec2d24bdc85f7b1f93e3c66f888024f1ee0cf Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:24:05 +0000 Subject: [PATCH 13/37] Workers are private --- packages/crypto-worker/source/worker-pool.ts | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index 026fa9e5c..7c9fa6406 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -15,11 +15,10 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { @inject(Identifiers.CryptoWorker.Worker.Factory) private readonly createWorker!: Contracts.Crypto.WorkerFactory; - private workers: Contracts.Crypto.Worker[] = []; - @inject(Identifiers.Config.Flags) private readonly flags!: Contracts.Types.KeyValuePair; + #workers: Contracts.Crypto.Worker[] = []; #currentWorkerIndex = 0; public async boot(): Promise { @@ -29,11 +28,11 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { for (let index = 0; index < workerCount; index++) { const worker = this.createWorker(); - this.workers.push(worker); + this.#workers.push(worker); } await Promise.all( - this.workers.map((worker) => + this.#workers.map((worker) => worker.boot({ ...this.flags, thread: "crypto-worker", @@ -44,18 +43,18 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { } public async shutdown(): Promise { - await Promise.all(this.workers.map(async (worker) => await worker.kill())); + await Promise.all(this.#workers.map(async (worker) => await worker.kill())); } public async getWorker(): Promise { // Pick the worker with the fewest in-flight requests. Scanning starts at a // rotating cursor and only replaces the pick on a strictly smaller queue, so // ties (e.g. all workers idle) fall back to round-robin and spread evenly. - let selected = this.workers[this.#currentWorkerIndex]; + let selected = this.#workers[this.#currentWorkerIndex]; let smallestQueueSize = selected.getQueueSize(); - for (let offset = 1; offset < this.workers.length; offset++) { - const worker = this.workers[(this.#currentWorkerIndex + offset) % this.workers.length]; + for (let offset = 1; offset < this.#workers.length; offset++) { + const worker = this.#workers[(this.#currentWorkerIndex + offset) % this.#workers.length]; const queueSize = worker.getQueueSize(); if (queueSize < smallestQueueSize) { @@ -64,7 +63,7 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { } } - this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % this.workers.length; + this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % this.#workers.length; return selected; } From f61c2d6bc381b254b956002440793c046a753bb7 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:26:52 +0000 Subject: [PATCH 14/37] Handle no workers available --- packages/crypto-worker/source/worker-pool.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index 7c9fa6406..f2132d9af 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -26,13 +26,15 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { this.logger.info(`Booting up ${workerCount} crypto workers`); + const workers: Contracts.Crypto.Worker[] = []; + for (let index = 0; index < workerCount; index++) { const worker = this.createWorker(); - this.#workers.push(worker); + workers.push(worker); } await Promise.all( - this.#workers.map((worker) => + workers.map((worker) => worker.boot({ ...this.flags, thread: "crypto-worker", @@ -40,13 +42,21 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { }), ), ); + + this.#workers = workers; } public async shutdown(): Promise { - await Promise.all(this.#workers.map(async (worker) => await worker.kill())); + const workers = this.#workers; + this.#workers = []; + await Promise.all(workers.map(async (worker) => await worker.kill())); } public async getWorker(): Promise { + if(this.#workers.length === 0) { + throw new Error("No crypto workers available"); + } + // Pick the worker with the fewest in-flight requests. Scanning starts at a // rotating cursor and only replaces the pick on a strictly smaller queue, so // ties (e.g. all workers idle) fall back to round-robin and spread evenly. From 38f62e088be363ef7c73867335d48af7b5cbeb15 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:32:21 +0000 Subject: [PATCH 15/37] Make get worker async --- packages/consensus/source/aggregator.ts | 4 ++-- packages/consensus/source/processors/message-processor.ts | 2 +- packages/contracts/source/contracts/crypto/worker.ts | 2 +- packages/crypto-messages/source/factory.ts | 2 +- packages/crypto-proposal/source/factory.ts | 2 +- packages/crypto-transaction/source/factory.ts | 2 +- packages/crypto-worker/source/worker-pool.ts | 4 ++-- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/consensus/source/aggregator.ts b/packages/consensus/source/aggregator.ts index 861f59636..6b9389117 100644 --- a/packages/consensus/source/aggregator.ts +++ b/packages/consensus/source/aggregator.ts @@ -29,7 +29,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator { validators[key] = true; } - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); const signature = await worker.consensusSignature("aggregate", signatures); return { @@ -51,7 +51,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator { return false; } - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); const aggregatedPublicKey = await worker.publicKeyFactory("aggregate", validatorPublicKeys); diff --git a/packages/consensus/source/processors/message-processor.ts b/packages/consensus/source/processors/message-processor.ts index feabec51e..7db849aee 100644 --- a/packages/consensus/source/processors/message-processor.ts +++ b/packages/consensus/source/processors/message-processor.ts @@ -106,7 +106,7 @@ export class MessageProcessor extends AbstractProcessor implements Contracts.Con } async #hasValidSignature(message: Contracts.Crypto.Message): Promise { - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); return worker.consensusSignature( "verify", Buffer.from(message.signature, "hex"), diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index 6514ec34e..4af368452 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -46,5 +46,5 @@ export interface Worker extends WorkerScriptHandler { export interface WorkerPool { boot(): Promise; shutdown(): Promise; - getWorker(): Promise; + getWorker(): Worker; } diff --git a/packages/crypto-messages/source/factory.ts b/packages/crypto-messages/source/factory.ts index ae39f4776..961533cec 100644 --- a/packages/crypto-messages/source/factory.ts +++ b/packages/crypto-messages/source/factory.ts @@ -25,7 +25,7 @@ export class Factory implements Contracts.Crypto.MessageFactory { keyPair: Contracts.Crypto.KeyPair, context: Contracts.Crypto.SignatureMessageContext, ): Promise { - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); const bytes = await this.serializer.serializeMessageForSignature(data, context); const signature = await worker.consensusSignature("sign", bytes, Buffer.from(keyPair.privateKey, "hex")); diff --git a/packages/crypto-proposal/source/factory.ts b/packages/crypto-proposal/source/factory.ts index a6da2cf6b..ba9ddc316 100644 --- a/packages/crypto-proposal/source/factory.ts +++ b/packages/crypto-proposal/source/factory.ts @@ -31,7 +31,7 @@ export class Factory implements Contracts.Crypto.ProposalFactory { data: Contracts.Crypto.ProposalDataSerializableUnsigned, keyPair: Contracts.Crypto.KeyPair, ): Promise { - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); this.#verifySchema("proposalUnsigned", data); diff --git a/packages/crypto-transaction/source/factory.ts b/packages/crypto-transaction/source/factory.ts index 43b805313..c26f1801d 100644 --- a/packages/crypto-transaction/source/factory.ts +++ b/packages/crypto-transaction/source/factory.ts @@ -122,7 +122,7 @@ export class TransactionFactory implements Contracts.Crypto.TransactionFactory { try { const { data: transaction } = await this.deserializer.deserialize(serialized); - const worker = this.workerPool ? await this.workerPool.getWorker() : undefined; + const worker = this.workerPool ? this.workerPool.getWorker() : undefined; const cryptoData = worker ? await worker.transactionFactory("computeCryptoData", transaction) : await this.computeCryptoData(transaction); diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index f2132d9af..8c0793a2f 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -52,8 +52,8 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { await Promise.all(workers.map(async (worker) => await worker.kill())); } - public async getWorker(): Promise { - if(this.#workers.length === 0) { + public getWorker(): Contracts.Crypto.Worker { + if (this.#workers.length === 0) { throw new Error("No crypto workers available"); } From 100468584bf7bd339c771034c52f6db3d3a88e98 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:36:30 +0000 Subject: [PATCH 16/37] Fix paths --- packages/crypto-worker/source/service-provider.ts | 4 ++-- packages/evm-api-worker/source/service-provider.ts | 3 ++- packages/transaction-pool-worker/source/service-provider.ts | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/crypto-worker/source/service-provider.ts b/packages/crypto-worker/source/service-provider.ts index f0f3fb3d8..91a13b68d 100644 --- a/packages/crypto-worker/source/service-provider.ts +++ b/packages/crypto-worker/source/service-provider.ts @@ -5,7 +5,7 @@ import { injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; import Joi from "joi"; import { cpus } from "os"; -import { URL } from "url"; +import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; import { WorkerPool } from "./worker-pool.js"; @@ -25,7 +25,7 @@ export class ServiceProvider extends Providers.ServiceProvider { this.app.bind(Identifiers.CryptoWorker.WorkerPool).to(WorkerPool).inSingletonScope(); this.app.bind<() => Ipc.Subprocess>(Identifiers.CryptoWorker.WorkerSubprocess.Factory).toFactory(() => () => { - const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, { + const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), { stderr: true, stdout: true, }); diff --git a/packages/evm-api-worker/source/service-provider.ts b/packages/evm-api-worker/source/service-provider.ts index 6885d5e5c..ee292cc28 100644 --- a/packages/evm-api-worker/source/service-provider.ts +++ b/packages/evm-api-worker/source/service-provider.ts @@ -4,6 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; import Joi from "joi"; +import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; import { Worker as WorkerInstance } from "./worker.js"; @@ -15,7 +16,7 @@ export class ServiceProvider extends Providers.ServiceProvider { public async register(): Promise { this.app.bind<() => Ipc.Subprocess>(Identifiers.Evm.WorkerSubprocess.Factory).toFactory(() => () => { - const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, { + const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), { stderr: true, stdout: true, }); diff --git a/packages/transaction-pool-worker/source/service-provider.ts b/packages/transaction-pool-worker/source/service-provider.ts index aad0a0efb..bf7e1ad2c 100644 --- a/packages/transaction-pool-worker/source/service-provider.ts +++ b/packages/transaction-pool-worker/source/service-provider.ts @@ -4,6 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; import Joi from "joi"; +import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; import { Worker as WorkerInstance } from "./worker.js"; @@ -17,7 +18,7 @@ export class ServiceProvider extends Providers.ServiceProvider { this.app .bind<() => Ipc.Subprocess>(Identifiers.TransactionPool.WorkerSubprocess.Factory) .toFactory(() => () => { - const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, { + const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), { stderr: true, stdout: true, }); From 2781d0e4c3456c52d291125eedb5f90ae55e2795 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:51:06 +0000 Subject: [PATCH 17/37] Fix worker contracts --- .../source/contracts/crypto/worker.ts | 35 ++++++++++++---- .../crypto-worker/source/worker-handler.ts | 25 +++++------- packages/crypto-worker/source/worker.ts | 40 ++++++++++++------- packages/kernel/source/ipc/subprocess.ts | 1 - 4 files changed, 63 insertions(+), 38 deletions(-) diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index 4af368452..8d7549322 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -1,7 +1,7 @@ import type { BlockFactory } from "../crypto/block.js"; import type { PublicKeyFactory, SignatureBls, SignatureEcdsa } from "../crypto/identities.js"; import type { TransactionFactory } from "../crypto/transactions.js"; -import type { Requests, Subprocess } from "../kernel/ipc.js"; +import type { MethodArguments, Requests, Subprocess } from "../kernel/ipc.js"; import type { JsonObject } from "../types/index.js"; export interface WorkerFlags extends JsonObject { @@ -12,23 +12,23 @@ export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; consensusSignature>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; walletSignature>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; blockFactory>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; transactionFactory>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; publicKeyFactory>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; } @@ -38,9 +38,30 @@ export type WorkerSubprocess = Subprocess; export type WorkerSubprocessFactory = () => WorkerSubprocess; -export interface Worker extends WorkerScriptHandler { +export interface Worker { + boot(flags: WorkerFlags): Promise; getQueueSize(): number; kill(): Promise; + consensusSignature>( + method: K, + ...arguments_: Parameters + ): Promise>; + walletSignature>( + method: K, + ...arguments_: Parameters + ): Promise>; + blockFactory>( + method: K, + ...arguments_: Parameters + ): Promise>; + transactionFactory>( + method: K, + ...arguments_: Parameters + ): Promise>; + publicKeyFactory>( + method: K, + ...arguments_: Parameters + ): Promise>; } export interface WorkerPool { diff --git a/packages/crypto-worker/source/worker-handler.ts b/packages/crypto-worker/source/worker-handler.ts index fcf5425a8..46d5936a4 100644 --- a/packages/crypto-worker/source/worker-handler.ts +++ b/packages/crypto-worker/source/worker-handler.ts @@ -126,41 +126,36 @@ export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler public async consensusSignature>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callConsensusSignature(method, arguments_[0]); + return this.#impl.callConsensusSignature(method, arguments_); } public async walletSignature>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callWalletSignature(method, arguments_[0]); + return this.#impl.callWalletSignature(method, arguments_); } public async blockFactory>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callBlockFactory(method, arguments_[0]); + return this.#impl.callBlockFactory(method, arguments_); } public async transactionFactory>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callTransactionFactory(method, arguments_[0]); + return this.#impl.callTransactionFactory(method, arguments_); } public async publicKeyFactory>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callPublicKeyFactory(method, arguments_[0]); + return this.#impl.callPublicKeyFactory(method, arguments_); } } diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 0a30efd55..54ac69c29 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -37,44 +37,54 @@ export class Worker implements Contracts.Crypto.Worker { method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("consensusSignature", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "consensusSignature", + method, + arguments_, + ); } public async walletSignature>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("walletSignature", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "walletSignature", + method, + arguments_, + ); } public async blockFactory>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("blockFactory", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "blockFactory", + method, + arguments_, + ); } public async transactionFactory>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("transactionFactory", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "transactionFactory", + method, + arguments_, + ); } public async publicKeyFactory>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("publicKeyFactory", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "publicKeyFactory", + method, + arguments_, + ); } } diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index f9e622e51..f2b44b8e1 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -77,7 +77,6 @@ export class Subprocess = Record(method: string, ...arguments_: unknown[]): Promise { return new Promise((resolve, reject) => { const id = this.lastId++; From 27569f84557743983050fbc762c4a5f2e8b76cc9 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 13:57:00 +0000 Subject: [PATCH 18/37] Remove TODO --- packages/kernel/source/ipc/subprocess.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index f2b44b8e1..7b1b96ac3 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -81,7 +81,6 @@ export class Subprocess = Record { const id = this.lastId++; this.callbacks.set(id, { reject, resolve } as unknown as Contracts.Kernel.IPC.RequestCallback); - // TODO: we have to make sure args are always serializable and ideally don't copy this.subprocess.postMessage({ args: arguments_, id, method }); }); } From 49f868634a1ba751bbb1f5bee765a8bba803991f Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 14:03:10 +0000 Subject: [PATCH 19/37] Use events for emits --- packages/p2p/source/peer-processor.ts | 2 +- .../transaction-pool-broadcaster/source/peer-communicator.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/p2p/source/peer-processor.ts b/packages/p2p/source/peer-processor.ts index ca239cf11..3f024ed7b 100644 --- a/packages/p2p/source/peer-processor.ts +++ b/packages/p2p/source/peer-processor.ts @@ -55,7 +55,7 @@ export class PeerProcessor implements Contracts.P2P.PeerProcessor { handle: async (): Promise => this.#disconnectInvalidPeers(), }); - this.transactionPoolWorker.registerEventHandler("peer.removed", (ip: string) => { + this.transactionPoolWorker.registerEventHandler(Events.PeerEvent.Removed, (ip: string) => { this.peerDisposer.disposePeer(ip); }); } diff --git a/packages/transaction-pool-broadcaster/source/peer-communicator.ts b/packages/transaction-pool-broadcaster/source/peer-communicator.ts index cf59448a0..0692f9118 100644 --- a/packages/transaction-pool-broadcaster/source/peer-communicator.ts +++ b/packages/transaction-pool-broadcaster/source/peer-communicator.ts @@ -1,6 +1,6 @@ import type { Contracts } from "@mainsail/contracts"; -import { Identifiers } from "@mainsail/constants"; +import { Events, Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; import { Ipc } from "@mainsail/kernel"; import { ensureError, http } from "@mainsail/utils"; @@ -40,7 +40,7 @@ export class PeerCommunicator implements Contracts.TransactionPool.PeerCommunica if (peer.errorCount++ > this.configuration.getRequired("maxSequentialErrors")) { this.repository.forgetPeer(peer.ip); - Ipc.emit("peer.removed", peer.ip); + Ipc.emit(Events.PeerEvent.Removed, peer.ip); } } } From 20dafcc05302d6a46b37472e8b2d8b789dc59501 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 14:09:00 +0000 Subject: [PATCH 20/37] Join message events --- packages/kernel/source/ipc/subprocess.ts | 37 +++++++++--------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 7b1b96ac3..4beec0c5f 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -27,8 +27,7 @@ export class Subprocess = Record { logger.error(`Worker ${workerName} error: ${error.message}`); this.rejectPending(error); @@ -96,31 +95,23 @@ export class Subprocess = Record | Contracts.Kernel.IPC.Event): void { + if ("id" in message) { + try { + if ("error" in message) { + this.callbacks.get(message.id)?.reject(new Error(message.error)); + } else { + this.callbacks.get(message.id)?.resolve(message.result as unknown as T); + } + } finally { + this.callbacks.delete(message.id); + } - private onSubprocessMessage(message: Contracts.Kernel.IPC.Reply): void { - if (!("id" in message)) { return; } - try { - if ("error" in message) { - this.callbacks.get(message.id)?.reject(new Error(message.error)); - } else { - this.callbacks.get(message.id)?.resolve(message.result as unknown as T); - } - } finally { - this.callbacks.delete(message.id); + if ("event" in message) { + this.eventHandlers.get(message.event)?.(message.data); } } } From 57455f544ddf8de54c8576c9b8228eef08765f14 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 14:15:44 +0000 Subject: [PATCH 21/37] Fix types --- .../contracts/source/contracts/crypto/worker.ts | 2 +- .../contracts/source/contracts/evm/worker.ts | 2 +- .../contracts/source/contracts/kernel/ipc.ts | 2 +- .../source/contracts/transaction-pool/worker.ts | 2 +- packages/kernel/source/ipc/subprocess.ts | 16 +++++++++------- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index 8d7549322..3f6dd2f8e 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -34,7 +34,7 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; +export type WorkerSubprocess = Subprocess; export type WorkerSubprocessFactory = () => WorkerSubprocess; diff --git a/packages/contracts/source/contracts/evm/worker.ts b/packages/contracts/source/contracts/evm/worker.ts index 0447db4bd..04a50091e 100644 --- a/packages/contracts/source/contracts/evm/worker.ts +++ b/packages/contracts/source/contracts/evm/worker.ts @@ -14,7 +14,7 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; +export type WorkerSubprocess = Subprocess; export type WorkerSubprocessFactory = () => WorkerSubprocess; diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index d085548d2..06211cb26 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -39,7 +39,7 @@ export interface Handler { handleRequest>(method: K): void; } -export interface Subprocess { +export interface Subprocess { getQueueSize(): number; kill(): Promise; sendRequest(method: string, ...arguments_: unknown[]): Promise; diff --git a/packages/contracts/source/contracts/transaction-pool/worker.ts b/packages/contracts/source/contracts/transaction-pool/worker.ts index 197bf4b48..6fb7fbd99 100644 --- a/packages/contracts/source/contracts/transaction-pool/worker.ts +++ b/packages/contracts/source/contracts/transaction-pool/worker.ts @@ -19,7 +19,7 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; +export type WorkerSubprocess = Subprocess; export type WorkerSubprocessFactory = () => WorkerSubprocess; diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 4beec0c5f..ab929d916 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -4,13 +4,12 @@ import type { Worker } from "worker_threads"; import { Identifiers, LogLevels } from "@mainsail/constants"; import split from "split2"; -export class Subprocess = Record> implements Contracts.Kernel.IPC - .Subprocess { +export class Subprocess implements Contracts.Kernel.IPC.Subprocess { #logLevels = new Set(LogLevels); private lastId = 1; private readonly subprocess: Worker; - private readonly callbacks = new Map>(); + private readonly callbacks = new Map>(); private readonly eventHandlers = new Map>(); public constructor( @@ -77,9 +76,12 @@ export class Subprocess = Record(method: string, ...arguments_: unknown[]): Promise { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { const id = this.lastId++; - this.callbacks.set(id, { reject, resolve } as unknown as Contracts.Kernel.IPC.RequestCallback); + // The callbacks map is heterogeneous (one entry per in-flight request, each with its + // own result type), so it stores `unknown`; this is the boundary where the request's + // `T` is erased. `sendRequest` keeps the promise typed for the caller. + this.callbacks.set(id, { reject, resolve: resolve as (result: unknown) => void }); this.subprocess.postMessage({ args: arguments_, id, method }); }); } @@ -95,13 +97,13 @@ export class Subprocess = Record | Contracts.Kernel.IPC.Event): void { + private onMessage(message: Contracts.Kernel.IPC.Reply | Contracts.Kernel.IPC.Event): void { if ("id" in message) { try { if ("error" in message) { this.callbacks.get(message.id)?.reject(new Error(message.error)); } else { - this.callbacks.get(message.id)?.resolve(message.result as unknown as T); + this.callbacks.get(message.id)?.resolve(message.result); } } finally { this.callbacks.delete(message.id); From 2bc04bf8e989f16adc615159c163c1e856e9ebba Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 14:39:27 +0000 Subject: [PATCH 22/37] Fix types --- packages/contracts/source/contracts/crypto/worker.ts | 6 +----- packages/contracts/source/contracts/evm/worker.ts | 5 ----- packages/contracts/source/contracts/kernel/ipc.ts | 2 ++ .../contracts/source/contracts/transaction-pool/worker.ts | 6 +----- packages/crypto-worker/source/worker.ts | 4 ++-- packages/evm-api-worker/source/worker.ts | 4 ++-- packages/transaction-pool-worker/source/worker.ts | 4 ++-- 7 files changed, 10 insertions(+), 21 deletions(-) diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index 3f6dd2f8e..b2a05983a 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -1,7 +1,7 @@ import type { BlockFactory } from "../crypto/block.js"; import type { PublicKeyFactory, SignatureBls, SignatureEcdsa } from "../crypto/identities.js"; import type { TransactionFactory } from "../crypto/transactions.js"; -import type { MethodArguments, Requests, Subprocess } from "../kernel/ipc.js"; +import type { MethodArguments, Requests } from "../kernel/ipc.js"; import type { JsonObject } from "../types/index.js"; export interface WorkerFlags extends JsonObject { @@ -34,10 +34,6 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; - -export type WorkerSubprocessFactory = () => WorkerSubprocess; - export interface Worker { boot(flags: WorkerFlags): Promise; getQueueSize(): number; diff --git a/packages/contracts/source/contracts/evm/worker.ts b/packages/contracts/source/contracts/evm/worker.ts index 04a50091e..13f9f9b15 100644 --- a/packages/contracts/source/contracts/evm/worker.ts +++ b/packages/contracts/source/contracts/evm/worker.ts @@ -1,6 +1,5 @@ import type { CommitHandler } from "../crypto/index.js"; import type { EventListener } from "../kernel/index.js"; -import type { Subprocess } from "../kernel/ipc.js"; import type { KeyValuePair } from "../types/index.js"; export type WorkerFlags = KeyValuePair; @@ -14,10 +13,6 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; - -export type WorkerSubprocessFactory = () => WorkerSubprocess; - export interface Worker extends Omit, CommitHandler, EventListener { getQueueSize(): number; kill(): Promise; diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index 06211cb26..451b21c71 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -45,3 +45,5 @@ export interface Subprocess { sendRequest(method: string, ...arguments_: unknown[]): Promise; registerEventHandler(event: string, callback: EventCallback): void; } + +export type SubprocessFactory = () => Subprocess; diff --git a/packages/contracts/source/contracts/transaction-pool/worker.ts b/packages/contracts/source/contracts/transaction-pool/worker.ts index 6fb7fbd99..3c76c916a 100644 --- a/packages/contracts/source/contracts/transaction-pool/worker.ts +++ b/packages/contracts/source/contracts/transaction-pool/worker.ts @@ -1,6 +1,6 @@ import type { CommitHandler } from "../crypto/index.js"; import type { EventListener } from "../kernel/index.js"; -import type { EventCallback, Subprocess } from "../kernel/ipc.js"; +import type { EventCallback } from "../kernel/ipc.js"; import type { KeyValuePair } from "../types/index.js"; import type { GetBatchResult, GetBatchOptions } from "./selector.js"; @@ -19,10 +19,6 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; - -export type WorkerSubprocessFactory = () => WorkerSubprocess; - export interface Worker extends Omit, CommitHandler, EventListener { getQueueSize(): number; kill(): Promise; diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 54ac69c29..5fa2a6258 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -6,9 +6,9 @@ import { inject, injectable, postConstruct } from "@mainsail/container"; @injectable() export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.CryptoWorker.WorkerSubprocess.Factory) - private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory; + private readonly createWorkerSubprocess!: Contracts.Kernel.IPC.SubprocessFactory; - private ipcSubprocess!: Contracts.Crypto.WorkerSubprocess; + private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; #bootPromise?: Promise; diff --git a/packages/evm-api-worker/source/worker.ts b/packages/evm-api-worker/source/worker.ts index 941b731f7..4647e82d9 100644 --- a/packages/evm-api-worker/source/worker.ts +++ b/packages/evm-api-worker/source/worker.ts @@ -6,7 +6,7 @@ import { inject, injectable, postConstruct } from "@mainsail/container"; @injectable() export class Worker implements Contracts.Evm.Worker { @inject(Identifiers.Evm.WorkerSubprocess.Factory) - private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory; + private readonly createWorkerSubprocess!: Contracts.Kernel.IPC.SubprocessFactory; @inject(Identifiers.Services.EventDispatcher.Service) private readonly eventDispatcher!: Contracts.Kernel.EventDispatcher; @@ -14,7 +14,7 @@ export class Worker implements Contracts.Evm.Worker { @inject(Identifiers.P2P.Peer.Repository) private readonly p2pRepository!: Contracts.P2P.PeerRepository; - private ipcSubprocess!: Contracts.Evm.WorkerSubprocess; + private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; #bootPromise?: Promise; diff --git a/packages/transaction-pool-worker/source/worker.ts b/packages/transaction-pool-worker/source/worker.ts index f9c03313d..b383b7696 100644 --- a/packages/transaction-pool-worker/source/worker.ts +++ b/packages/transaction-pool-worker/source/worker.ts @@ -7,7 +7,7 @@ import dayjs from "dayjs"; @injectable() export class Worker implements Contracts.TransactionPool.Worker { @inject(Identifiers.TransactionPool.WorkerSubprocess.Factory) - private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory; + private readonly createWorkerSubprocess!: Contracts.Kernel.IPC.SubprocessFactory; @inject(Identifiers.Cryptography.Configuration) private readonly configuration!: Contracts.Crypto.Configuration; @@ -15,7 +15,7 @@ export class Worker implements Contracts.TransactionPool.Worker { @inject(Identifiers.Services.EventDispatcher.Service) private readonly eventDispatcher!: Contracts.Kernel.EventDispatcher; - private ipcSubprocess!: Contracts.TransactionPool.WorkerSubprocess; + private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; #bootPromise?: Promise; From 2e2fd68e9ccfba6d7c8e5acb1f6bdc9967a2e180 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 14:48:32 +0000 Subject: [PATCH 23/37] Handle stopped worker --- packages/kernel/source/ipc/subprocess.ts | 37 +++++++++++++++++------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index ab929d916..f82ec7332 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -9,8 +9,10 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { private lastId = 1; private readonly subprocess: Worker; + private readonly workerName: string; private readonly callbacks = new Map>(); private readonly eventHandlers = new Map>(); + #stopped?: Error; public constructor( app: Contracts.Kernel.Application, @@ -20,26 +22,27 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { ) { this.subprocess = subprocess; + // Capture the thread id up front: Node resets it to -1 once the worker exits. + this.workerName = `${name}-${subprocess.threadId}`; + const logger = app.get(Identifiers.Services.Log.Service); - // Capture the thread id up front: Node resets it to -1 once the worker exits. - const workerName = `${name}-${this.subprocess.threadId}`; - logger.debug(`Spawning worker ${workerName}`); + logger.debug(`Spawning worker ${this.workerName}`); this.subprocess.on("message", this.onMessage.bind(this)); this.subprocess.on("error", (error: Error) => { - logger.error(`Worker ${workerName} error: ${error.message}`); - this.rejectPending(error); + logger.error(`Worker ${this.workerName} error: ${error.message}`); + this.#stop(error); }); this.subprocess.on("exit", (code) => { - logger.debug(`Worker ${workerName} stopped with exit code ${code}`); - this.rejectPending(new Error(`Worker ${workerName} stopped with exit code ${code}`)); + logger.debug(`Worker ${this.workerName} stopped with exit code ${code}`); + this.#stop(new Error(`Worker ${this.workerName} stopped with exit code ${code}`)); }); - // A reply that fails to deserialize cannot be matched back to its request id, - // so the pending callback can never be settled. Reject everything in flight to - // avoid a silent hang rather than leaking the stuck request. + // A reply that fails to deserialize cannot be matched back to its request id, so the + // pending callback can never be settled. Reject everything in flight to avoid a silent + // hang. The worker stays alive after this, so it is not marked stopped. this.subprocess.on("messageerror", (error: Error) => { - logger.error(`Worker ${workerName} message could not be deserialized: ${error.message}`); + logger.error(`Worker ${this.workerName} message could not be deserialized: ${error.message}`); this.rejectPending(error); }); @@ -68,6 +71,7 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { } public async kill(): Promise { + this.#stopped ??= new Error(`Worker ${this.workerName} was killed`); return this.subprocess.terminate(); } @@ -76,6 +80,10 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { } public sendRequest(method: string, ...arguments_: unknown[]): Promise { + if (this.#stopped) { + return Promise.reject(this.#stopped); + } + return new Promise((resolve, reject) => { const id = this.lastId++; // The callbacks map is heterogeneous (one entry per in-flight request, each with its @@ -90,6 +98,13 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { this.eventHandlers.set(event, callback as Contracts.Kernel.IPC.EventCallback); } + // Mark the worker permanently gone, then reject anything in flight. The first reason wins + // (a crash `error` is more informative than the `exit` that follows it). + #stop(error: Error): void { + this.#stopped ??= error; + this.rejectPending(error); + } + private rejectPending(error: Error): void { for (const { reject } of this.callbacks.values()) { reject(error); From de2814fa188364809a5225e2bd7468cd75bde493 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 14:53:46 +0000 Subject: [PATCH 24/37] Remove stopped workers from pool --- .../contracts/source/contracts/crypto/worker.ts | 1 + packages/contracts/source/contracts/kernel/ipc.ts | 1 + packages/crypto-worker/source/worker-pool.ts | 15 ++++++++++----- packages/crypto-worker/source/worker.ts | 4 ++++ packages/kernel/source/ipc/subprocess.ts | 4 ++++ 5 files changed, 20 insertions(+), 5 deletions(-) diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index b2a05983a..385d40e90 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -37,6 +37,7 @@ export type WorkerFactory = () => Worker; export interface Worker { boot(flags: WorkerFlags): Promise; getQueueSize(): number; + isStopped(): boolean; kill(): Promise; consensusSignature>( method: K, diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index 451b21c71..0a8324a58 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -41,6 +41,7 @@ export interface Handler { export interface Subprocess { getQueueSize(): number; + isStopped(): boolean; kill(): Promise; sendRequest(method: string, ...arguments_: unknown[]): Promise; registerEventHandler(event: string, callback: EventCallback): void; diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index 8c0793a2f..6a7e1cae2 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -53,18 +53,23 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { } public getWorker(): Contracts.Crypto.Worker { - if (this.#workers.length === 0) { + const workers = this.#workers.filter((worker) => !worker.isStopped()); + + if (workers.length === 0) { throw new Error("No crypto workers available"); } + // Eviction may have shrunk the pool past the cursor; bring it back in range. + this.#currentWorkerIndex %= workers.length; + // Pick the worker with the fewest in-flight requests. Scanning starts at a // rotating cursor and only replaces the pick on a strictly smaller queue, so // ties (e.g. all workers idle) fall back to round-robin and spread evenly. - let selected = this.#workers[this.#currentWorkerIndex]; + let selected = workers[this.#currentWorkerIndex]; let smallestQueueSize = selected.getQueueSize(); - for (let offset = 1; offset < this.#workers.length; offset++) { - const worker = this.#workers[(this.#currentWorkerIndex + offset) % this.#workers.length]; + for (let offset = 1; offset < workers.length; offset++) { + const worker = workers[(this.#currentWorkerIndex + offset) % workers.length]; const queueSize = worker.getQueueSize(); if (queueSize < smallestQueueSize) { @@ -73,7 +78,7 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { } } - this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % this.#workers.length; + this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % workers.length; return selected; } diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 5fa2a6258..198ba2f16 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -33,6 +33,10 @@ export class Worker implements Contracts.Crypto.Worker { return this.ipcSubprocess.getQueueSize(); } + public isStopped(): boolean { + return this.ipcSubprocess.isStopped(); + } + public async consensusSignature>( method: K, ...arguments_: Parameters diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index f82ec7332..7f735f7bb 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -79,6 +79,10 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { return this.callbacks.size; } + public isStopped(): boolean { + return this.#stopped !== undefined; + } + public sendRequest(method: string, ...arguments_: unknown[]): Promise { if (this.#stopped) { return Promise.reject(this.#stopped); From 2647af4c138e8afe5837a2a42060f8f6913d3cc9 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 15:01:02 +0000 Subject: [PATCH 25/37] Fix emit --- packages/kernel/source/ipc/emit.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kernel/source/ipc/emit.ts b/packages/kernel/source/ipc/emit.ts index b5ebe804b..021693046 100644 --- a/packages/kernel/source/ipc/emit.ts +++ b/packages/kernel/source/ipc/emit.ts @@ -1,5 +1,5 @@ import { parentPort } from "worker_threads"; -export const emit = (event: string, data: T): void => { +export const emit = (event: string, data: string): void => { parentPort?.postMessage({ data, event }); }; From d469fe3121a87ba52efd9008590450c954a4565a Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 17:19:26 +0000 Subject: [PATCH 26/37] Update emit type --- packages/contracts/source/contracts/kernel/ipc.ts | 2 +- packages/kernel/source/ipc/emit.ts | 2 +- packages/kernel/source/ipc/subprocess.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index 0a8324a58..6b3dc7cf5 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -22,7 +22,7 @@ export type ErrorReply = { export type Event = { event: string; - data: string; + data: unknown; }; export type Reply = SuccessReply | ErrorReply; diff --git a/packages/kernel/source/ipc/emit.ts b/packages/kernel/source/ipc/emit.ts index 021693046..e723a0a32 100644 --- a/packages/kernel/source/ipc/emit.ts +++ b/packages/kernel/source/ipc/emit.ts @@ -1,5 +1,5 @@ import { parentPort } from "worker_threads"; -export const emit = (event: string, data: string): void => { +export const emit = (event: string, data: unknown): void => { parentPort?.postMessage({ data, event }); }; diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 7f735f7bb..c06d09cbd 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -11,7 +11,7 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { private readonly subprocess: Worker; private readonly workerName: string; private readonly callbacks = new Map>(); - private readonly eventHandlers = new Map>(); + private readonly eventHandlers = new Map>(); #stopped?: Error; public constructor( From ac01356d29f8bda098fe7e47d3f182c3cc006d29 Mon Sep 17 00:00:00 2001 From: sebastijankuzner <58827427+sebastijankuzner@users.noreply.github.com> Date: Wed, 27 May 2026 17:29:05 +0000 Subject: [PATCH 27/37] style: resolve style guide violations [ci-lint-fix] --- packages/kernel/source/ipc/subprocess.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index c06d09cbd..3ac5e2ea5 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -46,7 +46,6 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { this.rejectPending(error); }); - this.subprocess.stdout.pipe(split()).on("data", (line) => { // [LEVEL] MESSAGE const match = line.match(/^\[(\w+)]\s+(.*)$/); From e06afd4e098fa486a33a46e9e9fa9ed30138e024 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Wed, 27 May 2026 17:30:54 +0000 Subject: [PATCH 28/37] Deps --- packages/crypto-worker/package.json | 1 - pnpm-lock.yaml | 3 --- 2 files changed, 4 deletions(-) diff --git a/packages/crypto-worker/package.json b/packages/crypto-worker/package.json index 739755432..3449ffc56 100644 --- a/packages/crypto-worker/package.json +++ b/packages/crypto-worker/package.json @@ -23,7 +23,6 @@ "@mainsail/constants": "workspace:*", "@mainsail/container": "workspace:*", "@mainsail/kernel": "workspace:*", - "@mainsail/utils": "workspace:*", "joi": "18.2.1" }, "devDependencies": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 485d722e8..07fc34c97 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1925,9 +1925,6 @@ importers: '@mainsail/kernel': specifier: workspace:* version: link:../kernel - '@mainsail/utils': - specifier: workspace:* - version: link:../utils joi: specifier: 18.2.1 version: 18.2.1 From d2b70d58b5be52ae5e881aeb4a5b8e92f057f3da Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 12:57:05 +0000 Subject: [PATCH 29/37] Initial dispose --- .../contracts/source/contracts/crypto/worker.ts | 4 +++- packages/contracts/source/contracts/evm/worker.ts | 1 + .../source/contracts/kernel/application.ts | 2 ++ .../source/contracts/transaction-pool/worker.ts | 1 + packages/crypto-worker/source/service-provider.ts | 2 +- packages/crypto-worker/source/worker-handler.ts | 15 +++++++++------ packages/crypto-worker/source/worker-pool.ts | 4 ++-- packages/crypto-worker/source/worker.ts | 9 +++++++++ .../evm-api-worker/source/service-provider.ts | 2 +- packages/evm-api-worker/source/worker-handler.ts | 13 +++++++------ packages/evm-api-worker/source/worker.ts | 9 +++++++++ packages/kernel/source/application.ts | 6 +++++- .../source/service-provider.ts | 2 +- .../source/worker-handler.ts | 12 +++++++----- packages/transaction-pool-worker/source/worker.ts | 9 +++++++++ 15 files changed, 67 insertions(+), 24 deletions(-) diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index 385d40e90..1eead6044 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -10,6 +10,7 @@ export interface WorkerFlags extends JsonObject { export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; + dispose(): Promise; consensusSignature>( method: K, arguments_: MethodArguments, @@ -36,6 +37,7 @@ export type WorkerFactory = () => Worker; export interface Worker { boot(flags: WorkerFlags): Promise; + dispose(): Promise; getQueueSize(): number; isStopped(): boolean; kill(): Promise; @@ -63,6 +65,6 @@ export interface Worker { export interface WorkerPool { boot(): Promise; - shutdown(): Promise; + dispose(): Promise; getWorker(): Worker; } diff --git a/packages/contracts/source/contracts/evm/worker.ts b/packages/contracts/source/contracts/evm/worker.ts index 13f9f9b15..068040580 100644 --- a/packages/contracts/source/contracts/evm/worker.ts +++ b/packages/contracts/source/contracts/evm/worker.ts @@ -6,6 +6,7 @@ export type WorkerFlags = KeyValuePair; export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; + dispose(): Promise; setPeerCount(peerCount: number): Promise; commit(blockNumber: number): Promise; start(blockNumber: number): Promise; diff --git a/packages/contracts/source/contracts/kernel/application.ts b/packages/contracts/source/contracts/kernel/application.ts index 45ab75bd0..90f74c70f 100644 --- a/packages/contracts/source/contracts/kernel/application.ts +++ b/packages/contracts/source/contracts/kernel/application.ts @@ -6,6 +6,8 @@ export interface Application { boot(): Promise; + dispose(): Promise; + reboot(): Promise; config(key: string, value?: T, defaultValue?: T): T | undefined; diff --git a/packages/contracts/source/contracts/transaction-pool/worker.ts b/packages/contracts/source/contracts/transaction-pool/worker.ts index 3c76c916a..14a975e46 100644 --- a/packages/contracts/source/contracts/transaction-pool/worker.ts +++ b/packages/contracts/source/contracts/transaction-pool/worker.ts @@ -8,6 +8,7 @@ export type WorkerFlags = KeyValuePair; export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; + dispose(): Promise; getTransactions(options: GetBatchOptions): Promise; removeTransaction(address: string, id: string): Promise; commit(height: number, sendersAddresses: string[], consumedGas: number, isSyncing: boolean): Promise; diff --git a/packages/crypto-worker/source/service-provider.ts b/packages/crypto-worker/source/service-provider.ts index 91a13b68d..a2499ea30 100644 --- a/packages/crypto-worker/source/service-provider.ts +++ b/packages/crypto-worker/source/service-provider.ts @@ -38,7 +38,7 @@ export class ServiceProvider extends Providers.ServiceProvider { } public async dispose(): Promise { - await this.app.get(Identifiers.CryptoWorker.WorkerPool).shutdown(); + await this.app.get(Identifiers.CryptoWorker.WorkerPool).dispose(); } public async required(): Promise { diff --git a/packages/crypto-worker/source/worker-handler.ts b/packages/crypto-worker/source/worker-handler.ts index 46d5936a4..65d22d7d3 100644 --- a/packages/crypto-worker/source/worker-handler.ts +++ b/packages/crypto-worker/source/worker-handler.ts @@ -107,21 +107,24 @@ function isBufferJson(value: unknown): value is BufferJson { } export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler { + #app = new Application(); #impl!: WorkerImpl; public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - const app: Contracts.Kernel.Application = new Application(); - - await app.bootstrap({ + await this.#app.bootstrap({ flags, }); if (!flags.workerLoggingEnabled) { - app.rebind(Identifiers.Services.Log.Service).to(Services.Log.NullLogger); + this.#app.rebind(Identifiers.Services.Log.Service).to(Services.Log.NullLogger); } - await app.boot(); - this.#impl = app.resolve(WorkerImpl); + await this.#app.boot(); + this.#impl = this.#app.resolve(WorkerImpl); + } + + public async dispose(): Promise { + await this.#app.dispose(); } public async consensusSignature>( diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index 6a7e1cae2..f2aa2242e 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -46,10 +46,10 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { this.#workers = workers; } - public async shutdown(): Promise { + public async dispose(): Promise { const workers = this.#workers; this.#workers = []; - await Promise.all(workers.map(async (worker) => await worker.kill())); + await Promise.all(workers.map(async (worker) => await worker.dispose())); } public getWorker(): Contracts.Crypto.Worker { diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 198ba2f16..bccab15d3 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -11,6 +11,7 @@ export class Worker implements Contracts.Crypto.Worker { private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; #bootPromise?: Promise; + #disposePromise?: Promise; @postConstruct() public initialize(): void { @@ -25,6 +26,14 @@ export class Worker implements Contracts.Crypto.Worker { await this.#bootPromise; } + public async dispose(): Promise { + if (!this.#disposePromise) { + this.#disposePromise = this.ipcSubprocess.sendRequest("dispose"); + } + + await this.#disposePromise; + } + public async kill(): Promise { return this.ipcSubprocess.kill(); } diff --git a/packages/evm-api-worker/source/service-provider.ts b/packages/evm-api-worker/source/service-provider.ts index ee292cc28..5a8d1cc20 100644 --- a/packages/evm-api-worker/source/service-provider.ts +++ b/packages/evm-api-worker/source/service-provider.ts @@ -34,7 +34,7 @@ export class ServiceProvider extends Providers.ServiceProvider { } public async dispose(): Promise { - await this.app.get(Identifiers.Evm.Worker).kill(); + await this.app.get(Identifiers.Evm.Worker).dispose(); } public async required(): Promise { diff --git a/packages/evm-api-worker/source/worker-handler.ts b/packages/evm-api-worker/source/worker-handler.ts index 4221a5bfe..f79b78a93 100644 --- a/packages/evm-api-worker/source/worker-handler.ts +++ b/packages/evm-api-worker/source/worker-handler.ts @@ -5,17 +5,18 @@ import { Application } from "@mainsail/kernel"; import { CommitHandler, SetPeerCountHandler, StartHandler } from "./handlers/index.js"; export class WorkerScriptHandler implements Contracts.Evm.WorkerScriptHandler { - #app!: Contracts.Kernel.Application; + #app = new Application(); public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - const app: Contracts.Kernel.Application = new Application(); - - await app.bootstrap({ + await this.#app.bootstrap({ flags, }); - await app.boot(); - this.#app = app; + await this.#app.boot(); + } + + public async dispose(): Promise { + await this.#app.dispose(); } public async start(height: number): Promise { diff --git a/packages/evm-api-worker/source/worker.ts b/packages/evm-api-worker/source/worker.ts index 4647e82d9..30b19f76e 100644 --- a/packages/evm-api-worker/source/worker.ts +++ b/packages/evm-api-worker/source/worker.ts @@ -17,6 +17,7 @@ export class Worker implements Contracts.Evm.Worker { private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; #bootPromise?: Promise; + #disposePromise?: Promise; @postConstruct() public initialize(): void { @@ -42,6 +43,14 @@ export class Worker implements Contracts.Evm.Worker { await this.#bootPromise; } + public async dispose(): Promise { + if (!this.#disposePromise) { + this.#disposePromise = this.ipcSubprocess.sendRequest("dispose"); + } + + await this.#disposePromise; + } + public async kill(): Promise { return this.ipcSubprocess.kill(); } diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index 883993228..079598444 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -53,8 +53,12 @@ export class Application extends BaseApplication implements Contracts.Kernel.App } } - public async reboot(): Promise { + public async dispose(): Promise { await this.#disposeServiceProviders(); + } + + public async reboot(): Promise { + await this.dispose(); await this.boot(); } diff --git a/packages/transaction-pool-worker/source/service-provider.ts b/packages/transaction-pool-worker/source/service-provider.ts index bf7e1ad2c..d56031bd9 100644 --- a/packages/transaction-pool-worker/source/service-provider.ts +++ b/packages/transaction-pool-worker/source/service-provider.ts @@ -36,7 +36,7 @@ export class ServiceProvider extends Providers.ServiceProvider { } public async dispose(): Promise { - await this.app.get(Identifiers.TransactionPool.Worker).kill(); + await this.app.get(Identifiers.TransactionPool.Worker).dispose(); } public async required(): Promise { diff --git a/packages/transaction-pool-worker/source/worker-handler.ts b/packages/transaction-pool-worker/source/worker-handler.ts index 8ddaeb987..8e44f79eb 100644 --- a/packages/transaction-pool-worker/source/worker-handler.ts +++ b/packages/transaction-pool-worker/source/worker-handler.ts @@ -13,17 +13,19 @@ import { } from "./handlers/index.js"; export class WorkerScriptHandler implements Contracts.TransactionPool.WorkerScriptHandler { - #app!: Contracts.Kernel.Application; + #app = new Application(); public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - const app: Contracts.Kernel.Application = new Application(); - await app.bootstrap({ + await this.#app.bootstrap({ flags, }); - await app.boot(); - this.#app = app; + await this.#app.boot(); + } + + public async dispose(): Promise { + await this.#app.dispose(); } public async start(height: number): Promise { diff --git a/packages/transaction-pool-worker/source/worker.ts b/packages/transaction-pool-worker/source/worker.ts index b383b7696..7012cf292 100644 --- a/packages/transaction-pool-worker/source/worker.ts +++ b/packages/transaction-pool-worker/source/worker.ts @@ -18,6 +18,7 @@ export class Worker implements Contracts.TransactionPool.Worker { private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; #bootPromise?: Promise; + #disposePromise?: Promise; @postConstruct() public initialize(): void { @@ -44,6 +45,14 @@ export class Worker implements Contracts.TransactionPool.Worker { await this.#bootPromise; } + public async dispose(): Promise { + if (!this.#disposePromise) { + this.#disposePromise = this.ipcSubprocess.sendRequest("dispose"); + } + + await this.#disposePromise; + } + public async kill(): Promise { return this.ipcSubprocess.kill(); } From 1990008648b6249cc1bf1c9240e6d0371b4731eb Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 13:41:17 +0000 Subject: [PATCH 30/37] Log when disposed --- packages/kernel/source/application.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index 079598444..6b5309f89 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -55,6 +55,10 @@ export class Application extends BaseApplication implements Contracts.Kernel.App public async dispose(): Promise { await this.#disposeServiceProviders(); + + this.get(Identifiers.Services.Log.Service).debug( + `Application ${this.thread()} is disposed.`, + ); } public async reboot(): Promise { From ad53f0d29c7af87c932dbe34ef6da3afa82fc682 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 14:05:28 +0000 Subject: [PATCH 31/37] Dispose workers --- packages/contracts/source/contracts/kernel/ipc.ts | 1 + packages/crypto-worker/source/worker.ts | 14 +++++++++++++- packages/evm-api-worker/source/worker.ts | 14 +++++++++++++- packages/kernel/source/ipc/subprocess.ts | 7 +++++++ packages/transaction-pool-worker/source/worker.ts | 14 +++++++++++++- 5 files changed, 47 insertions(+), 3 deletions(-) diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index 6b3dc7cf5..3856647e4 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -40,6 +40,7 @@ export interface Handler { } export interface Subprocess { + dispose(): Promise; getQueueSize(): number; isStopped(): boolean; kill(): Promise; diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index bccab15d3..0f15aa9e3 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -28,12 +28,24 @@ export class Worker implements Contracts.Crypto.Worker { public async dispose(): Promise { if (!this.#disposePromise) { - this.#disposePromise = this.ipcSubprocess.sendRequest("dispose"); + this.#disposePromise = this.#doDispose(); } await this.#disposePromise; } + async #doDispose(): Promise { + try { + await this.ipcSubprocess.sendRequest("dispose"); + } catch { + // Worker may have died mid-dispose; we still need to terminate the thread. + } + + // Graceful inner shutdown is done; now terminate the worker thread so it doesn't hang + // around with an open parentPort listener. After this, isStopped() === true. + await this.ipcSubprocess.dispose(); + } + public async kill(): Promise { return this.ipcSubprocess.kill(); } diff --git a/packages/evm-api-worker/source/worker.ts b/packages/evm-api-worker/source/worker.ts index 30b19f76e..a98d876bd 100644 --- a/packages/evm-api-worker/source/worker.ts +++ b/packages/evm-api-worker/source/worker.ts @@ -45,12 +45,24 @@ export class Worker implements Contracts.Evm.Worker { public async dispose(): Promise { if (!this.#disposePromise) { - this.#disposePromise = this.ipcSubprocess.sendRequest("dispose"); + this.#disposePromise = this.#doDispose(); } await this.#disposePromise; } + async #doDispose(): Promise { + try { + await this.ipcSubprocess.sendRequest("dispose"); + } catch { + // Worker may have died mid-dispose; we still need to terminate the thread. + } + + // Graceful inner shutdown is done; now terminate the worker thread so it doesn't hang + // around with an open parentPort listener. After this, isStopped() === true. + await this.ipcSubprocess.dispose(); + } + public async kill(): Promise { return this.ipcSubprocess.kill(); } diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index 3ac5e2ea5..b12a523d5 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -74,6 +74,13 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { return this.subprocess.terminate(); } + // Graceful counterpart to kill(): same termination, but signals "normal shutdown" rather + // than a critical-error abort. Any pending request still in flight rejects with this reason. + public async dispose(): Promise { + this.#stopped ??= new Error(`Worker ${this.workerName} is being disposed`); + return this.subprocess.terminate(); + } + public getQueueSize(): number { return this.callbacks.size; } diff --git a/packages/transaction-pool-worker/source/worker.ts b/packages/transaction-pool-worker/source/worker.ts index 7012cf292..eb7d72697 100644 --- a/packages/transaction-pool-worker/source/worker.ts +++ b/packages/transaction-pool-worker/source/worker.ts @@ -47,12 +47,24 @@ export class Worker implements Contracts.TransactionPool.Worker { public async dispose(): Promise { if (!this.#disposePromise) { - this.#disposePromise = this.ipcSubprocess.sendRequest("dispose"); + this.#disposePromise = this.#doDispose(); } await this.#disposePromise; } + async #doDispose(): Promise { + try { + await this.ipcSubprocess.sendRequest("dispose"); + } catch { + // Worker may have died mid-dispose; we still need to terminate the thread. + } + + // Graceful inner shutdown is done; now terminate the worker thread so it doesn't hang + // around with an open parentPort listener. After this, isStopped() === true. + await this.ipcSubprocess.dispose(); + } + public async kill(): Promise { return this.ipcSubprocess.kill(); } From 641ab11e7ea7ceb40727d392aa156d63388e04f7 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 14:50:14 +0000 Subject: [PATCH 32/37] Use terminate --- packages/crypto-worker/source/worker-handler.ts | 2 +- packages/evm-api-worker/source/worker-handler.ts | 2 +- packages/kernel/source/application.ts | 2 +- packages/transaction-pool-worker/source/worker-handler.ts | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/crypto-worker/source/worker-handler.ts b/packages/crypto-worker/source/worker-handler.ts index 65d22d7d3..3ff65737a 100644 --- a/packages/crypto-worker/source/worker-handler.ts +++ b/packages/crypto-worker/source/worker-handler.ts @@ -124,7 +124,7 @@ export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler } public async dispose(): Promise { - await this.#app.dispose(); + await this.#app.terminate(); } public async consensusSignature>( diff --git a/packages/evm-api-worker/source/worker-handler.ts b/packages/evm-api-worker/source/worker-handler.ts index f79b78a93..a0e0668d8 100644 --- a/packages/evm-api-worker/source/worker-handler.ts +++ b/packages/evm-api-worker/source/worker-handler.ts @@ -16,7 +16,7 @@ export class WorkerScriptHandler implements Contracts.Evm.WorkerScriptHandler { } public async dispose(): Promise { - await this.#app.dispose(); + await this.#app.terminate(); } public async start(height: number): Promise { diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index 6b5309f89..0ea3fc12e 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -195,7 +195,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App "Application is gracefully terminated.", ); - exit(1); + exit(0); } async #bootstrapWith(type: string): Promise { diff --git a/packages/transaction-pool-worker/source/worker-handler.ts b/packages/transaction-pool-worker/source/worker-handler.ts index 8e44f79eb..7e468dde0 100644 --- a/packages/transaction-pool-worker/source/worker-handler.ts +++ b/packages/transaction-pool-worker/source/worker-handler.ts @@ -25,7 +25,7 @@ export class WorkerScriptHandler implements Contracts.TransactionPool.WorkerScri } public async dispose(): Promise { - await this.#app.dispose(); + await this.#app.terminate(); } public async start(height: number): Promise { From 7598cbfc0737e9f45f17d2721927a10a2cf2f77f Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 14:54:12 +0000 Subject: [PATCH 33/37] Fix logs --- packages/kernel/source/application.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index 0ea3fc12e..06ba36401 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -159,7 +159,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App if (reason) { this.get(Identifiers.Services.Log.Service)[error ? "error" : "warn"]( - `Application shutdown: ${reason}`, + `${this.isWorker() ? "Worker " + this.thread() : "Application"} shutdown: ${reason}`, ); } @@ -178,7 +178,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App const timeout = setTimeout(() => { this.get(Identifiers.Services.Log.Service).warn( - "Force application termination. Service providers did not dispose in time.", + `Force ${this.isWorker() ? "worker " + this.thread() : "application"} termination. Service providers did not dispose in time.`, ); exit(1); }, 3000); @@ -192,7 +192,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App this.#logOpenHandlers(); this.get(Identifiers.Services.Log.Service).notice( - "Application is gracefully terminated.", + `${this.isWorker() ? "Worker " + this.thread() : "Application"} is gracefully terminated.`, ); exit(0); @@ -235,7 +235,6 @@ export class Application extends BaseApplication implements Contracts.Kernel.App #logOpenHandlers(): void { try { - // @ts-ignore const resourcesInfo: string[] = process.getActiveResourcesInfo(); // Method is experimental const timeouts = resourcesInfo.filter((resource) => resource.includes("Timeout")); From 80daa9b28799426a1ab95053aa2652e418c8f761 Mon Sep 17 00:00:00 2001 From: sebastijankuzner <58827427+sebastijankuzner@users.noreply.github.com> Date: Thu, 28 May 2026 14:59:47 +0000 Subject: [PATCH 34/37] style: resolve style guide violations [ci-lint-fix] --- packages/kernel/source/application.ts | 2 +- packages/transaction-pool-worker/source/worker-handler.ts | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index 06ba36401..b844a3298 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -57,7 +57,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App await this.#disposeServiceProviders(); this.get(Identifiers.Services.Log.Service).debug( - `Application ${this.thread()} is disposed.`, + `Application ${this.thread()} is disposed.`, ); } diff --git a/packages/transaction-pool-worker/source/worker-handler.ts b/packages/transaction-pool-worker/source/worker-handler.ts index 7e468dde0..f990f10dd 100644 --- a/packages/transaction-pool-worker/source/worker-handler.ts +++ b/packages/transaction-pool-worker/source/worker-handler.ts @@ -16,7 +16,6 @@ export class WorkerScriptHandler implements Contracts.TransactionPool.WorkerScri #app = new Application(); public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - await this.#app.bootstrap({ flags, }); From bc7a4262f6fc5f9f7bcf37f29ce460d30116d85d Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 15:03:54 +0000 Subject: [PATCH 35/37] Support drain --- .../contracts/source/contracts/kernel/ipc.ts | 1 + packages/crypto-worker/source/worker.ts | 4 +++ packages/evm-api-worker/source/worker.ts | 4 +++ packages/kernel/source/ipc/subprocess.ts | 31 +++++++++++++++++++ .../transaction-pool-worker/source/worker.ts | 4 +++ 5 files changed, 44 insertions(+) diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index 3856647e4..3ae8e58aa 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -41,6 +41,7 @@ export interface Handler { export interface Subprocess { dispose(): Promise; + drain(): Promise; getQueueSize(): number; isStopped(): boolean; kill(): Promise; diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 0f15aa9e3..11e5ed42a 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -35,6 +35,10 @@ export class Worker implements Contracts.Crypto.Worker { } async #doDispose(): Promise { + // Let any work already in flight finish before tearing the worker down, so the + // dispose doesn't cut off requests that other service providers issued before us. + await this.ipcSubprocess.drain(); + try { await this.ipcSubprocess.sendRequest("dispose"); } catch { diff --git a/packages/evm-api-worker/source/worker.ts b/packages/evm-api-worker/source/worker.ts index a98d876bd..342b5ce97 100644 --- a/packages/evm-api-worker/source/worker.ts +++ b/packages/evm-api-worker/source/worker.ts @@ -52,6 +52,10 @@ export class Worker implements Contracts.Evm.Worker { } async #doDispose(): Promise { + // Let any work already in flight finish before tearing the worker down, so the + // dispose doesn't cut off requests that other service providers issued before us. + await this.ipcSubprocess.drain(); + try { await this.ipcSubprocess.sendRequest("dispose"); } catch { diff --git a/packages/kernel/source/ipc/subprocess.ts b/packages/kernel/source/ipc/subprocess.ts index b12a523d5..5578711f5 100644 --- a/packages/kernel/source/ipc/subprocess.ts +++ b/packages/kernel/source/ipc/subprocess.ts @@ -13,6 +13,8 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { private readonly callbacks = new Map>(); private readonly eventHandlers = new Map>(); #stopped?: Error; + #drainPromise?: Promise; + #drainResolve?: () => void; public constructor( app: Contracts.Kernel.Application, @@ -81,6 +83,23 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { return this.subprocess.terminate(); } + // Resolves once every in-flight sendRequest has settled (either replied or rejected via + // #stop / messageerror). Callers use this to wait for the worker to be quiet before + // disposing, so terminate() doesn't cut off work in progress. + public async drain(): Promise { + if (this.callbacks.size === 0) { + return; + } + + if (!this.#drainPromise) { + this.#drainPromise = new Promise((resolve) => { + this.#drainResolve = resolve; + }); + } + + return this.#drainPromise; + } + public getQueueSize(): number { return this.callbacks.size; } @@ -120,6 +139,15 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { reject(error); } this.callbacks.clear(); + this.#notifyDrained(); + } + + #notifyDrained(): void { + if (this.#drainResolve) { + this.#drainResolve(); + this.#drainResolve = undefined; + this.#drainPromise = undefined; + } } private onMessage(message: Contracts.Kernel.IPC.Reply | Contracts.Kernel.IPC.Event): void { @@ -132,6 +160,9 @@ export class Subprocess implements Contracts.Kernel.IPC.Subprocess { } } finally { this.callbacks.delete(message.id); + if (this.callbacks.size === 0) { + this.#notifyDrained(); + } } return; diff --git a/packages/transaction-pool-worker/source/worker.ts b/packages/transaction-pool-worker/source/worker.ts index eb7d72697..c5b181073 100644 --- a/packages/transaction-pool-worker/source/worker.ts +++ b/packages/transaction-pool-worker/source/worker.ts @@ -54,6 +54,10 @@ export class Worker implements Contracts.TransactionPool.Worker { } async #doDispose(): Promise { + // Let any work already in flight finish before tearing the worker down, so the + // dispose doesn't cut off requests that other service providers issued before us. + await this.ipcSubprocess.drain(); + try { await this.ipcSubprocess.sendRequest("dispose"); } catch { From c62c412ab669356f46cf16240c27c6f0d5922211 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 15:06:32 +0000 Subject: [PATCH 36/37] Remove dipose from app --- .../contracts/source/contracts/kernel/application.ts | 2 -- packages/kernel/source/application.ts | 10 +--------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/packages/contracts/source/contracts/kernel/application.ts b/packages/contracts/source/contracts/kernel/application.ts index 90f74c70f..45ab75bd0 100644 --- a/packages/contracts/source/contracts/kernel/application.ts +++ b/packages/contracts/source/contracts/kernel/application.ts @@ -6,8 +6,6 @@ export interface Application { boot(): Promise; - dispose(): Promise; - reboot(): Promise; config(key: string, value?: T, defaultValue?: T): T | undefined; diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index b844a3298..e5d24a0f2 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -53,16 +53,8 @@ export class Application extends BaseApplication implements Contracts.Kernel.App } } - public async dispose(): Promise { - await this.#disposeServiceProviders(); - - this.get(Identifiers.Services.Log.Service).debug( - `Application ${this.thread()} is disposed.`, - ); - } - public async reboot(): Promise { - await this.dispose(); + await this.#disposeServiceProviders(); await this.boot(); } From df9e07503556b4b55ff6863c3d327e73265e3f65 Mon Sep 17 00:00:00 2001 From: sebastijankuzner Date: Thu, 28 May 2026 18:45:50 +0000 Subject: [PATCH 37/37] Fix functional --- tests/functional/consensus/source/worker.ts | 14 ++++++++++---- tests/functional/resync/source/pool-worker.ts | 2 ++ tests/functional/resync/source/worker.ts | 14 ++++++++++---- .../transaction-pool-api/source/pool-worker.ts | 2 ++ .../transaction-pool-api/source/worker.ts | 14 ++++++++++---- 5 files changed, 34 insertions(+), 12 deletions(-) diff --git a/tests/functional/consensus/source/worker.ts b/tests/functional/consensus/source/worker.ts index e28153470..142fbb9b6 100644 --- a/tests/functional/consensus/source/worker.ts +++ b/tests/functional/consensus/source/worker.ts @@ -4,7 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; @injectable() -export class Worker implements Contracts.Crypto.WorkerScriptHandler { +export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.Cryptography.Transaction.Factory) private readonly transactionFactoryImp!: Contracts.Crypto.TransactionFactory; @@ -55,14 +55,20 @@ export class Worker implements Contracts.Crypto.WorkerScriptHandler { return this.#callPublicKeyFactory(method, arguments_); } - public async getQueueSize(): Promise { + public getQueueSize(): number { return 0; } - public async kill(signal?: number | NodeJS.Signals): Promise { - return true; + public isStopped(): boolean { + return false; } + public async kill(): Promise { + return 0; + } + + public async dispose(): Promise {} + async #callConsensusSignature>( method: K, arguments_: Parameters, diff --git a/tests/functional/resync/source/pool-worker.ts b/tests/functional/resync/source/pool-worker.ts index 0ae491532..520e7bea4 100644 --- a/tests/functional/resync/source/pool-worker.ts +++ b/tests/functional/resync/source/pool-worker.ts @@ -20,6 +20,8 @@ export class PoolWorker implements Contracts.TransactionPool.Worker { public async kill(): Promise { return 0; } + + public async dispose(): Promise {} public getQueueSize(): number { return 0; } diff --git a/tests/functional/resync/source/worker.ts b/tests/functional/resync/source/worker.ts index 06713b2ac..0bed131ea 100644 --- a/tests/functional/resync/source/worker.ts +++ b/tests/functional/resync/source/worker.ts @@ -4,7 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; @injectable() -export class Worker implements Contracts.Crypto.WorkerScriptHandler { +export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.Cryptography.Transaction.Factory) private readonly transactionFactoryImp!: Contracts.Crypto.TransactionFactory; @@ -51,14 +51,20 @@ export class Worker implements Contracts.Crypto.WorkerScriptHandler { throw new Error("Method publicKeyFactory not implemented."); } - public async getQueueSize(): Promise { + public getQueueSize(): number { return 0; } - public async kill(signal?: number | NodeJS.Signals): Promise { - return true; + public isStopped(): boolean { + return false; } + public async kill(): Promise { + return 0; + } + + public async dispose(): Promise {} + async #callConsensusSignature>( method: K, arguments_: Parameters, diff --git a/tests/functional/transaction-pool-api/source/pool-worker.ts b/tests/functional/transaction-pool-api/source/pool-worker.ts index 4f7c2aa5d..cec8667af 100644 --- a/tests/functional/transaction-pool-api/source/pool-worker.ts +++ b/tests/functional/transaction-pool-api/source/pool-worker.ts @@ -20,6 +20,8 @@ export class PoolWorker implements Contracts.TransactionPool.Worker { public async kill(): Promise { return 0; } + + public async dispose(): Promise {} public getQueueSize(): number { return 0; } diff --git a/tests/functional/transaction-pool-api/source/worker.ts b/tests/functional/transaction-pool-api/source/worker.ts index 7914451cf..044b243b5 100644 --- a/tests/functional/transaction-pool-api/source/worker.ts +++ b/tests/functional/transaction-pool-api/source/worker.ts @@ -4,7 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; @injectable() -export class Worker implements Contracts.Crypto.WorkerScriptHandler { +export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.Cryptography.Transaction.Factory) private readonly transactionFactoryImp!: Contracts.Crypto.TransactionFactory; @@ -49,14 +49,20 @@ export class Worker implements Contracts.Crypto.WorkerScriptHandler { throw new Error("Method publicKeyFactory not implemented."); } - public async getQueueSize(): Promise { + public getQueueSize(): number { return 0; } - public async kill(signal?: number | NodeJS.Signals): Promise { - return true; + public isStopped(): boolean { + return false; } + public async kill(): Promise { + return 0; + } + + public async dispose(): Promise {} + async #callConsensusSignature>( method: K, arguments_: Parameters,