Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/crypto-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@types/chance": "1.1.8",
"@types/fs-extra": "11.0.4",
"@types/tmp": "0.2.6",
"esmock": "2.7.5",
"uvu": "0.5.6"
},
"engines": {
Expand Down
47 changes: 47 additions & 0 deletions packages/crypto-worker/source/defaults.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { EnvironmentVariables } from "@mainsail/constants";
import esmock from "esmock";

import { describe } from "@mainsail/test-runner";

let bust = 0;
const load = async (): Promise<{ workerCount: number | string; workerLoggingEnabled: boolean }> =>
(await import(`./defaults.js?bust=${bust++}`)).defaults;

// Re-import defaults with os.cpus() mocked to report a specific core count.
const loadWithCpus = async (cores: number): Promise<{ workerCount: number | string }> => {
delete process.env[EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT];
return (await esmock("./defaults", { os: { cpus: () => Array.from({ length: cores }, () => ({})) } })).defaults;
};

describe("Defaults", ({ assert, it }) => {
it("falls back to a CPU-derived worker count and disabled logging", async () => {
const defaults = await load();

assert.number(defaults.workerCount);
assert.gte(defaults.workerCount as number, 1);
assert.lte(defaults.workerCount as number, 4);
assert.false(defaults.workerLoggingEnabled);
});

it("caps the worker count at 4 on machines with more cores", async () => {
const defaults = await loadWithCpus(16);

assert.equal(defaults.workerCount, 4);
});

it("uses the cpu count when fewer than 4 cores are available", async () => {
const defaults = await loadWithCpus(2);

assert.equal(defaults.workerCount, 2);
});

it("reads the worker count and logging flag from the environment", async () => {
process.env[EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT] = "7";
process.env[EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED] = "true";

const defaults = await load();

assert.equal(defaults.workerCount, "7");
assert.true(defaults.workerLoggingEnabled);
});
});
200 changes: 200 additions & 0 deletions packages/crypto-worker/source/service-provider.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import { Identifiers } from "@mainsail/constants";
import { Application, Ipc } from "@mainsail/kernel";
import { EventEmitter } from "events";
import esmock from "esmock";
import { PassThrough } from "stream";

import { describe } from "@mainsail/test-runner";
import { Worker as WorkerInstance } from "./worker";

// Records every `new Worker(...)` so the factory test can assert how the thread is spawned.
const constructions: any[][] = [];

// Stand-in for worker_threads.Worker: an EventEmitter exposing the stdout/stderr streams and
// threadId that Ipc.Subprocess reads, so the real Subprocess wraps it without a real thread.
class FakeWorker extends EventEmitter {
public threadId = 1;
public readonly stdout = new PassThrough();
public readonly stderr = new PassThrough();

public constructor(...arguments_: any[]) {
super();
constructions.push(arguments_);
}

public postMessage(): void {}
public async terminate(): Promise<number> {
return 0;
}
}

// Load the provider with worker_threads.Worker swapped for the fake; the real Ipc.Subprocess
// and ./worker.js stay in place.
const { ServiceProvider } = await esmock("./service-provider", {
worker_threads: { Worker: FakeWorker },
});

describe<{
app: Application;
serviceProvider: any;
}>("ServiceProvider", ({ assert, beforeEach, it, spy }) => {
beforeEach((context) => {
constructions.length = 0;

context.app = new Application();
context.app.bind(Identifiers.Config.Flags).toConstantValue({ network: "testnet" });
// Ipc.Subprocess resolves the logger from the container when the factory runs.
context.app.bind(Identifiers.Services.Log.Service).toConstantValue({ debug: () => {}, error: () => {} });

context.serviceProvider = context.app.resolve(ServiceProvider);
});

it("register binds the worker instance, worker factory, worker pool and subprocess factory", async (context) => {
await context.serviceProvider.register();

assert.true(context.app.isBound(Identifiers.CryptoWorker.Worker.Instance));
assert.true(context.app.isBound(Identifiers.CryptoWorker.Worker.Factory));
assert.true(context.app.isBound(Identifiers.CryptoWorker.WorkerPool));
assert.true(context.app.isBound(Identifiers.CryptoWorker.WorkerSubprocess.Factory));
assert.function(context.app.get(Identifiers.CryptoWorker.WorkerSubprocess.Factory));
});

it("the subprocess factory spawns the worker script with piped stdio and wraps it in an Ipc.Subprocess", async (context) => {
await context.serviceProvider.register();

const factory = context.app.get(Identifiers.CryptoWorker.WorkerSubprocess.Factory) as () => Ipc.Subprocess;
const subprocess = factory();

assert.length(constructions, 1);
const [scriptPath, options] = constructions[0];
assert.true(scriptPath.endsWith("worker-script.js"));
assert.equal(options, { stderr: true, stdout: true });
assert.instance(subprocess, Ipc.Subprocess);
});

it("the worker factory resolves a Worker instance", async (context) => {
await context.serviceProvider.register();

const factory = context.app.get<() => WorkerInstance>(Identifiers.CryptoWorker.Worker.Factory);

assert.instance(factory(), WorkerInstance);
});

it("boot boots the worker pool", async (context) => {
await context.serviceProvider.register();

const pool = { boot: async () => {}, dispose: async () => {} };
context.app.rebind(Identifiers.CryptoWorker.WorkerPool).toConstantValue(pool);
const boot = spy(pool, "boot");

await context.serviceProvider.boot();

boot.calledOnce();
});

it("dispose disposes the worker pool", async (context) => {
await context.serviceProvider.register();

const pool = { boot: async () => {}, dispose: async () => {} };
context.app.rebind(Identifiers.CryptoWorker.WorkerPool).toConstantValue(pool);
const dispose = spy(pool, "dispose");

await context.serviceProvider.dispose();

dispose.calledOnce();
});

it("is required", async (context) => {
assert.true(await context.serviceProvider.required());
});
});

const importFresh = (moduleName: string) => import(`${moduleName}?${Date.now()}`);

describe<{
app: Application;
serviceProvider: any;
}>("ServiceProvider.configSchema", ({ assert, beforeEach, it }) => {
const importDefaults = async () => (await importFresh("../distribution/defaults.js")).defaults;

beforeEach((context) => {
context.app = new Application();
context.serviceProvider = context.app.resolve(ServiceProvider);

for (const key of Object.keys(process.env)) {
if (key.includes("MAINSAIL_CRYPTO_WORKER")) {
delete process.env[key];
}
}
});

it("should validate schema using defaults", async ({ serviceProvider }) => {
const result = serviceProvider.configSchema().validate(await importDefaults());

assert.undefined(result.error);
assert.number(result.value.workerCount);
assert.boolean(result.value.workerLoggingEnabled);
});

it("should allow configuration extension", async ({ serviceProvider }) => {
const defaults = await importDefaults();
defaults.customField = "dummy";

const result = serviceProvider.configSchema().validate(defaults);

assert.undefined(result.error);
assert.equal(result.value.customField, "dummy");
});

it("should parse process.env.MAINSAIL_CRYPTO_WORKER_COUNT", async ({ serviceProvider }) => {
process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "1";

const result = serviceProvider.configSchema().validate(await importDefaults());

assert.undefined(result.error);
assert.equal(result.value.workerCount, 1);
});

it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT is below the minimum", async ({ serviceProvider }) => {
process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "0";

const result = serviceProvider.configSchema().validate(await importDefaults());

assert.defined(result.error);
});

it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT is not a number", async ({ serviceProvider }) => {
process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "not-a-number";

const result = serviceProvider.configSchema().validate(await importDefaults());

assert.defined(result.error);
});

