diff --git a/packages/crypto-worker/package.json b/packages/crypto-worker/package.json index 3449ffc56..f1f6d3ed8 100644 --- a/packages/crypto-worker/package.json +++ b/packages/crypto-worker/package.json @@ -31,6 +31,7 @@ "@types/chance": "1.1.8", "@types/fs-extra": "11.0.4", "@types/tmp": "0.2.6", + "esmock": "2.7.5", "uvu": "0.5.6" }, "engines": { diff --git a/packages/crypto-worker/source/defaults.test.ts b/packages/crypto-worker/source/defaults.test.ts new file mode 100644 index 000000000..d5c27f164 --- /dev/null +++ b/packages/crypto-worker/source/defaults.test.ts @@ -0,0 +1,47 @@ +import { EnvironmentVariables } from "@mainsail/constants"; +import esmock from "esmock"; + +import { describe } from "@mainsail/test-runner"; + +let bust = 0; +const load = async (): Promise<{ workerCount: number | string; workerLoggingEnabled: boolean }> => + (await import(`./defaults.js?bust=${bust++}`)).defaults; + +// Re-import defaults with os.cpus() mocked to report a specific core count. +const loadWithCpus = async (cores: number): Promise<{ workerCount: number | string }> => { + delete process.env[EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT]; + return (await esmock("./defaults", { os: { cpus: () => Array.from({ length: cores }, () => ({})) } })).defaults; +}; + +describe("Defaults", ({ assert, it }) => { + it("falls back to a CPU-derived worker count and disabled logging", async () => { + const defaults = await load(); + + assert.number(defaults.workerCount); + assert.gte(defaults.workerCount as number, 1); + assert.lte(defaults.workerCount as number, 4); + assert.false(defaults.workerLoggingEnabled); + }); + + it("caps the worker count at 4 on machines with more cores", async () => { + const defaults = await loadWithCpus(16); + + assert.equal(defaults.workerCount, 4); + }); + + it("uses the cpu count when fewer than 4 cores are available", async () => { + const defaults = await loadWithCpus(2); + + assert.equal(defaults.workerCount, 2); + }); + + it("reads the worker count and logging flag from the environment", async () => { + process.env[EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT] = "7"; + process.env[EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED] = "true"; + + const defaults = await load(); + + assert.equal(defaults.workerCount, "7"); + assert.true(defaults.workerLoggingEnabled); + }); +}); diff --git a/packages/crypto-worker/source/service-provider.test.ts b/packages/crypto-worker/source/service-provider.test.ts new file mode 100644 index 000000000..e137cc176 --- /dev/null +++ b/packages/crypto-worker/source/service-provider.test.ts @@ -0,0 +1,200 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application, Ipc } from "@mainsail/kernel"; +import { EventEmitter } from "events"; +import esmock from "esmock"; +import { PassThrough } from "stream"; + +import { describe } from "@mainsail/test-runner"; +import { Worker as WorkerInstance } from "./worker"; + +// Records every `new Worker(...)` so the factory test can assert how the thread is spawned. +const constructions: any[][] = []; + +// Stand-in for worker_threads.Worker: an EventEmitter exposing the stdout/stderr streams and +// threadId that Ipc.Subprocess reads, so the real Subprocess wraps it without a real thread. +class FakeWorker extends EventEmitter { + public threadId = 1; + public readonly stdout = new PassThrough(); + public readonly stderr = new PassThrough(); + + public constructor(...arguments_: any[]) { + super(); + constructions.push(arguments_); + } + + public postMessage(): void {} + public async terminate(): Promise { + return 0; + } +} + +// Load the provider with worker_threads.Worker swapped for the fake; the real Ipc.Subprocess +// and ./worker.js stay in place. +const { ServiceProvider } = await esmock("./service-provider", { + worker_threads: { Worker: FakeWorker }, +}); + +describe<{ + app: Application; + serviceProvider: any; +}>("ServiceProvider", ({ assert, beforeEach, it, spy }) => { + beforeEach((context) => { + constructions.length = 0; + + context.app = new Application(); + context.app.bind(Identifiers.Config.Flags).toConstantValue({ network: "testnet" }); + // Ipc.Subprocess resolves the logger from the container when the factory runs. + context.app.bind(Identifiers.Services.Log.Service).toConstantValue({ debug: () => {}, error: () => {} }); + + context.serviceProvider = context.app.resolve(ServiceProvider); + }); + + it("register binds the worker instance, worker factory, worker pool and subprocess factory", async (context) => { + await context.serviceProvider.register(); + + assert.true(context.app.isBound(Identifiers.CryptoWorker.Worker.Instance)); + assert.true(context.app.isBound(Identifiers.CryptoWorker.Worker.Factory)); + assert.true(context.app.isBound(Identifiers.CryptoWorker.WorkerPool)); + assert.true(context.app.isBound(Identifiers.CryptoWorker.WorkerSubprocess.Factory)); + assert.function(context.app.get(Identifiers.CryptoWorker.WorkerSubprocess.Factory)); + }); + + it("the subprocess factory spawns the worker script with piped stdio and wraps it in an Ipc.Subprocess", async (context) => { + await context.serviceProvider.register(); + + const factory = context.app.get(Identifiers.CryptoWorker.WorkerSubprocess.Factory) as () => Ipc.Subprocess; + const subprocess = factory(); + + assert.length(constructions, 1); + const [scriptPath, options] = constructions[0]; + assert.true(scriptPath.endsWith("worker-script.js")); + assert.equal(options, { stderr: true, stdout: true }); + assert.instance(subprocess, Ipc.Subprocess); + }); + + it("the worker factory resolves a Worker instance", async (context) => { + await context.serviceProvider.register(); + + const factory = context.app.get<() => WorkerInstance>(Identifiers.CryptoWorker.Worker.Factory); + + assert.instance(factory(), WorkerInstance); + }); + + it("boot boots the worker pool", async (context) => { + await context.serviceProvider.register(); + + const pool = { boot: async () => {}, dispose: async () => {} }; + context.app.rebind(Identifiers.CryptoWorker.WorkerPool).toConstantValue(pool); + const boot = spy(pool, "boot"); + + await context.serviceProvider.boot(); + + boot.calledOnce(); + }); + + it("dispose disposes the worker pool", async (context) => { + await context.serviceProvider.register(); + + const pool = { boot: async () => {}, dispose: async () => {} }; + context.app.rebind(Identifiers.CryptoWorker.WorkerPool).toConstantValue(pool); + const dispose = spy(pool, "dispose"); + + await context.serviceProvider.dispose(); + + dispose.calledOnce(); + }); + + it("is required", async (context) => { + assert.true(await context.serviceProvider.required()); + }); +}); + +const importFresh = (moduleName: string) => import(`${moduleName}?${Date.now()}`); + +describe<{ + app: Application; + serviceProvider: any; +}>("ServiceProvider.configSchema", ({ assert, beforeEach, it }) => { + const importDefaults = async () => (await importFresh("../distribution/defaults.js")).defaults; + + beforeEach((context) => { + context.app = new Application(); + context.serviceProvider = context.app.resolve(ServiceProvider); + + for (const key of Object.keys(process.env)) { + if (key.includes("MAINSAIL_CRYPTO_WORKER")) { + delete process.env[key]; + } + } + }); + + it("should validate schema using defaults", async ({ serviceProvider }) => { + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.undefined(result.error); + assert.number(result.value.workerCount); + assert.boolean(result.value.workerLoggingEnabled); + }); + + it("should allow configuration extension", async ({ serviceProvider }) => { + const defaults = await importDefaults(); + defaults.customField = "dummy"; + + const result = serviceProvider.configSchema().validate(defaults); + + assert.undefined(result.error); + assert.equal(result.value.customField, "dummy"); + }); + + it("should parse process.env.MAINSAIL_CRYPTO_WORKER_COUNT", async ({ serviceProvider }) => { + process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "1"; + + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.undefined(result.error); + assert.equal(result.value.workerCount, 1); + }); + + it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT is below the minimum", async ({ serviceProvider }) => { + process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "0"; + + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.defined(result.error); + }); + + it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT is not a number", async ({ serviceProvider }) => { + process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "not-a-number"; + + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.defined(result.error); + }); + + it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT is not an integer", async ({ serviceProvider }) => { + process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "1.5"; + + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.defined(result.error); + }); + + it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT exceeds the available cpus", async ({ + serviceProvider, + }) => { + process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "9999"; + + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.defined(result.error); + }); + + it("should parse process.env.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED", async ({ serviceProvider }) => { + process.env.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED = "true"; + + const result = serviceProvider.configSchema().validate(await importDefaults()); + + assert.undefined(result.error); + assert.true(result.value.workerLoggingEnabled); + }); +}); diff --git a/packages/crypto-worker/source/worker-handler.test.ts b/packages/crypto-worker/source/worker-handler.test.ts new file mode 100644 index 000000000..baecad58c --- /dev/null +++ b/packages/crypto-worker/source/worker-handler.test.ts @@ -0,0 +1,122 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application, Services } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { WorkerScriptHandler } from "./worker-handler"; + +describe<{ + subject: WorkerScriptHandler; + impl: any; + toCalls: unknown[]; + resolve: any; + bootstrap: any; + boot: any; + terminate: any; + rebind: any; +}>("WorkerScriptHandler", ({ assert, beforeEach, it, spy, stub }) => { + beforeEach((context) => { + context.impl = { + callBlockFactory: async () => {}, + callConsensusSignature: async () => {}, + callPublicKeyFactory: async () => {}, + callTransactionFactory: async () => {}, + callWalletSignature: async () => {}, + }; + context.toCalls = []; + + // WorkerScriptHandler owns a private `new Application()`; stub the prototype so bootstrap, + // boot, the WorkerImplementation resolution and the logger rebind all stay in-process. + context.resolve = stub(Application.prototype, "resolve").returnValue(context.impl); + context.bootstrap = stub(Application.prototype, "bootstrap").resolvedValue(undefined); + context.boot = stub(Application.prototype, "boot").resolvedValue(undefined); + context.terminate = stub(Application.prototype, "terminate").resolvedValue(undefined); + context.rebind = stub(Application.prototype, "rebind").returnValue({ + to: (value: unknown) => context.toCalls.push(value), + }); + + context.subject = new WorkerScriptHandler(); + }); + + it("boot bootstraps with the flags, boots and resolves the worker impl", async ({ + subject, + bootstrap, + boot, + resolve, + rebind, + }) => { + await subject.boot({ workerLoggingEnabled: true } as any); + + bootstrap.calledWith({ flags: { workerLoggingEnabled: true } }); + boot.calledOnce(); + resolve.calledOnce(); + // Logging enabled → the logger is left in place. + rebind.neverCalled(); + }); + + it("boot rebinds the logger to the null logger when worker logging is disabled", async ({ + subject, + rebind, + toCalls, + }) => { + await subject.boot({ workerLoggingEnabled: false } as any); + + rebind.calledWith(Identifiers.Services.Log.Service); + assert.equal(toCalls, [Services.Log.NullLogger]); + }); + + it("dispose terminates the app", async ({ subject, terminate }) => { + await subject.dispose(); + + terminate.calledOnce(); + }); + + it("consensusSignature delegates to the worker impl", async ({ subject, impl }) => { + await subject.boot({ workerLoggingEnabled: true } as any); + const call = spy(impl, "callConsensusSignature"); + const message = Buffer.from("message-to-sign"); + const privateKey = Buffer.from("consensus-private-key"); + + await subject.consensusSignature("sign", [message, privateKey]); + + call.calledWith("sign", [message, privateKey]); + }); + + it("walletSignature delegates to the worker impl", async ({ subject, impl }) => { + await subject.boot({ workerLoggingEnabled: true } as any); + const call = spy(impl, "callWalletSignature"); + const message = Buffer.from("message-to-sign"); + const privateKey = Buffer.from("wallet-private-key"); + + await subject.walletSignature("signRecoverable", [message, privateKey]); + + call.calledWith("signRecoverable", [message, privateKey]); + }); + + it("blockFactory delegates to the worker impl", async ({ subject, impl }) => { + await subject.boot({ workerLoggingEnabled: true } as any); + const call = spy(impl, "callBlockFactory"); + + await subject.blockFactory("fromHex", ["0a1b2c3d"]); + + call.calledWith("fromHex", ["0a1b2c3d"]); + }); + + it("transactionFactory delegates to the worker impl", async ({ subject, impl }) => { + await subject.boot({ workerLoggingEnabled: true } as any); + const call = spy(impl, "callTransactionFactory"); + const bytes = Buffer.from("deadbeef", "hex"); + + await subject.transactionFactory("fromBytes", [bytes]); + + call.calledWith("fromBytes", [bytes]); + }); + + it("publicKeyFactory delegates to the worker impl", async ({ subject, impl }) => { + await subject.boot({ workerLoggingEnabled: true } as any); + const call = spy(impl, "callPublicKeyFactory"); + + await subject.publicKeyFactory("fromMnemonic", ["clay harbor essay analyst"]); + + call.calledWith("fromMnemonic", ["clay harbor essay analyst"]); + }); +}); diff --git a/packages/crypto-worker/source/worker-handler.ts b/packages/crypto-worker/source/worker-handler.ts index 3ff65737a..4ce416d8f 100644 --- a/packages/crypto-worker/source/worker-handler.ts +++ b/packages/crypto-worker/source/worker-handler.ts @@ -1,114 +1,13 @@ import type { Contracts } from "@mainsail/contracts"; import { Identifiers } from "@mainsail/constants"; -import { inject, injectable, tagged } from "@mainsail/container"; import { Application, Services } from "@mainsail/kernel"; -@injectable() -class WorkerImpl { - @inject(Identifiers.Cryptography.Block.Factory) - private readonly blockFactory!: Contracts.Crypto.BlockFactory; - - @inject(Identifiers.Cryptography.Transaction.Factory) - private readonly transactionFactory!: Contracts.Crypto.TransactionFactory; - - @inject(Identifiers.Cryptography.Signature.Instance) - @tagged("type", "consensus") - private readonly consensusSignature!: Contracts.Crypto.SignatureBls; - - @inject(Identifiers.Cryptography.Identity.PublicKey.Factory) - @tagged("type", "consensus") - private readonly publicKeyFactory!: Contracts.Crypto.PublicKeyFactory; - - @inject(Identifiers.Cryptography.Signature.Instance) - @tagged("type", "wallet") - private readonly walletSignature!: Contracts.Crypto.SignatureEcdsa; - - public async callConsensusSignature>( - method: K, - arguments_: Contracts.Kernel.IPC.MethodArguments, - ): Promise> { - return this.#call(this.consensusSignature, method, arguments_); - } - - public async callWalletSignature>( - method: K, - arguments_: Contracts.Kernel.IPC.MethodArguments, - ): Promise> { - return this.#call(this.walletSignature, method, arguments_); - } - - public async callTransactionFactory>( - method: K, - arguments_: Contracts.Kernel.IPC.MethodArguments, - ): Promise> { - return this.#call(this.transactionFactory, method, arguments_); - } - - public async callBlockFactory>( - method: K, - arguments_: Contracts.Kernel.IPC.MethodArguments, - ): Promise> { - return this.#call(this.blockFactory, method, arguments_); - } - - public async callPublicKeyFactory>( - method: K, - arguments_: Contracts.Kernel.IPC.MethodArguments, - ): Promise> { - return this.#call(this.publicKeyFactory, method, arguments_); - } - - async #call & keyof T>( - object: T, - method: K, - arguments_: Contracts.Kernel.IPC.MethodArguments, - ): Promise> { - const normalizedArguments = (arguments_ as unknown[]).map((argument) => { - if (isBufferJson(argument)) { - return Buffer.from(argument.data); - } - - if (Array.isArray(argument) && argument.length > 0 && isBufferJson(argument[0])) { - return argument.map((item) => Buffer.from(item.data)); - } - - return argument; - }) as Contracts.Kernel.IPC.MethodArguments; - - if (typeof object[method] !== "function") { - throw new TypeError(`property "${method}" is not a function`); - } - - return ( - object[method] as ( - ...arguments__: Contracts.Kernel.IPC.MethodArguments - ) => Contracts.Kernel.IPC.MethodReturn - )(...normalizedArguments); - } -} - -type BufferJson = { - type: "Buffer"; - data: Uint8Array | number[]; -}; - -function isBufferJson(value: unknown): value is BufferJson { - if (typeof value !== "object" || value === null) { - return false; - } - - const v = value as { type?: unknown; data?: unknown }; - if (v.type !== "Buffer") { - return false; - } - - return Array.isArray(v.data) || v.data instanceof Uint8Array; -} +import { WorkerImplementation } from "./worker-implementation.js"; export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler { #app = new Application(); - #impl!: WorkerImpl; + #impl!: WorkerImplementation; public async boot(flags: Contracts.Crypto.WorkerFlags): Promise { await this.#app.bootstrap({ @@ -120,7 +19,7 @@ export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler } await this.#app.boot(); - this.#impl = this.#app.resolve(WorkerImpl); + this.#impl = this.#app.resolve(WorkerImplementation); } public async dispose(): Promise { diff --git a/packages/crypto-worker/source/worker-implementation.test.ts b/packages/crypto-worker/source/worker-implementation.test.ts new file mode 100644 index 000000000..bf33c2d37 --- /dev/null +++ b/packages/crypto-worker/source/worker-implementation.test.ts @@ -0,0 +1,120 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { WorkerImplementation } from "./worker-implementation"; + +describe<{ + app: Application; + impl: WorkerImplementation; + consensusSignature: any; + walletSignature: any; + blockFactory: any; + transactionFactory: any; + publicKeyFactory: any; +}>("WorkerImplementation", ({ assert, beforeEach, it, spy }) => { + beforeEach((context) => { + context.consensusSignature = { aggregate: async () => "aggregated", sign: async () => "signature" }; + context.walletSignature = { signRecoverable: async () => ({ r: "r", s: "s", v: 0 }) }; + context.blockFactory = { fromHex: async () => ({ block: true }) }; + context.transactionFactory = { fromHex: async () => ({ transaction: true }) }; + context.publicKeyFactory = { fromMnemonic: async () => "public-key" }; + + context.app = new Application(); + context.app.bind(Identifiers.Cryptography.Block.Factory).toConstantValue(context.blockFactory); + context.app.bind(Identifiers.Cryptography.Transaction.Factory).toConstantValue(context.transactionFactory); + context.app + .bind(Identifiers.Cryptography.Signature.Instance) + .toConstantValue(context.consensusSignature) + .whenTagged("type", "consensus"); + context.app + .bind(Identifiers.Cryptography.Signature.Instance) + .toConstantValue(context.walletSignature) + .whenTagged("type", "wallet"); + context.app + .bind(Identifiers.Cryptography.Identity.PublicKey.Factory) + .toConstantValue(context.publicKeyFactory) + .whenTagged("type", "consensus"); + + context.impl = context.app.resolve(WorkerImplementation); + }); + + it("calls the method on the consensus signature and returns its result", async ({ impl, consensusSignature }) => { + const sign = spy(consensusSignature, "sign"); + const message = Buffer.from("message"); + const privateKey = Buffer.from("private-key"); + + const result = await impl.callConsensusSignature("sign", [message, privateKey]); + + sign.calledWith(message, privateKey); + assert.equal(result, "signature"); + }); + + it("revives a serialized Buffer argument before forwarding it", async ({ impl, consensusSignature }) => { + const sign = spy(consensusSignature, "sign"); + + // The IPC channel serializes Buffers to { type: "Buffer", data: [...] }. + await impl.callConsensusSignature("sign", [ + { data: [1, 2, 3], type: "Buffer" }, + { data: [4, 5, 6], type: "Buffer" }, + ] as any); + + sign.calledWith(Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])); + }); + + it("revives an array of serialized Buffers before forwarding it", async ({ impl, consensusSignature }) => { + const aggregate = spy(consensusSignature, "aggregate"); + + await impl.callConsensusSignature("aggregate", [ + [ + { data: [1], type: "Buffer" }, + { data: [2], type: "Buffer" }, + ], + ] as any); + + aggregate.calledWith([Buffer.from([1]), Buffer.from([2])]); + }); + + it("leaves non-Buffer arguments untouched", async ({ impl, publicKeyFactory }) => { + const fromMnemonic = spy(publicKeyFactory, "fromMnemonic"); + + await impl.callPublicKeyFactory("fromMnemonic", ["clay harbor essay analyst"]); + + fromMnemonic.calledWith("clay harbor essay analyst"); + }); + + it("throws when the requested method does not exist on the target", async ({ impl }) => { + await assert.rejects( + () => impl.callBlockFactory("missing" as any, [] as any), + 'property "missing" is not a function', + ); + }); + + it("routes wallet signature calls to the wallet-tagged signature", async ({ impl, walletSignature }) => { + const signRecoverable = spy(walletSignature, "signRecoverable"); + const message = Buffer.from("message"); + const privateKey = Buffer.from("private-key"); + + await impl.callWalletSignature("signRecoverable", [message, privateKey]); + + signRecoverable.calledWith(message, privateKey); + }); + + it("routes block factory calls to the block factory", async ({ impl, blockFactory }) => { + const fromHex = spy(blockFactory, "fromHex"); + + const result = await impl.callBlockFactory("fromHex", ["0a1b2c3d"]); + + fromHex.calledWith("0a1b2c3d"); + assert.equal(result, { block: true }); + }); + + it("routes transaction factory calls to the transaction factory", async ({ impl, transactionFactory }) => { + const fromHex = spy(transactionFactory, "fromHex"); + + const result = await impl.callTransactionFactory("fromHex", ["ff00ff"]); + + fromHex.calledWith("ff00ff"); + assert.equal(result, { transaction: true }); + }); +}); diff --git a/packages/crypto-worker/source/worker-implementation.ts b/packages/crypto-worker/source/worker-implementation.ts new file mode 100644 index 000000000..e7994d2c8 --- /dev/null +++ b/packages/crypto-worker/source/worker-implementation.ts @@ -0,0 +1,106 @@ +import type { Contracts } from "@mainsail/contracts"; + +import { Identifiers } from "@mainsail/constants"; +import { inject, injectable, tagged } from "@mainsail/container"; + +type BufferJson = { + type: "Buffer"; + data: Uint8Array | number[]; +}; + +function isBufferJson(value: unknown): value is BufferJson { + if (typeof value !== "object" || value === null) { + return false; + } + + const v = value as { type?: unknown; data?: unknown }; + if (v.type !== "Buffer") { + return false; + } + + return Array.isArray(v.data) || v.data instanceof Uint8Array; +} + +@injectable() +export class WorkerImplementation { + @inject(Identifiers.Cryptography.Block.Factory) + private readonly blockFactory!: Contracts.Crypto.BlockFactory; + + @inject(Identifiers.Cryptography.Transaction.Factory) + private readonly transactionFactory!: Contracts.Crypto.TransactionFactory; + + @inject(Identifiers.Cryptography.Signature.Instance) + @tagged("type", "consensus") + private readonly consensusSignature!: Contracts.Crypto.SignatureBls; + + @inject(Identifiers.Cryptography.Identity.PublicKey.Factory) + @tagged("type", "consensus") + private readonly publicKeyFactory!: Contracts.Crypto.PublicKeyFactory; + + @inject(Identifiers.Cryptography.Signature.Instance) + @tagged("type", "wallet") + private readonly walletSignature!: Contracts.Crypto.SignatureEcdsa; + + public async callConsensusSignature>( + method: K, + arguments_: Contracts.Kernel.IPC.MethodArguments, + ): Promise> { + return this.#call(this.consensusSignature, method, arguments_); + } + + public async callWalletSignature>( + method: K, + arguments_: Contracts.Kernel.IPC.MethodArguments, + ): Promise> { + return this.#call(this.walletSignature, method, arguments_); + } + + public async callTransactionFactory>( + method: K, + arguments_: Contracts.Kernel.IPC.MethodArguments, + ): Promise> { + return this.#call(this.transactionFactory, method, arguments_); + } + + public async callBlockFactory>( + method: K, + arguments_: Contracts.Kernel.IPC.MethodArguments, + ): Promise> { + return this.#call(this.blockFactory, method, arguments_); + } + + public async callPublicKeyFactory>( + method: K, + arguments_: Contracts.Kernel.IPC.MethodArguments, + ): Promise> { + return this.#call(this.publicKeyFactory, method, arguments_); + } + + async #call & keyof T>( + object: T, + method: K, + arguments_: Contracts.Kernel.IPC.MethodArguments, + ): Promise> { + const normalizedArguments = (arguments_ as unknown[]).map((argument) => { + if (isBufferJson(argument)) { + return Buffer.from(argument.data); + } + + if (Array.isArray(argument) && argument.length > 0 && isBufferJson(argument[0])) { + return argument.map((item) => Buffer.from(item.data)); + } + + return argument; + }) as Contracts.Kernel.IPC.MethodArguments; + + if (typeof object[method] !== "function") { + throw new TypeError(`property "${method}" is not a function`); + } + + return ( + object[method] as ( + ...arguments__: Contracts.Kernel.IPC.MethodArguments + ) => Contracts.Kernel.IPC.MethodReturn + )(...normalizedArguments); + } +} diff --git a/packages/crypto-worker/source/worker-pool.test.ts b/packages/crypto-worker/source/worker-pool.test.ts new file mode 100644 index 000000000..a4a1348fd --- /dev/null +++ b/packages/crypto-worker/source/worker-pool.test.ts @@ -0,0 +1,128 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { WorkerPool } from "./worker-pool"; + +const makeWorker = () => ({ + boot: async () => {}, + dispose: async () => {}, + getQueueSize: () => 0, + isStopped: () => false, +}); + +describe<{ + app: Application; + workerPool: WorkerPool; + pool: any[]; + flags: any; + logger: any; + options: { workerCount: number; workerLoggingEnabled: boolean }; +}>("WorkerPool", ({ assert, beforeEach, it, spy, stub }) => { + beforeEach((context) => { + context.flags = { network: "testnet" }; + context.logger = { info: () => {} }; + context.pool = [makeWorker(), makeWorker(), makeWorker()]; + context.options = { workerCount: context.pool.length, workerLoggingEnabled: false }; + + let index = 0; + const createWorker = () => context.pool[index++]; + + context.app = new Application(); + context.app.bind(Identifiers.Config.Flags).toConstantValue(context.flags); + context.app.bind(Identifiers.Services.Log.Service).toConstantValue(context.logger); + context.app.bind(Identifiers.CryptoWorker.Worker.Factory).toConstantValue(() => createWorker()); + context.app + .bind(Identifiers.ServiceProvider.Configuration) + .toConstantValue({ getRequired: (key: string) => (context.options as any)[key] }) + .whenTagged("plugin", "crypto-worker"); + + context.workerPool = context.app.resolve(WorkerPool); + }); + + it("boot creates the configured number of workers and boots each with the merged flags", async ({ + workerPool, + pool, + flags, + }) => { + const boots = pool.map((worker) => spy(worker, "boot")); + + await workerPool.boot(); + + for (const boot of boots) { + boot.calledOnce(); + boot.calledWith({ ...flags, thread: "crypto-worker", workerLoggingEnabled: false }); + } + }); + + it("boot logs how many workers it starts", async ({ workerPool, logger, pool }) => { + const info = spy(logger, "info"); + + await workerPool.boot(); + + info.calledWith(`Booting up ${pool.length} crypto workers`); + }); + + it("boot forwards the workerLoggingEnabled flag from configuration", async ({ workerPool, pool, options }) => { + options.workerLoggingEnabled = true; + const boot = spy(pool[0], "boot"); + + await workerPool.boot(); + + boot.calledWith({ network: "testnet", thread: "crypto-worker", workerLoggingEnabled: true }); + }); + + it("dispose disposes every worker and empties the pool", async ({ workerPool, pool }) => { + const disposes = pool.map((worker) => spy(worker, "dispose")); + + await workerPool.boot(); + await workerPool.dispose(); + + for (const dispose of disposes) { + dispose.calledOnce(); + } + assert.throws(() => workerPool.getWorker(), "No crypto workers available"); + }); + + it("getWorker throws when no workers have been booted", ({ workerPool }) => { + assert.throws(() => workerPool.getWorker(), "No crypto workers available"); + }); + + it("getWorker throws when every worker is stopped", async ({ workerPool, pool }) => { + for (const worker of pool) { + stub(worker, "isStopped").returnValue(true); + } + + await workerPool.boot(); + + assert.throws(() => workerPool.getWorker(), "No crypto workers available"); + }); + + it("getWorker selects the worker with the smallest queue", async ({ workerPool, pool }) => { + stub(pool[0], "getQueueSize").returnValue(5); + stub(pool[1], "getQueueSize").returnValue(1); + stub(pool[2], "getQueueSize").returnValue(3); + + await workerPool.boot(); + + assert.equal(workerPool.getWorker(), pool[1]); + assert.equal(workerPool.getWorker(), pool[1]); + }); + + it("getWorker round-robins when the queues are tied", async ({ workerPool, pool }) => { + await workerPool.boot(); + + assert.equal(workerPool.getWorker(), pool[0]); + assert.equal(workerPool.getWorker(), pool[1]); + assert.equal(workerPool.getWorker(), pool[2]); + assert.equal(workerPool.getWorker(), pool[0]); + }); + + it("getWorker ignores stopped workers", async ({ workerPool, pool }) => { + stub(pool[0], "isStopped").returnValue(true); + + await workerPool.boot(); + + assert.not.equal(workerPool.getWorker(), pool[0]); + }); +}); diff --git a/packages/crypto-worker/source/worker-script.test.ts b/packages/crypto-worker/source/worker-script.test.ts new file mode 100644 index 000000000..f7cf7777b --- /dev/null +++ b/packages/crypto-worker/source/worker-script.test.ts @@ -0,0 +1,10 @@ +import { describe } from "@mainsail/test-runner"; + +// worker-script.ts is the worker thread entrypoint: importing it wires an Ipc.Handler to a +// WorkerScriptHandler. On the main thread parentPort is null, so the handler registers no +// listener — the import should simply complete without throwing. +describe("WorkerScript", ({ assert, it }) => { + it("loads without throwing", async () => { + await assert.resolves(() => import("./worker-script.js")); + }); +}); diff --git a/packages/crypto-worker/source/worker.test.ts b/packages/crypto-worker/source/worker.test.ts new file mode 100644 index 000000000..74a26e58c --- /dev/null +++ b/packages/crypto-worker/source/worker.test.ts @@ -0,0 +1,130 @@ +import { Identifiers } from "@mainsail/constants"; +import { Application } from "@mainsail/kernel"; + +import { describe } from "@mainsail/test-runner"; +import { Worker } from "./worker"; + +describe<{ + app: Application; + worker: Worker; + ipc: any; +}>("Worker", ({ assert, beforeEach, it, spy, stub }) => { + beforeEach((context) => { + context.ipc = { + dispose: async () => 0, + drain: async () => {}, + getQueueSize: () => 3, + isStopped: () => false, + kill: async () => 7, + sendRequest: async () => {}, + }; + + context.app = new Application(); + // The injected factory hands back our fake subprocess instead of spawning a thread. + context.app.bind(Identifiers.CryptoWorker.WorkerSubprocess.Factory).toConstantValue(() => context.ipc); + + context.worker = context.app.resolve(Worker); + }); + + it("boot sends a single boot request and memoizes it", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + const flags = { thread: "crypto-worker" } as any; + + await worker.boot(flags); + await worker.boot(flags); + + sendRequest.calledOnce(); + sendRequest.calledWith("boot", flags); + }); + + it("dispose drains, requests an inner dispose, then terminates the subprocess", async ({ worker, ipc }) => { + const drain = spy(ipc, "drain"); + const sendRequest = spy(ipc, "sendRequest"); + const dispose = spy(ipc, "dispose"); + + await worker.dispose(); + + drain.calledOnce(); + sendRequest.calledWith("dispose"); + dispose.calledOnce(); + }); + + it("dispose still terminates the subprocess when the inner dispose request fails", async ({ worker, ipc }) => { + ipc.sendRequest = async () => { + throw new Error("worker already gone"); + }; + const dispose = spy(ipc, "dispose"); + + await assert.resolves(() => worker.dispose()); + + dispose.calledOnce(); + }); + + it("dispose is memoized across calls", async ({ worker, ipc }) => { + const drain = spy(ipc, "drain"); + + await worker.dispose(); + await worker.dispose(); + + drain.calledOnce(); + }); + + it("kill terminates the subprocess and returns its exit code", async ({ worker, ipc }) => { + const kill = spy(ipc, "kill"); + + assert.equal(await worker.kill(), 7); + kill.calledOnce(); + }); + + it("getQueueSize reports the subprocess queue size", ({ worker }) => { + assert.equal(worker.getQueueSize(), 3); + }); + + it("isStopped reflects the subprocess state", ({ worker, ipc }) => { + assert.false(worker.isStopped()); + + stub(ipc, "isStopped").returnValue(true); + + assert.true(worker.isStopped()); + }); + + it("consensusSignature forwards the method and arguments to the subprocess", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.consensusSignature("sign" as any, "a" as any, "b" as any); + + sendRequest.calledWith("consensusSignature", "sign", ["a", "b"]); + }); + + it("walletSignature forwards the method and arguments to the subprocess", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.walletSignature("verify" as any, "a" as any); + + sendRequest.calledWith("walletSignature", "verify", ["a"]); + }); + + it("blockFactory forwards the method and arguments to the subprocess", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.blockFactory("make" as any, "data" as any); + + sendRequest.calledWith("blockFactory", "make", ["data"]); + }); + + it("transactionFactory forwards the method and arguments to the subprocess", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.transactionFactory("fromBytes" as any, "bytes" as any); + + sendRequest.calledWith("transactionFactory", "fromBytes", ["bytes"]); + }); + + it("publicKeyFactory forwards the method and arguments to the subprocess", async ({ worker, ipc }) => { + const sendRequest = spy(ipc, "sendRequest"); + + await worker.publicKeyFactory("fromMnemonic" as any, "mnemonic" as any); + + sendRequest.calledWith("publicKeyFactory", "fromMnemonic", ["mnemonic"]); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 07fc34c97..9a0887bb5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1944,6 +1944,9 @@ importers: '@types/tmp': specifier: 0.2.6 version: 0.2.6 + esmock: + specifier: 2.7.5 + version: 2.7.5 uvu: specifier: 0.5.6 version: 0.5.6