diff --git a/packages/consensus/source/aggregator.ts b/packages/consensus/source/aggregator.ts index 861f59636..6b9389117 100644 --- a/packages/consensus/source/aggregator.ts +++ b/packages/consensus/source/aggregator.ts @@ -29,7 +29,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator { validators[key] = true; } - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); const signature = await worker.consensusSignature("aggregate", signatures); return { @@ -51,7 +51,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator { return false; } - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); const aggregatedPublicKey = await worker.publicKeyFactory("aggregate", validatorPublicKeys); diff --git a/packages/consensus/source/processors/message-processor.ts b/packages/consensus/source/processors/message-processor.ts index feabec51e..7db849aee 100644 --- a/packages/consensus/source/processors/message-processor.ts +++ b/packages/consensus/source/processors/message-processor.ts @@ -106,7 +106,7 @@ export class MessageProcessor extends AbstractProcessor implements Contracts.Con } async #hasValidSignature(message: Contracts.Crypto.Message): Promise { - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); return worker.consensusSignature( "verify", Buffer.from(message.signature, "hex"), diff --git a/packages/contracts/source/contracts/crypto/worker.ts b/packages/contracts/source/contracts/crypto/worker.ts index 6514ec34e..1eead6044 100644 --- a/packages/contracts/source/contracts/crypto/worker.ts +++ b/packages/contracts/source/contracts/crypto/worker.ts @@ -1,7 +1,7 @@ import type { BlockFactory } from "../crypto/block.js"; import type { PublicKeyFactory, SignatureBls, SignatureEcdsa } from "../crypto/identities.js"; import type { TransactionFactory } from "../crypto/transactions.js"; -import type { Requests, Subprocess } from "../kernel/ipc.js"; +import type { MethodArguments, Requests } from "../kernel/ipc.js"; import type { JsonObject } from "../types/index.js"; export interface WorkerFlags extends JsonObject { @@ -10,41 +10,61 @@ export interface WorkerFlags extends JsonObject { export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; + dispose(): Promise; consensusSignature>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; walletSignature>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; blockFactory>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; transactionFactory>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; publicKeyFactory>( method: K, - ...arguments_: Parameters + arguments_: MethodArguments, ): Promise>; } export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; - -export type WorkerSubprocessFactory = () => WorkerSubprocess; - -export interface Worker extends WorkerScriptHandler { +export interface Worker { + boot(flags: WorkerFlags): Promise; + dispose(): Promise; getQueueSize(): number; + isStopped(): boolean; kill(): Promise; + consensusSignature>( + method: K, + ...arguments_: Parameters + ): Promise>; + walletSignature>( + method: K, + ...arguments_: Parameters + ): Promise>; + blockFactory>( + method: K, + ...arguments_: Parameters + ): Promise>; + transactionFactory>( + method: K, + ...arguments_: Parameters + ): Promise>; + publicKeyFactory>( + method: K, + ...arguments_: Parameters + ): Promise>; } export interface WorkerPool { boot(): Promise; - shutdown(): Promise; - getWorker(): Promise; + dispose(): Promise; + getWorker(): Worker; } diff --git a/packages/contracts/source/contracts/evm/worker.ts b/packages/contracts/source/contracts/evm/worker.ts index 0447db4bd..068040580 100644 --- a/packages/contracts/source/contracts/evm/worker.ts +++ b/packages/contracts/source/contracts/evm/worker.ts @@ -1,12 +1,12 @@ import type { CommitHandler } from "../crypto/index.js"; import type { EventListener } from "../kernel/index.js"; -import type { Subprocess } from "../kernel/ipc.js"; import type { KeyValuePair } from "../types/index.js"; export type WorkerFlags = KeyValuePair; export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; + dispose(): Promise; setPeerCount(peerCount: number): Promise; commit(blockNumber: number): Promise; start(blockNumber: number): Promise; @@ -14,10 +14,6 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; - -export type WorkerSubprocessFactory = () => WorkerSubprocess; - export interface Worker extends Omit, CommitHandler, EventListener { getQueueSize(): number; kill(): Promise; diff --git a/packages/contracts/source/contracts/kernel/ipc.ts b/packages/contracts/source/contracts/kernel/ipc.ts index d085548d2..3ae8e58aa 100644 --- a/packages/contracts/source/contracts/kernel/ipc.ts +++ b/packages/contracts/source/contracts/kernel/ipc.ts @@ -22,7 +22,7 @@ export type ErrorReply = { export type Event = { event: string; - data: string; + data: unknown; }; export type Reply = SuccessReply | ErrorReply; @@ -39,9 +39,14 @@ export interface Handler { handleRequest>(method: K): void; } -export interface Subprocess { +export interface Subprocess { + dispose(): Promise; + drain(): Promise; getQueueSize(): number; + isStopped(): boolean; kill(): Promise; sendRequest(method: string, ...arguments_: unknown[]): Promise; registerEventHandler(event: string, callback: EventCallback): void; } + +export type SubprocessFactory = () => Subprocess; diff --git a/packages/contracts/source/contracts/transaction-pool/worker.ts b/packages/contracts/source/contracts/transaction-pool/worker.ts index 197bf4b48..14a975e46 100644 --- a/packages/contracts/source/contracts/transaction-pool/worker.ts +++ b/packages/contracts/source/contracts/transaction-pool/worker.ts @@ -1,6 +1,6 @@ import type { CommitHandler } from "../crypto/index.js"; import type { EventListener } from "../kernel/index.js"; -import type { EventCallback, Subprocess } from "../kernel/ipc.js"; +import type { EventCallback } from "../kernel/ipc.js"; import type { KeyValuePair } from "../types/index.js"; import type { GetBatchResult, GetBatchOptions } from "./selector.js"; @@ -8,6 +8,7 @@ export type WorkerFlags = KeyValuePair; export interface WorkerScriptHandler { boot(flags: WorkerFlags): Promise; + dispose(): Promise; getTransactions(options: GetBatchOptions): Promise; removeTransaction(address: string, id: string): Promise; commit(height: number, sendersAddresses: string[], consumedGas: number, isSyncing: boolean): Promise; @@ -19,10 +20,6 @@ export interface WorkerScriptHandler { export type WorkerFactory = () => Worker; -export type WorkerSubprocess = Subprocess; - -export type WorkerSubprocessFactory = () => WorkerSubprocess; - export interface Worker extends Omit, CommitHandler, EventListener { getQueueSize(): number; kill(): Promise; diff --git a/packages/core/bin/config/devnet/core/.env b/packages/core/bin/config/devnet/core/.env index dca0a0bcc..51afd0874 100644 --- a/packages/core/bin/config/devnet/core/.env +++ b/packages/core/bin/config/devnet/core/.env @@ -28,5 +28,4 @@ MAINSAIL_API_TRANSACTION_POOL_DISABLED= MAINSAIL_API_TRANSACTION_POOL_HOST=0.0.0.0 MAINSAIL_API_TRANSACTION_POOL_PORT=4007 -MAINSAIL_CRYPTO_WORKER_COUNT=2 MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED= diff --git a/packages/crypto-messages/source/factory.ts b/packages/crypto-messages/source/factory.ts index ae39f4776..961533cec 100644 --- a/packages/crypto-messages/source/factory.ts +++ b/packages/crypto-messages/source/factory.ts @@ -25,7 +25,7 @@ export class Factory implements Contracts.Crypto.MessageFactory { keyPair: Contracts.Crypto.KeyPair, context: Contracts.Crypto.SignatureMessageContext, ): Promise { - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); const bytes = await this.serializer.serializeMessageForSignature(data, context); const signature = await worker.consensusSignature("sign", bytes, Buffer.from(keyPair.privateKey, "hex")); diff --git a/packages/crypto-proposal/source/factory.ts b/packages/crypto-proposal/source/factory.ts index a6da2cf6b..ba9ddc316 100644 --- a/packages/crypto-proposal/source/factory.ts +++ b/packages/crypto-proposal/source/factory.ts @@ -31,7 +31,7 @@ export class Factory implements Contracts.Crypto.ProposalFactory { data: Contracts.Crypto.ProposalDataSerializableUnsigned, keyPair: Contracts.Crypto.KeyPair, ): Promise { - const worker = await this.workerPool.getWorker(); + const worker = this.workerPool.getWorker(); this.#verifySchema("proposalUnsigned", data); diff --git a/packages/crypto-transaction/source/factory.ts b/packages/crypto-transaction/source/factory.ts index 937199e02..7b7a120c1 100644 --- a/packages/crypto-transaction/source/factory.ts +++ b/packages/crypto-transaction/source/factory.ts @@ -122,7 +122,7 @@ export class TransactionFactory implements Contracts.Crypto.TransactionFactory { try { const { data: transaction } = await this.deserializer.deserialize(serialized); - const worker = this.workerPool ? await this.workerPool.getWorker() : undefined; + const worker = this.workerPool ? this.workerPool.getWorker() : undefined; const cryptoData = worker ? await worker.transactionFactory("computeCryptoData", transaction) : await this.computeCryptoData(transaction); diff --git a/packages/crypto-worker/package.json b/packages/crypto-worker/package.json index 739755432..3449ffc56 100644 --- a/packages/crypto-worker/package.json +++ b/packages/crypto-worker/package.json @@ -23,7 +23,6 @@ "@mainsail/constants": "workspace:*", "@mainsail/container": "workspace:*", "@mainsail/kernel": "workspace:*", - "@mainsail/utils": "workspace:*", "joi": "18.2.1" }, "devDependencies": { diff --git a/packages/crypto-worker/source/defaults.ts b/packages/crypto-worker/source/defaults.ts index 94fd83a5c..57497d178 100644 --- a/packages/crypto-worker/source/defaults.ts +++ b/packages/crypto-worker/source/defaults.ts @@ -3,6 +3,6 @@ import { Environment } from "@mainsail/kernel"; import { cpus } from "os"; export const defaults = { - workerCount: Environment.get(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT, cpus().length), + workerCount: Environment.get(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT, Math.min(cpus().length, 4)), workerLoggingEnabled: Environment.isTrue(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED), }; diff --git a/packages/crypto-worker/source/service-provider.ts b/packages/crypto-worker/source/service-provider.ts index f65f76123..a2499ea30 100644 --- a/packages/crypto-worker/source/service-provider.ts +++ b/packages/crypto-worker/source/service-provider.ts @@ -5,7 +5,7 @@ import { injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; import Joi from "joi"; import { cpus } from "os"; -import { URL } from "url"; +import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; import { WorkerPool } from "./worker-pool.js"; @@ -25,11 +25,11 @@ export class ServiceProvider extends Providers.ServiceProvider { this.app.bind(Identifiers.CryptoWorker.WorkerPool).to(WorkerPool).inSingletonScope(); this.app.bind<() => Ipc.Subprocess>(Identifiers.CryptoWorker.WorkerSubprocess.Factory).toFactory(() => () => { - const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, { + const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), { stderr: true, stdout: true, }); - return new Ipc.Subprocess(this.app, "system", subprocess); + return new Ipc.Subprocess(this.app, "crypto", "system", subprocess); }); } @@ -38,7 +38,7 @@ export class ServiceProvider extends Providers.ServiceProvider { } public async dispose(): Promise { - await this.app.get(Identifiers.CryptoWorker.WorkerPool).shutdown(); + await this.app.get(Identifiers.CryptoWorker.WorkerPool).dispose(); } public async required(): Promise { diff --git a/packages/crypto-worker/source/worker-handler.ts b/packages/crypto-worker/source/worker-handler.ts index fcf5425a8..3ff65737a 100644 --- a/packages/crypto-worker/source/worker-handler.ts +++ b/packages/crypto-worker/source/worker-handler.ts @@ -107,60 +107,58 @@ function isBufferJson(value: unknown): value is BufferJson { } export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler { + #app = new Application(); #impl!: WorkerImpl; public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - const app: Contracts.Kernel.Application = new Application(); - - await app.bootstrap({ + await this.#app.bootstrap({ flags, }); if (!flags.workerLoggingEnabled) { - app.rebind(Identifiers.Services.Log.Service).to(Services.Log.NullLogger); + this.#app.rebind(Identifiers.Services.Log.Service).to(Services.Log.NullLogger); } - await app.boot(); - this.#impl = app.resolve(WorkerImpl); + await this.#app.boot(); + this.#impl = this.#app.resolve(WorkerImpl); + } + + public async dispose(): Promise { + await this.#app.terminate(); } public async consensusSignature>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callConsensusSignature(method, arguments_[0]); + return this.#impl.callConsensusSignature(method, arguments_); } public async walletSignature>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callWalletSignature(method, arguments_[0]); + return this.#impl.callWalletSignature(method, arguments_); } public async blockFactory>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callBlockFactory(method, arguments_[0]); + return this.#impl.callBlockFactory(method, arguments_); } public async transactionFactory>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callTransactionFactory(method, arguments_[0]); + return this.#impl.callTransactionFactory(method, arguments_); } public async publicKeyFactory>( method: K, - ...arguments_: Parameters + arguments_: Contracts.Kernel.IPC.MethodArguments, ): Promise> { - // @ts-ignore - return this.#impl.callPublicKeyFactory(method, arguments_[0]); + return this.#impl.callPublicKeyFactory(method, arguments_); } } diff --git a/packages/crypto-worker/source/worker-pool.ts b/packages/crypto-worker/source/worker-pool.ts index c637ac9c3..f2aa2242e 100644 --- a/packages/crypto-worker/source/worker-pool.ts +++ b/packages/crypto-worker/source/worker-pool.ts @@ -15,25 +15,26 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { @inject(Identifiers.CryptoWorker.Worker.Factory) private readonly createWorker!: Contracts.Crypto.WorkerFactory; - private workers: Contracts.Crypto.Worker[] = []; - @inject(Identifiers.Config.Flags) private readonly flags!: Contracts.Types.KeyValuePair; + #workers: Contracts.Crypto.Worker[] = []; #currentWorkerIndex = 0; public async boot(): Promise { const workerCount = this.configuration.getRequired("workerCount"); + this.logger.info(`Booting up ${workerCount} crypto workers`); + + const workers: Contracts.Crypto.Worker[] = []; + for (let index = 0; index < workerCount; index++) { const worker = this.createWorker(); - this.workers.push(worker); + workers.push(worker); } - this.logger.info(`Booting up ${this.workers.length} crypto workers`); - await Promise.all( - this.workers.map((worker) => + workers.map((worker) => worker.boot({ ...this.flags, thread: "crypto-worker", @@ -41,16 +42,44 @@ export class WorkerPool implements Contracts.Crypto.WorkerPool { }), ), ); + + this.#workers = workers; } - public async shutdown(): Promise { - await Promise.all(this.workers.map(async (worker) => await worker.kill())); + public async dispose(): Promise { + const workers = this.#workers; + this.#workers = []; + await Promise.all(workers.map(async (worker) => await worker.dispose())); } - public async getWorker(): Promise { - const worker = this.workers[this.#currentWorkerIndex]; - this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % this.workers.length; + public getWorker(): Contracts.Crypto.Worker { + const workers = this.#workers.filter((worker) => !worker.isStopped()); + + if (workers.length === 0) { + throw new Error("No crypto workers available"); + } + + // Eviction may have shrunk the pool past the cursor; bring it back in range. + this.#currentWorkerIndex %= workers.length; + + // Pick the worker with the fewest in-flight requests. Scanning starts at a + // rotating cursor and only replaces the pick on a strictly smaller queue, so + // ties (e.g. all workers idle) fall back to round-robin and spread evenly. + let selected = workers[this.#currentWorkerIndex]; + let smallestQueueSize = selected.getQueueSize(); + + for (let offset = 1; offset < workers.length; offset++) { + const worker = workers[(this.#currentWorkerIndex + offset) % workers.length]; + const queueSize = worker.getQueueSize(); + + if (queueSize < smallestQueueSize) { + selected = worker; + smallestQueueSize = queueSize; + } + } + + this.#currentWorkerIndex = (this.#currentWorkerIndex + 1) % workers.length; - return worker; + return selected; } } diff --git a/packages/crypto-worker/source/worker.ts b/packages/crypto-worker/source/worker.ts index 71a65b879..11e5ed42a 100644 --- a/packages/crypto-worker/source/worker.ts +++ b/packages/crypto-worker/source/worker.ts @@ -1,36 +1,53 @@ import type { Contracts } from "@mainsail/contracts"; import { Identifiers } from "@mainsail/constants"; -import { inject, injectable } from "@mainsail/container"; -import { sleep } from "@mainsail/utils"; +import { inject, injectable, postConstruct } from "@mainsail/container"; @injectable() export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.CryptoWorker.WorkerSubprocess.Factory) - private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory; + private readonly createWorkerSubprocess!: Contracts.Kernel.IPC.SubprocessFactory; - private ipcSubprocess!: Contracts.Crypto.WorkerSubprocess; + private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; - #booted = false; - #booting = false; + #bootPromise?: Promise; + #disposePromise?: Promise; - public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { + @postConstruct() + public initialize(): void { this.ipcSubprocess = this.createWorkerSubprocess(); + } - while (this.#booting) { - await sleep(50); + public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { + if (!this.#bootPromise) { + this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } - if (this.#booted) { - return; + await this.#bootPromise; + } + + public async dispose(): Promise { + if (!this.#disposePromise) { + this.#disposePromise = this.#doDispose(); } - this.#booting = true; + await this.#disposePromise; + } + + async #doDispose(): Promise { + // Let any work already in flight finish before tearing the worker down, so the + // dispose doesn't cut off requests that other service providers issued before us. + await this.ipcSubprocess.drain(); - await this.ipcSubprocess.sendRequest("boot", flags); + try { + await this.ipcSubprocess.sendRequest("dispose"); + } catch { + // Worker may have died mid-dispose; we still need to terminate the thread. + } - this.#booting = false; - this.#booted = true; + // Graceful inner shutdown is done; now terminate the worker thread so it doesn't hang + // around with an open parentPort listener. After this, isStopped() === true. + await this.ipcSubprocess.dispose(); } public async kill(): Promise { @@ -41,48 +58,62 @@ export class Worker implements Contracts.Crypto.Worker { return this.ipcSubprocess.getQueueSize(); } + public isStopped(): boolean { + return this.ipcSubprocess.isStopped(); + } + public async consensusSignature>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("consensusSignature", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "consensusSignature", + method, + arguments_, + ); } public async walletSignature>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("walletSignature", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "walletSignature", + method, + arguments_, + ); } public async blockFactory>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("blockFactory", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "blockFactory", + method, + arguments_, + ); } public async transactionFactory>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("transactionFactory", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "transactionFactory", + method, + arguments_, + ); } public async publicKeyFactory>( method: K, ...arguments_: Parameters ): Promise> { - return this.ipcSubprocess.sendRequest("publicKeyFactory", method, arguments_) as Promise< - ReturnType - >; + return this.ipcSubprocess.sendRequest>( + "publicKeyFactory", + method, + arguments_, + ); } } diff --git a/packages/evm-api-worker/source/service-provider.ts b/packages/evm-api-worker/source/service-provider.ts index 00d8aff66..5a8d1cc20 100644 --- a/packages/evm-api-worker/source/service-provider.ts +++ b/packages/evm-api-worker/source/service-provider.ts @@ -4,6 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; import Joi from "joi"; +import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; import { Worker as WorkerInstance } from "./worker.js"; @@ -15,11 +16,11 @@ export class ServiceProvider extends Providers.ServiceProvider { public async register(): Promise { this.app.bind<() => Ipc.Subprocess>(Identifiers.Evm.WorkerSubprocess.Factory).toFactory(() => () => { - const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, { + const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), { stderr: true, stdout: true, }); - return new Ipc.Subprocess(this.app, "api", subprocess); + return new Ipc.Subprocess(this.app, "evm-api", "api", subprocess); }); this.app.bind(Identifiers.Evm.Worker).toConstantValue(this.app.resolve(WorkerInstance)); @@ -33,7 +34,7 @@ export class ServiceProvider extends Providers.ServiceProvider { } public async dispose(): Promise { - await this.app.get(Identifiers.Evm.Worker).kill(); + await this.app.get(Identifiers.Evm.Worker).dispose(); } public async required(): Promise { diff --git a/packages/evm-api-worker/source/worker-handler.ts b/packages/evm-api-worker/source/worker-handler.ts index 4221a5bfe..a0e0668d8 100644 --- a/packages/evm-api-worker/source/worker-handler.ts +++ b/packages/evm-api-worker/source/worker-handler.ts @@ -5,17 +5,18 @@ import { Application } from "@mainsail/kernel"; import { CommitHandler, SetPeerCountHandler, StartHandler } from "./handlers/index.js"; export class WorkerScriptHandler implements Contracts.Evm.WorkerScriptHandler { - #app!: Contracts.Kernel.Application; + #app = new Application(); public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - const app: Contracts.Kernel.Application = new Application(); - - await app.bootstrap({ + await this.#app.bootstrap({ flags, }); - await app.boot(); - this.#app = app; + await this.#app.boot(); + } + + public async dispose(): Promise { + await this.#app.terminate(); } public async start(height: number): Promise { diff --git a/packages/evm-api-worker/source/worker.ts b/packages/evm-api-worker/source/worker.ts index b7b8e581b..342b5ce97 100644 --- a/packages/evm-api-worker/source/worker.ts +++ b/packages/evm-api-worker/source/worker.ts @@ -6,7 +6,7 @@ import { inject, injectable, postConstruct } from "@mainsail/container"; @injectable() export class Worker implements Contracts.Evm.Worker { @inject(Identifiers.Evm.WorkerSubprocess.Factory) - private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory; + private readonly createWorkerSubprocess!: Contracts.Kernel.IPC.SubprocessFactory; @inject(Identifiers.Services.EventDispatcher.Service) private readonly eventDispatcher!: Contracts.Kernel.EventDispatcher; @@ -14,9 +14,10 @@ export class Worker implements Contracts.Evm.Worker { @inject(Identifiers.P2P.Peer.Repository) private readonly p2pRepository!: Contracts.P2P.PeerRepository; - private ipcSubprocess!: Contracts.Evm.WorkerSubprocess; + private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; - #booted = false; + #bootPromise?: Promise; + #disposePromise?: Promise; @postConstruct() public initialize(): void { @@ -35,12 +36,35 @@ export class Worker implements Contracts.Evm.Worker { } public async boot(flags: Contracts.Evm.WorkerFlags): Promise { - if (this.#booted) { - return; + if (!this.#bootPromise) { + this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } - this.#booted = true; - await this.ipcSubprocess.sendRequest("boot", flags); + await this.#bootPromise; + } + + public async dispose(): Promise { + if (!this.#disposePromise) { + this.#disposePromise = this.#doDispose(); + } + + await this.#disposePromise; + } + + async #doDispose(): Promise { + // Let any work already in flight finish before tearing the worker down, so the + // dispose doesn't cut off requests that other service providers issued before us. + await this.ipcSubprocess.drain(); + + try { + await this.ipcSubprocess.sendRequest("dispose"); + } catch { + // Worker may have died mid-dispose; we still need to terminate the thread. + } + + // Graceful inner shutdown is done; now terminate the worker thread so it doesn't hang + // around with an open parentPort listener. After this, isStopped() === true. + await this.ipcSubprocess.dispose(); } public async kill(): Promise { diff --git a/packages/kernel/source/application.ts b/packages/kernel/source/application.ts index 883993228..e5d24a0f2 100644 --- a/packages/kernel/source/application.ts +++ b/packages/kernel/source/application.ts @@ -151,7 +151,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App if (reason) { this.get(Identifiers.Services.Log.Service)[error ? "error" : "warn"]( - `Application shutdown: ${reason}`, + `${this.isWorker() ? "Worker " + this.thread() : "Application"} shutdown: ${reason}`, ); } @@ -170,7 +170,7 @@ export class Application extends BaseApplication implements Contracts.Kernel.App const timeout = setTimeout(() => { this.get(Identifiers.Services.Log.Service).warn( - "Force application termination. Service providers did not dispose in time.", + `Force ${this.isWorker() ? "worker " + this.thread() : "application"} termination. Service providers did not dispose in time.`, ); exit(1); }, 3000); @@ -184,10 +184,10 @@ export class Application extends BaseApplication implements Contracts.Kernel.App this.#logOpenHandlers(); this.get(Identifiers.Services.Log.Service).notice( - "Application is gracefully terminated.", + `${this.isWorker() ? "Worker " + this.thread() : "Application"} is gracefully terminated.`, ); - exit(1); + exit(0); } async #bootstrapWith(type: string): Promise { @@ -227,7 +227,6 @@ export class Application extends BaseApplication implements Contracts.Kernel.App #logOpenHandlers(): void { try { - // @ts-ignore const resourcesInfo: string[] = process.getActiveResourcesInfo(); // Method is experimental const timeouts = resourcesInfo.filter((resource) => resource.includes("Timeout")); diff --git a/packages/kernel/source/ipc/emit.ts b/packages/kernel/source/ipc/emit.ts index b5ebe804b..e723a0a32 100644 --- a/packages/kernel/source/ipc/emit.ts +++ b/packages/kernel/source/ipc/emit.ts @@ -1,5 +1,5 @@ import { parentPort } from "worker_threads"; -export const emit = (event: string, data: T): void => { +export const emit = (event: string, data: unknown): void => { parentPort?.postMessage({ data, event }); }; diff --git a/packages/kernel/source/ipc/handler.ts b/packages/kernel/source/ipc/handler.ts index a22fa44a3..a7f21311e 100644 --- a/packages/kernel/source/ipc/handler.ts +++ b/packages/kernel/source/ipc/handler.ts @@ -19,11 +19,11 @@ export class Handler implements Contracts.Kernel.IPC.Handler = Record> implements Contracts.Kernel.IPC - .Subprocess { +export class Subprocess implements Contracts.Kernel.IPC.Subprocess { #logLevels = new Set(LogLevels); private lastId = 1; private readonly subprocess: Worker; - private readonly callbacks = new Map>(); - private readonly eventHandlers = new Map>(); + private readonly workerName: string; + private readonly callbacks = new Map>(); + private readonly eventHandlers = new Map>(); + #stopped?: Error; + #drainPromise?: Promise; + #drainResolve?: () => void; public constructor( app: Contracts.Kernel.Application, + name: string, loggerContext: Contracts.Kernel.LoggerContext, subprocess: Worker, ) { this.subprocess = subprocess; - this.subprocess.on("message", this.onSubprocessMessage.bind(this)); - this.subprocess.on("message", this.onEmit.bind(this)); + + // Capture the thread id up front: Node resets it to -1 once the worker exits. + this.workerName = `${name}-${subprocess.threadId}`; const logger = app.get(Identifiers.Services.Log.Service); + logger.debug(`Spawning worker ${this.workerName}`); + + this.subprocess.on("message", this.onMessage.bind(this)); + this.subprocess.on("error", (error: Error) => { + logger.error(`Worker ${this.workerName} error: ${error.message}`); + this.#stop(error); + }); + this.subprocess.on("exit", (code) => { + logger.debug(`Worker ${this.workerName} stopped with exit code ${code}`); + this.#stop(new Error(`Worker ${this.workerName} stopped with exit code ${code}`)); + }); + // A reply that fails to deserialize cannot be matched back to its request id, so the + // pending callback can never be settled. Reject everything in flight to avoid a silent + // hang. The worker stays alive after this, so it is not marked stopped. + this.subprocess.on("messageerror", (error: Error) => { + logger.error(`Worker ${this.workerName} message could not be deserialized: ${error.message}`); + this.rejectPending(error); + }); + this.subprocess.stdout.pipe(split()).on("data", (line) => { // [LEVEL] MESSAGE const match = line.match(/^\[(\w+)]\s+(.*)$/); @@ -48,19 +72,53 @@ export class Subprocess = Record { + this.#stopped ??= new Error(`Worker ${this.workerName} was killed`); return this.subprocess.terminate(); } + // Graceful counterpart to kill(): same termination, but signals "normal shutdown" rather + // than a critical-error abort. Any pending request still in flight rejects with this reason. + public async dispose(): Promise { + this.#stopped ??= new Error(`Worker ${this.workerName} is being disposed`); + return this.subprocess.terminate(); + } + + // Resolves once every in-flight sendRequest has settled (either replied or rejected via + // #stop / messageerror). Callers use this to wait for the worker to be quiet before + // disposing, so terminate() doesn't cut off work in progress. + public async drain(): Promise { + if (this.callbacks.size === 0) { + return; + } + + if (!this.#drainPromise) { + this.#drainPromise = new Promise((resolve) => { + this.#drainResolve = resolve; + }); + } + + return this.#drainPromise; + } + public getQueueSize(): number { return this.callbacks.size; } - // TODO: use type magic to infer args (didn't work when T is also using same signatures) + public isStopped(): boolean { + return this.#stopped !== undefined; + } + public sendRequest(method: string, ...arguments_: unknown[]): Promise { - return new Promise((resolve, reject) => { + if (this.#stopped) { + return Promise.reject(this.#stopped); + } + + return new Promise((resolve, reject) => { const id = this.lastId++; - this.callbacks.set(id, { reject, resolve } as unknown as Contracts.Kernel.IPC.RequestCallback); - // TODO: we have to make sure args are always serializable and ideally don't copy + // The callbacks map is heterogeneous (one entry per in-flight request, each with its + // own result type), so it stores `unknown`; this is the boundary where the request's + // `T` is erased. `sendRequest` keeps the promise typed for the caller. + this.callbacks.set(id, { reject, resolve: resolve as (result: unknown) => void }); this.subprocess.postMessage({ args: arguments_, id, method }); }); } @@ -69,31 +127,49 @@ export class Subprocess = Record); } - private onEmit(message: Contracts.Kernel.IPC.Event): void { - if (!("event" in message)) { - return; - } + // Mark the worker permanently gone, then reject anything in flight. The first reason wins + // (a crash `error` is more informative than the `exit` that follows it). + #stop(error: Error): void { + this.#stopped ??= error; + this.rejectPending(error); + } - const callback = this.eventHandlers.get(message.event); + private rejectPending(error: Error): void { + for (const { reject } of this.callbacks.values()) { + reject(error); + } + this.callbacks.clear(); + this.#notifyDrained(); + } - if (callback) { - callback(message.data); + #notifyDrained(): void { + if (this.#drainResolve) { + this.#drainResolve(); + this.#drainResolve = undefined; + this.#drainPromise = undefined; } } - private onSubprocessMessage(message: Contracts.Kernel.IPC.Reply): void { - if (!("id" in message)) { + private onMessage(message: Contracts.Kernel.IPC.Reply | Contracts.Kernel.IPC.Event): void { + if ("id" in message) { + try { + if ("error" in message) { + this.callbacks.get(message.id)?.reject(new Error(message.error)); + } else { + this.callbacks.get(message.id)?.resolve(message.result); + } + } finally { + this.callbacks.delete(message.id); + if (this.callbacks.size === 0) { + this.#notifyDrained(); + } + } + return; } - try { - if ("error" in message) { - this.callbacks.get(message.id)?.reject(new Error(message.error)); - } else { - this.callbacks.get(message.id)?.resolve(message.result as unknown as T); - } - } finally { - this.callbacks.delete(message.id); + if ("event" in message) { + this.eventHandlers.get(message.event)?.(message.data); } } } diff --git a/packages/p2p/source/peer-processor.ts b/packages/p2p/source/peer-processor.ts index ca239cf11..3f024ed7b 100644 --- a/packages/p2p/source/peer-processor.ts +++ b/packages/p2p/source/peer-processor.ts @@ -55,7 +55,7 @@ export class PeerProcessor implements Contracts.P2P.PeerProcessor { handle: async (): Promise => this.#disconnectInvalidPeers(), }); - this.transactionPoolWorker.registerEventHandler("peer.removed", (ip: string) => { + this.transactionPoolWorker.registerEventHandler(Events.PeerEvent.Removed, (ip: string) => { this.peerDisposer.disposePeer(ip); }); } diff --git a/packages/transaction-pool-broadcaster/source/peer-communicator.ts b/packages/transaction-pool-broadcaster/source/peer-communicator.ts index cf59448a0..0692f9118 100644 --- a/packages/transaction-pool-broadcaster/source/peer-communicator.ts +++ b/packages/transaction-pool-broadcaster/source/peer-communicator.ts @@ -1,6 +1,6 @@ import type { Contracts } from "@mainsail/contracts"; -import { Identifiers } from "@mainsail/constants"; +import { Events, Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; import { Ipc } from "@mainsail/kernel"; import { ensureError, http } from "@mainsail/utils"; @@ -40,7 +40,7 @@ export class PeerCommunicator implements Contracts.TransactionPool.PeerCommunica if (peer.errorCount++ > this.configuration.getRequired("maxSequentialErrors")) { this.repository.forgetPeer(peer.ip); - Ipc.emit("peer.removed", peer.ip); + Ipc.emit(Events.PeerEvent.Removed, peer.ip); } } } diff --git a/packages/transaction-pool-worker/source/service-provider.ts b/packages/transaction-pool-worker/source/service-provider.ts index ac9db042e..d56031bd9 100644 --- a/packages/transaction-pool-worker/source/service-provider.ts +++ b/packages/transaction-pool-worker/source/service-provider.ts @@ -4,6 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable } from "@mainsail/container"; import { Ipc, Providers } from "@mainsail/kernel"; import Joi from "joi"; +import { fileURLToPath } from "url"; import { Worker } from "worker_threads"; import { Worker as WorkerInstance } from "./worker.js"; @@ -17,11 +18,11 @@ export class ServiceProvider extends Providers.ServiceProvider { this.app .bind<() => Ipc.Subprocess>(Identifiers.TransactionPool.WorkerSubprocess.Factory) .toFactory(() => () => { - const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, { + const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), { stderr: true, stdout: true, }); - return new Ipc.Subprocess(this.app, "tx-pool", subprocess); + return new Ipc.Subprocess(this.app, "transaction-pool", "tx-pool", subprocess); }); this.app.bind(Identifiers.TransactionPool.Worker).toConstantValue(this.app.resolve(WorkerInstance)); @@ -35,7 +36,7 @@ export class ServiceProvider extends Providers.ServiceProvider { } public async dispose(): Promise { - await this.app.get(Identifiers.TransactionPool.Worker).kill(); + await this.app.get(Identifiers.TransactionPool.Worker).dispose(); } public async required(): Promise { diff --git a/packages/transaction-pool-worker/source/worker-handler.ts b/packages/transaction-pool-worker/source/worker-handler.ts index 8ddaeb987..f990f10dd 100644 --- a/packages/transaction-pool-worker/source/worker-handler.ts +++ b/packages/transaction-pool-worker/source/worker-handler.ts @@ -13,17 +13,18 @@ import { } from "./handlers/index.js"; export class WorkerScriptHandler implements Contracts.TransactionPool.WorkerScriptHandler { - #app!: Contracts.Kernel.Application; + #app = new Application(); public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { - const app: Contracts.Kernel.Application = new Application(); - - await app.bootstrap({ + await this.#app.bootstrap({ flags, }); - await app.boot(); - this.#app = app; + await this.#app.boot(); + } + + public async dispose(): Promise { + await this.#app.terminate(); } public async start(height: number): Promise { diff --git a/packages/transaction-pool-worker/source/worker.ts b/packages/transaction-pool-worker/source/worker.ts index a7da9e1a7..c5b181073 100644 --- a/packages/transaction-pool-worker/source/worker.ts +++ b/packages/transaction-pool-worker/source/worker.ts @@ -7,7 +7,7 @@ import dayjs from "dayjs"; @injectable() export class Worker implements Contracts.TransactionPool.Worker { @inject(Identifiers.TransactionPool.WorkerSubprocess.Factory) - private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory; + private readonly createWorkerSubprocess!: Contracts.Kernel.IPC.SubprocessFactory; @inject(Identifiers.Cryptography.Configuration) private readonly configuration!: Contracts.Crypto.Configuration; @@ -15,9 +15,10 @@ export class Worker implements Contracts.TransactionPool.Worker { @inject(Identifiers.Services.EventDispatcher.Service) private readonly eventDispatcher!: Contracts.Kernel.EventDispatcher; - private ipcSubprocess!: Contracts.TransactionPool.WorkerSubprocess; + private ipcSubprocess!: Contracts.Kernel.IPC.Subprocess; - #booted = false; + #bootPromise?: Promise; + #disposePromise?: Promise; @postConstruct() public initialize(): void { @@ -37,12 +38,35 @@ export class Worker implements Contracts.TransactionPool.Worker { } public async boot(flags: Contracts.TransactionPool.WorkerFlags): Promise { - if (this.#booted) { - return; + if (!this.#bootPromise) { + this.#bootPromise = this.ipcSubprocess.sendRequest("boot", flags); } - this.#booted = true; - await this.ipcSubprocess.sendRequest("boot", flags); + await this.#bootPromise; + } + + public async dispose(): Promise { + if (!this.#disposePromise) { + this.#disposePromise = this.#doDispose(); + } + + await this.#disposePromise; + } + + async #doDispose(): Promise { + // Let any work already in flight finish before tearing the worker down, so the + // dispose doesn't cut off requests that other service providers issued before us. + await this.ipcSubprocess.drain(); + + try { + await this.ipcSubprocess.sendRequest("dispose"); + } catch { + // Worker may have died mid-dispose; we still need to terminate the thread. + } + + // Graceful inner shutdown is done; now terminate the worker thread so it doesn't hang + // around with an open parentPort listener. After this, isStopped() === true. + await this.ipcSubprocess.dispose(); } public async kill(): Promise { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 485d722e8..07fc34c97 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1925,9 +1925,6 @@ importers: '@mainsail/kernel': specifier: workspace:* version: link:../kernel - '@mainsail/utils': - specifier: workspace:* - version: link:../utils joi: specifier: 18.2.1 version: 18.2.1 diff --git a/tests/functional/consensus/source/worker.ts b/tests/functional/consensus/source/worker.ts index e28153470..142fbb9b6 100644 --- a/tests/functional/consensus/source/worker.ts +++ b/tests/functional/consensus/source/worker.ts @@ -4,7 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; @injectable() -export class Worker implements Contracts.Crypto.WorkerScriptHandler { +export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.Cryptography.Transaction.Factory) private readonly transactionFactoryImp!: Contracts.Crypto.TransactionFactory; @@ -55,14 +55,20 @@ export class Worker implements Contracts.Crypto.WorkerScriptHandler { return this.#callPublicKeyFactory(method, arguments_); } - public async getQueueSize(): Promise { + public getQueueSize(): number { return 0; } - public async kill(signal?: number | NodeJS.Signals): Promise { - return true; + public isStopped(): boolean { + return false; } + public async kill(): Promise { + return 0; + } + + public async dispose(): Promise {} + async #callConsensusSignature>( method: K, arguments_: Parameters, diff --git a/tests/functional/resync/source/pool-worker.ts b/tests/functional/resync/source/pool-worker.ts index 0ae491532..520e7bea4 100644 --- a/tests/functional/resync/source/pool-worker.ts +++ b/tests/functional/resync/source/pool-worker.ts @@ -20,6 +20,8 @@ export class PoolWorker implements Contracts.TransactionPool.Worker { public async kill(): Promise { return 0; } + + public async dispose(): Promise {} public getQueueSize(): number { return 0; } diff --git a/tests/functional/resync/source/worker.ts b/tests/functional/resync/source/worker.ts index 06713b2ac..0bed131ea 100644 --- a/tests/functional/resync/source/worker.ts +++ b/tests/functional/resync/source/worker.ts @@ -4,7 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; @injectable() -export class Worker implements Contracts.Crypto.WorkerScriptHandler { +export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.Cryptography.Transaction.Factory) private readonly transactionFactoryImp!: Contracts.Crypto.TransactionFactory; @@ -51,14 +51,20 @@ export class Worker implements Contracts.Crypto.WorkerScriptHandler { throw new Error("Method publicKeyFactory not implemented."); } - public async getQueueSize(): Promise { + public getQueueSize(): number { return 0; } - public async kill(signal?: number | NodeJS.Signals): Promise { - return true; + public isStopped(): boolean { + return false; } + public async kill(): Promise { + return 0; + } + + public async dispose(): Promise {} + async #callConsensusSignature>( method: K, arguments_: Parameters, diff --git a/tests/functional/transaction-pool-api/source/pool-worker.ts b/tests/functional/transaction-pool-api/source/pool-worker.ts index 4f7c2aa5d..cec8667af 100644 --- a/tests/functional/transaction-pool-api/source/pool-worker.ts +++ b/tests/functional/transaction-pool-api/source/pool-worker.ts @@ -20,6 +20,8 @@ export class PoolWorker implements Contracts.TransactionPool.Worker { public async kill(): Promise { return 0; } + + public async dispose(): Promise {} public getQueueSize(): number { return 0; } diff --git a/tests/functional/transaction-pool-api/source/worker.ts b/tests/functional/transaction-pool-api/source/worker.ts index 7914451cf..044b243b5 100644 --- a/tests/functional/transaction-pool-api/source/worker.ts +++ b/tests/functional/transaction-pool-api/source/worker.ts @@ -4,7 +4,7 @@ import { Identifiers } from "@mainsail/constants"; import { inject, injectable, tagged } from "@mainsail/container"; @injectable() -export class Worker implements Contracts.Crypto.WorkerScriptHandler { +export class Worker implements Contracts.Crypto.Worker { @inject(Identifiers.Cryptography.Transaction.Factory) private readonly transactionFactoryImp!: Contracts.Crypto.TransactionFactory; @@ -49,14 +49,20 @@ export class Worker implements Contracts.Crypto.WorkerScriptHandler { throw new Error("Method publicKeyFactory not implemented."); } - public async getQueueSize(): Promise { + public getQueueSize(): number { return 0; } - public async kill(signal?: number | NodeJS.Signals): Promise { - return true; + public isStopped(): boolean { + return false; } + public async kill(): Promise { + return 0; + } + + public async dispose(): Promise {} + async #callConsensusSignature>( method: K, arguments_: Parameters,