it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT is not an integer", async ({ serviceProvider }) => {
process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "1.5";

const result = serviceProvider.configSchema().validate(await importDefaults());

assert.defined(result.error);
});

it("should throw if process.env.MAINSAIL_CRYPTO_WORKER_COUNT exceeds the available cpus", async ({
serviceProvider,
}) => {
process.env.MAINSAIL_CRYPTO_WORKER_COUNT = "9999";

const result = serviceProvider.configSchema().validate(await importDefaults());

assert.defined(result.error);
});

it("should parse process.env.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED", async ({ serviceProvider }) => {
process.env.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED = "true";

const result = serviceProvider.configSchema().validate(await importDefaults());

assert.undefined(result.error);
assert.true(result.value.workerLoggingEnabled);
});
});
122 changes: 122 additions & 0 deletions packages/crypto-worker/source/worker-handler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { Identifiers } from "@mainsail/constants";
import { Application, Services } from "@mainsail/kernel";

import { describe } from "@mainsail/test-runner";
import { WorkerScriptHandler } from "./worker-handler";

describe<{
subject: WorkerScriptHandler;
impl: any;
toCalls: unknown[];
resolve: any;
bootstrap: any;
boot: any;
terminate: any;
rebind: any;
}>("WorkerScriptHandler", ({ assert, beforeEach, it, spy, stub }) => {
beforeEach((context) => {
context.impl = {
callBlockFactory: async () => {},
callConsensusSignature: async () => {},
callPublicKeyFactory: async () => {},
callTransactionFactory: async () => {},
callWalletSignature: async () => {},
};
context.toCalls = [];

// WorkerScriptHandler owns a private `new Application()`; stub the prototype so bootstrap,
// boot, the WorkerImplementation resolution and the logger rebind all stay in-process.
context.resolve = stub(Application.prototype, "resolve").returnValue(context.impl);
context.bootstrap = stub(Application.prototype, "bootstrap").resolvedValue(undefined);
context.boot = stub(Application.prototype, "boot").resolvedValue(undefined);
context.terminate = stub(Application.prototype, "terminate").resolvedValue(undefined);
context.rebind = stub(Application.prototype, "rebind").returnValue({
to: (value: unknown) => context.toCalls.push(value),
});

context.subject = new WorkerScriptHandler();
});

it("boot bootstraps with the flags, boots and resolves the worker impl", async ({
subject,
bootstrap,
boot,
resolve,
rebind,
}) => {
await subject.boot({ workerLoggingEnabled: true } as any);

bootstrap.calledWith({ flags: { workerLoggingEnabled: true } });
boot.calledOnce();
resolve.calledOnce();
// Logging enabled → the logger is left in place.
rebind.neverCalled();
});

it("boot rebinds the logger to the null logger when worker logging is disabled", async ({
subject,
rebind,
toCalls,
}) => {
await subject.boot({ workerLoggingEnabled: false } as any);

rebind.calledWith(Identifiers.Services.Log.Service);
assert.equal(toCalls, [Services.Log.NullLogger]);
});

it("dispose terminates the app", async ({ subject, terminate }) => {
await subject.dispose();

terminate.calledOnce();
});

it("consensusSignature delegates to the worker impl", async ({ subject, impl }) => {
await subject.boot({ workerLoggingEnabled: true } as any);
const call = spy(impl, "callConsensusSignature");
const message = Buffer.from("message-to-sign");
const privateKey = Buffer.from("consensus-private-key");

await subject.consensusSignature("sign", [message, privateKey]);

call.calledWith("sign", [message, privateKey]);
});

it("walletSignature delegates to the worker impl", async ({ subject, impl }) => {
await subject.boot({ workerLoggingEnabled: true } as any);
const call = spy(impl, "callWalletSignature");
const message = Buffer.from("message-to-sign");
const privateKey = Buffer.from("wallet-private-key");

await subject.walletSignature("signRecoverable", [message, privateKey]);

call.calledWith("signRecoverable", [message, privateKey]);
});

it("blockFactory delegates to the worker impl", async ({ subject, impl }) => {
await subject.boot({ workerLoggingEnabled: true } as any);
const call = spy(impl, "callBlockFactory");

await subject.blockFactory("fromHex", ["0a1b2c3d"]);

call.calledWith("fromHex", ["0a1b2c3d"]);
});

it("transactionFactory delegates to the worker impl", async ({ subject, impl }) => {
await subject.boot({ workerLoggingEnabled: true } as any);
const call = spy(impl, "callTransactionFactory");
const bytes = Buffer.from("deadbeef", "hex");

await subject.transactionFactory("fromBytes", [bytes]);

call.calledWith("fromBytes", [bytes]);
});

it("publicKeyFactory delegates to the worker impl", async ({ subject, impl }) => {
await subject.boot({ workerLoggingEnabled: true } as any);
const call = spy(impl, "callPublicKeyFactory");

await subject.publicKeyFactory("fromMnemonic", ["clay harbor essay analyst"]);

call.calledWith("fromMnemonic", ["clay harbor essay analyst"]);
});
});
Loading
Loading