Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
73568a1
Fix boot
sebastijankuzner May 27, 2026
8d394d7
Use post construct
sebastijankuzner May 27, 2026
a7537ef
Limit default workers to 4
sebastijankuzner May 27, 2026
3e29db4
Move method is not defined to try catch
sebastijankuzner May 27, 2026
3a3b628
Handle errors
sebastijankuzner May 27, 2026
b0497f5
messageerror
sebastijankuzner May 27, 2026
828c2a3
Log on error
sebastijankuzner May 27, 2026
66cd5ad
Add thread id
sebastijankuzner May 27, 2026
63d658b
Low worker names
sebastijankuzner May 27, 2026
508b436
Add spawning log
sebastijankuzner May 27, 2026
d8cbfeb
Change log position
sebastijankuzner May 27, 2026
03577a8
Improve worker selection
sebastijankuzner May 27, 2026
c32ec2d
Workers are private
sebastijankuzner May 27, 2026
f61c2d6
Handle no workers available
sebastijankuzner May 27, 2026
38f62e0
Make get worker async
sebastijankuzner May 27, 2026
19ee81c
Merge branch 'develop' into test/crypto-worker
sebastijankuzner May 27, 2026
1004685
Fix paths
sebastijankuzner May 27, 2026
2781d0e
Fix worker contracts
sebastijankuzner May 27, 2026
27569f8
Remove TODO
sebastijankuzner May 27, 2026
49f8686
Use events for emits
sebastijankuzner May 27, 2026
20dafcc
Join message events
sebastijankuzner May 27, 2026
57455f5
Fix types
sebastijankuzner May 27, 2026
2bc04bf
Fix types
sebastijankuzner May 27, 2026
2e2fd68
Handle stopped worker
sebastijankuzner May 27, 2026
de2814f
Remove stopped workers from pool
sebastijankuzner May 27, 2026
2647af4
Fix emit
sebastijankuzner May 27, 2026
d469fe3
Update emit type
sebastijankuzner May 27, 2026
ac01356
style: resolve style guide violations [ci-lint-fix]
sebastijankuzner May 27, 2026
e06afd4
Deps
sebastijankuzner May 27, 2026
d006ce2
Merge branch 'develop' into feat/ipc/imporvements
sebastijankuzner May 28, 2026
d2b70d5
Initial dispose
sebastijankuzner May 28, 2026
1990008
Log when disposed
sebastijankuzner May 28, 2026
ad53f0d
Dispose workers
sebastijankuzner May 28, 2026
641ab11
Use terminate
sebastijankuzner May 28, 2026
7598cbf
Fix logs
sebastijankuzner May 28, 2026
80daa9b
style: resolve style guide violations [ci-lint-fix]
sebastijankuzner May 28, 2026
bc7a426
Support drain
sebastijankuzner May 28, 2026
c62c412
Remove dipose from app
sebastijankuzner May 28, 2026
df9e075
Fix functional
sebastijankuzner May 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/consensus/source/aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator {
validators[key] = true;
}

const worker = await this.workerPool.getWorker();
const worker = this.workerPool.getWorker();
const signature = await worker.consensusSignature("aggregate", signatures);

return {
Expand All @@ -51,7 +51,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator {
return false;
}

const worker = await this.workerPool.getWorker();
const worker = this.workerPool.getWorker();

const aggregatedPublicKey = await worker.publicKeyFactory("aggregate", validatorPublicKeys);

Expand Down
2 changes: 1 addition & 1 deletion packages/consensus/source/processors/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export class MessageProcessor extends AbstractProcessor implements Contracts.Con
}

async #hasValidSignature(message: Contracts.Crypto.Message): Promise<boolean> {
const worker = await this.workerPool.getWorker();
const worker = this.workerPool.getWorker();
return worker.consensusSignature(
"verify",
Buffer.from(message.signature, "hex"),
Expand Down
46 changes: 33 additions & 13 deletions packages/contracts/source/contracts/crypto/worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { BlockFactory } from "../crypto/block.js";
import type { PublicKeyFactory, SignatureBls, SignatureEcdsa } from "../crypto/identities.js";
import type { TransactionFactory } from "../crypto/transactions.js";
import type { Requests, Subprocess } from "../kernel/ipc.js";
import type { MethodArguments, Requests } from "../kernel/ipc.js";
import type { JsonObject } from "../types/index.js";

export interface WorkerFlags extends JsonObject {
Expand All @@ -10,41 +10,61 @@ export interface WorkerFlags extends JsonObject {

export interface WorkerScriptHandler {
boot(flags: WorkerFlags): Promise<void>;
dispose(): Promise<void>;
consensusSignature<K extends Requests<SignatureBls>>(
method: K,
...arguments_: Parameters<SignatureBls[K]>
arguments_: MethodArguments<SignatureBls, K>,
): Promise<ReturnType<SignatureBls[K]>>;
walletSignature<K extends Requests<SignatureEcdsa>>(
method: K,
...arguments_: Parameters<SignatureEcdsa[K]>
arguments_: MethodArguments<SignatureEcdsa, K>,
): Promise<ReturnType<SignatureEcdsa[K]>>;
blockFactory<K extends Requests<BlockFactory>>(
method: K,
...arguments_: Parameters<BlockFactory[K]>
arguments_: MethodArguments<BlockFactory, K>,
): Promise<ReturnType<BlockFactory[K]>>;
transactionFactory<K extends Requests<TransactionFactory>>(
method: K,
...arguments_: Parameters<TransactionFactory[K]>
arguments_: MethodArguments<TransactionFactory, K>,
): Promise<ReturnType<TransactionFactory[K]>>;
publicKeyFactory<K extends Requests<PublicKeyFactory>>(
method: K,
...arguments_: Parameters<PublicKeyFactory[K]>
arguments_: MethodArguments<PublicKeyFactory, K>,
): Promise<ReturnType<PublicKeyFactory[K]>>;
}

export type WorkerFactory = () => Worker;

export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;

export type WorkerSubprocessFactory = () => WorkerSubprocess;

export interface Worker extends WorkerScriptHandler {
export interface Worker {
boot(flags: WorkerFlags): Promise<void>;
dispose(): Promise<void>;
getQueueSize(): number;
isStopped(): boolean;
kill(): Promise<number>;
consensusSignature<K extends Requests<SignatureBls>>(
method: K,
...arguments_: Parameters<SignatureBls[K]>
): Promise<ReturnType<SignatureBls[K]>>;
walletSignature<K extends Requests<SignatureEcdsa>>(
method: K,
...arguments_: Parameters<SignatureEcdsa[K]>
): Promise<ReturnType<SignatureEcdsa[K]>>;
blockFactory<K extends Requests<BlockFactory>>(
method: K,
...arguments_: Parameters<BlockFactory[K]>
): Promise<ReturnType<BlockFactory[K]>>;
transactionFactory<K extends Requests<TransactionFactory>>(
method: K,
...arguments_: Parameters<TransactionFactory[K]>
): Promise<ReturnType<TransactionFactory[K]>>;
publicKeyFactory<K extends Requests<PublicKeyFactory>>(
method: K,
...arguments_: Parameters<PublicKeyFactory[K]>
): Promise<ReturnType<PublicKeyFactory[K]>>;
}

export interface WorkerPool {
boot(): Promise<void>;
shutdown(): Promise<void>;
getWorker(): Promise<Worker>;
dispose(): Promise<void>;
getWorker(): Worker;
}
6 changes: 1 addition & 5 deletions packages/contracts/source/contracts/evm/worker.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
import type { CommitHandler } from "../crypto/index.js";
import type { EventListener } from "../kernel/index.js";
import type { Subprocess } from "../kernel/ipc.js";
import type { KeyValuePair } from "../types/index.js";

export type WorkerFlags = KeyValuePair;

export interface WorkerScriptHandler {
boot(flags: WorkerFlags): Promise<void>;
dispose(): Promise<void>;
setPeerCount(peerCount: number): Promise<void>;
commit(blockNumber: number): Promise<void>;
start(blockNumber: number): Promise<void>;
}

export type WorkerFactory = () => Worker;

export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;

export type WorkerSubprocessFactory = () => WorkerSubprocess;

export interface Worker extends Omit<WorkerScriptHandler, "commit">, CommitHandler, EventListener {
getQueueSize(): number;
kill(): Promise<number>;
Expand Down
9 changes: 7 additions & 2 deletions packages/contracts/source/contracts/kernel/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export type ErrorReply = {

export type Event = {
event: string;
data: string;
data: unknown;
};

export type Reply<T = unknown> = SuccessReply<T> | ErrorReply;
Expand All @@ -39,9 +39,14 @@ export interface Handler<T extends object> {
handleRequest<K extends Requests<T>>(method: K): void;
}

export interface Subprocess<T> {
export interface Subprocess {
dispose(): Promise<number>;
drain(): Promise<void>;
getQueueSize(): number;
isStopped(): boolean;
kill(): Promise<number>;
sendRequest<T>(method: string, ...arguments_: unknown[]): Promise<T>;
registerEventHandler<T>(event: string, callback: EventCallback<T>): void;
}

export type SubprocessFactory = () => Subprocess;
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import type { CommitHandler } from "../crypto/index.js";
import type { EventListener } from "../kernel/index.js";
import type { EventCallback, Subprocess } from "../kernel/ipc.js";
import type { EventCallback } from "../kernel/ipc.js";
import type { KeyValuePair } from "../types/index.js";
import type { GetBatchResult, GetBatchOptions } from "./selector.js";

export type WorkerFlags = KeyValuePair;

export interface WorkerScriptHandler {
boot(flags: WorkerFlags): Promise<void>;
dispose(): Promise<void>;
getTransactions(options: GetBatchOptions): Promise<GetBatchResult>;
removeTransaction(address: string, id: string): Promise<void>;
commit(height: number, sendersAddresses: string[], consumedGas: number, isSyncing: boolean): Promise<void>;
Expand All @@ -19,10 +20,6 @@ export interface WorkerScriptHandler {

export type WorkerFactory = () => Worker;

export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;

export type WorkerSubprocessFactory = () => WorkerSubprocess;

export interface Worker extends Omit<WorkerScriptHandler, "commit">, CommitHandler, EventListener {
getQueueSize(): number;
kill(): Promise<number>;
Expand Down
1 change: 0 additions & 1 deletion packages/core/bin/config/devnet/core/.env
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ MAINSAIL_API_TRANSACTION_POOL_DISABLED=
MAINSAIL_API_TRANSACTION_POOL_HOST=0.0.0.0
MAINSAIL_API_TRANSACTION_POOL_PORT=4007

MAINSAIL_CRYPTO_WORKER_COUNT=2
MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED=
2 changes: 1 addition & 1 deletion packages/crypto-messages/source/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class Factory implements Contracts.Crypto.MessageFactory {
keyPair: Contracts.Crypto.KeyPair,
context: Contracts.Crypto.SignatureMessageContext,
): Promise<Contracts.Crypto.Message> {
const worker = await this.workerPool.getWorker();
const worker = this.workerPool.getWorker();

const bytes = await this.serializer.serializeMessageForSignature(data, context);
const signature = await worker.consensusSignature("sign", bytes, Buffer.from(keyPair.privateKey, "hex"));
Expand Down
2 changes: 1 addition & 1 deletion packages/crypto-proposal/source/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class Factory implements Contracts.Crypto.ProposalFactory {
data: Contracts.Crypto.ProposalDataSerializableUnsigned,
keyPair: Contracts.Crypto.KeyPair,
): Promise<Contracts.Crypto.Proposal> {
const worker = await this.workerPool.getWorker();
const worker = this.workerPool.getWorker();

this.#verifySchema("proposalUnsigned", data);

Expand Down
2 changes: 1 addition & 1 deletion packages/crypto-transaction/source/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export class TransactionFactory implements Contracts.Crypto.TransactionFactory {
try {
const { data: transaction } = await this.deserializer.deserialize(serialized);

const worker = this.workerPool ? await this.workerPool.getWorker() : undefined;
const worker = this.workerPool ? this.workerPool.getWorker() : undefined;
const cryptoData = worker
? await worker.transactionFactory("computeCryptoData", transaction)
: await this.computeCryptoData(transaction);
Expand Down
1 change: 0 additions & 1 deletion packages/crypto-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"@mainsail/constants": "workspace:*",
"@mainsail/container": "workspace:*",
"@mainsail/kernel": "workspace:*",
"@mainsail/utils": "workspace:*",
"joi": "18.2.1"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/crypto-worker/source/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ import { Environment } from "@mainsail/kernel";
import { cpus } from "os";

export const defaults = {
workerCount: Environment.get(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT, cpus().length),
workerCount: Environment.get(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_COUNT, Math.min(cpus().length, 4)),
workerLoggingEnabled: Environment.isTrue(EnvironmentVariables.MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED),
};
8 changes: 4 additions & 4 deletions packages/crypto-worker/source/service-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { injectable } from "@mainsail/container";
import { Ipc, Providers } from "@mainsail/kernel";
import Joi from "joi";
import { cpus } from "os";
import { URL } from "url";
import { fileURLToPath } from "url";
import { Worker } from "worker_threads";

import { WorkerPool } from "./worker-pool.js";
Expand All @@ -25,11 +25,11 @@ export class ServiceProvider extends Providers.ServiceProvider {
this.app.bind(Identifiers.CryptoWorker.WorkerPool).to(WorkerPool).inSingletonScope();

this.app.bind<() => Ipc.Subprocess>(Identifiers.CryptoWorker.WorkerSubprocess.Factory).toFactory(() => () => {
const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, {
const subprocess = new Worker(fileURLToPath(new URL("worker-script.js", import.meta.url)), {
stderr: true,
stdout: true,
});
return new Ipc.Subprocess(this.app, "system", subprocess);
return new Ipc.Subprocess(this.app, "crypto", "system", subprocess);
});
}

Expand All @@ -38,7 +38,7 @@ export class ServiceProvider extends Providers.ServiceProvider {
}

public async dispose(): Promise<void> {
await this.app.get<Contracts.Crypto.WorkerPool>(Identifiers.CryptoWorker.WorkerPool).shutdown();
await this.app.get<Contracts.Crypto.WorkerPool>(Identifiers.CryptoWorker.WorkerPool).dispose();
}

public async required(): Promise<boolean> {
Expand Down
40 changes: 19 additions & 21 deletions packages/crypto-worker/source/worker-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,60 +107,58 @@ function isBufferJson(value: unknown): value is BufferJson {
}

export class WorkerScriptHandler implements Contracts.Crypto.WorkerScriptHandler {
#app = new Application();
#impl!: WorkerImpl;

public async boot(flags: Contracts.Crypto.WorkerFlags): Promise<void> {
const app: Contracts.Kernel.Application = new Application();

await app.bootstrap({
await this.#app.bootstrap({
flags,
});

if (!flags.workerLoggingEnabled) {
app.rebind(Identifiers.Services.Log.Service).to(Services.Log.NullLogger);
this.#app.rebind(Identifiers.Services.Log.Service).to(Services.Log.NullLogger);
}

await app.boot();
this.#impl = app.resolve(WorkerImpl);
await this.#app.boot();
this.#impl = this.#app.resolve(WorkerImpl);
}

public async dispose(): Promise<void> {
await this.#app.terminate();
}

public async consensusSignature<K extends Contracts.Kernel.IPC.Requests<Contracts.Crypto.SignatureBls>>(
method: K,
...arguments_: Parameters<Contracts.Crypto.SignatureBls[K]>
arguments_: Contracts.Kernel.IPC.MethodArguments<Contracts.Crypto.SignatureBls, K>,
): Promise<ReturnType<Contracts.Crypto.SignatureBls[K]>> {
// @ts-ignore
return this.#impl.callConsensusSignature(method, arguments_[0]);
return this.#impl.callConsensusSignature(method, arguments_);
}

public async walletSignature<K extends Contracts.Kernel.IPC.Requests<Contracts.Crypto.SignatureEcdsa>>(
method: K,
...arguments_: Parameters<Contracts.Crypto.SignatureEcdsa[K]>
arguments_: Contracts.Kernel.IPC.MethodArguments<Contracts.Crypto.SignatureEcdsa, K>,
): Promise<ReturnType<Contracts.Crypto.SignatureEcdsa[K]>> {
// @ts-ignore
return this.#impl.callWalletSignature(method, arguments_[0]);
return this.#impl.callWalletSignature(method, arguments_);
}

public async blockFactory<K extends Contracts.Kernel.IPC.Requests<Contracts.Crypto.BlockFactory>>(
method: K,
...arguments_: Parameters<Contracts.Crypto.BlockFactory[K]>
arguments_: Contracts.Kernel.IPC.MethodArguments<Contracts.Crypto.BlockFactory, K>,
): Promise<ReturnType<Contracts.Crypto.BlockFactory[K]>> {
// @ts-ignore
return this.#impl.callBlockFactory(method, arguments_[0]);
return this.#impl.callBlockFactory(method, arguments_);
}

public async transactionFactory<K extends Contracts.Kernel.IPC.Requests<Contracts.Crypto.TransactionFactory>>(
method: K,
...arguments_: Parameters<Contracts.Crypto.TransactionFactory[K]>
arguments_: Contracts.Kernel.IPC.MethodArguments<Contracts.Crypto.TransactionFactory, K>,
): Promise<ReturnType<Contracts.Crypto.TransactionFactory[K]>> {
// @ts-ignore
return this.#impl.callTransactionFactory(method, arguments_[0]);
return this.#impl.callTransactionFactory(method, arguments_);
}

public async publicKeyFactory<K extends Contracts.Kernel.IPC.Requests<Contracts.Crypto.PublicKeyFactory>>(
method: K,
...arguments_: Parameters<Contracts.Crypto.PublicKeyFactory[K]>
arguments_: Contracts.Kernel.IPC.MethodArguments<Contracts.Crypto.PublicKeyFactory, K>,
): Promise<ReturnType<Contracts.Crypto.PublicKeyFactory[K]>> {
// @ts-ignore
return this.#impl.callPublicKeyFactory(method, arguments_[0]);
return this.#impl.callPublicKeyFactory(method, arguments_);
}
}
Loading
Loading