Skip to content

Commit ab6817c

Browse files
feat: improve worker support (#1312)
* Fix boot * Use post construct * Limit default workers to 4 * Move method is not defined to try catch * Handle errors * messageerror * Log on error * Add thread id * Low worker names * Add spawning log * Change log position * Improve worker selection * Workers are private * Handle no workers available * Make get worker async * Fix paths * Fix worker contracts * Remove TODO * Use events for emits * Join message events * Fix types * Fix types * Handle stopped worker * Remove stopped workers from pool * Fix emit * Update emit type * style: resolve style guide violations [ci-lint-fix] * Deps * Initial dispose * Log when disposed * Dispose workers * Use terminate * Fix logs * style: resolve style guide violations [ci-lint-fix] * Support drain * Remove dipose from app * Fix functional
1 parent 5993367 commit ab6817c

34 files changed

Lines changed: 408 additions & 188 deletions

File tree

packages/consensus/source/aggregator.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator {
2929
validators[key] = true;
3030
}
3131

32-
const worker = await this.workerPool.getWorker();
32+
const worker = this.workerPool.getWorker();
3333
const signature = await worker.consensusSignature("aggregate", signatures);
3434

3535
return {
@@ -51,7 +51,7 @@ export class Aggregator implements Contracts.Consensus.Aggregator {
5151
return false;
5252
}
5353

54-
const worker = await this.workerPool.getWorker();
54+
const worker = this.workerPool.getWorker();
5555

5656
const aggregatedPublicKey = await worker.publicKeyFactory("aggregate", validatorPublicKeys);
5757

packages/consensus/source/processors/message-processor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export class MessageProcessor extends AbstractProcessor implements Contracts.Con
106106
}
107107

108108
async #hasValidSignature(message: Contracts.Crypto.Message): Promise<boolean> {
109-
const worker = await this.workerPool.getWorker();
109+
const worker = this.workerPool.getWorker();
110110
return worker.consensusSignature(
111111
"verify",
112112
Buffer.from(message.signature, "hex"),
Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { BlockFactory } from "../crypto/block.js";
22
import type { PublicKeyFactory, SignatureBls, SignatureEcdsa } from "../crypto/identities.js";
33
import type { TransactionFactory } from "../crypto/transactions.js";
4-
import type { Requests, Subprocess } from "../kernel/ipc.js";
4+
import type { MethodArguments, Requests } from "../kernel/ipc.js";
55
import type { JsonObject } from "../types/index.js";
66

77
export interface WorkerFlags extends JsonObject {
@@ -10,41 +10,61 @@ export interface WorkerFlags extends JsonObject {
1010

1111
export interface WorkerScriptHandler {
1212
boot(flags: WorkerFlags): Promise<void>;
13+
dispose(): Promise<void>;
1314
consensusSignature<K extends Requests<SignatureBls>>(
1415
method: K,
15-
...arguments_: Parameters<SignatureBls[K]>
16+
arguments_: MethodArguments<SignatureBls, K>,
1617
): Promise<ReturnType<SignatureBls[K]>>;
1718
walletSignature<K extends Requests<SignatureEcdsa>>(
1819
method: K,
19-
...arguments_: Parameters<SignatureEcdsa[K]>
20+
arguments_: MethodArguments<SignatureEcdsa, K>,
2021
): Promise<ReturnType<SignatureEcdsa[K]>>;
2122
blockFactory<K extends Requests<BlockFactory>>(
2223
method: K,
23-
...arguments_: Parameters<BlockFactory[K]>
24+
arguments_: MethodArguments<BlockFactory, K>,
2425
): Promise<ReturnType<BlockFactory[K]>>;
2526
transactionFactory<K extends Requests<TransactionFactory>>(
2627
method: K,
27-
...arguments_: Parameters<TransactionFactory[K]>
28+
arguments_: MethodArguments<TransactionFactory, K>,
2829
): Promise<ReturnType<TransactionFactory[K]>>;
2930
publicKeyFactory<K extends Requests<PublicKeyFactory>>(
3031
method: K,
31-
...arguments_: Parameters<PublicKeyFactory[K]>
32+
arguments_: MethodArguments<PublicKeyFactory, K>,
3233
): Promise<ReturnType<PublicKeyFactory[K]>>;
3334
}
3435

3536
export type WorkerFactory = () => Worker;
3637

37-
export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;
38-
39-
export type WorkerSubprocessFactory = () => WorkerSubprocess;
40-
41-
export interface Worker extends WorkerScriptHandler {
38+
export interface Worker {
39+
boot(flags: WorkerFlags): Promise<void>;
40+
dispose(): Promise<void>;
4241
getQueueSize(): number;
42+
isStopped(): boolean;
4343
kill(): Promise<number>;
44+
consensusSignature<K extends Requests<SignatureBls>>(
45+
method: K,
46+
...arguments_: Parameters<SignatureBls[K]>
47+
): Promise<ReturnType<SignatureBls[K]>>;
48+
walletSignature<K extends Requests<SignatureEcdsa>>(
49+
method: K,
50+
...arguments_: Parameters<SignatureEcdsa[K]>
51+
): Promise<ReturnType<SignatureEcdsa[K]>>;
52+
blockFactory<K extends Requests<BlockFactory>>(
53+
method: K,
54+
...arguments_: Parameters<BlockFactory[K]>
55+
): Promise<ReturnType<BlockFactory[K]>>;
56+
transactionFactory<K extends Requests<TransactionFactory>>(
57+
method: K,
58+
...arguments_: Parameters<TransactionFactory[K]>
59+
): Promise<ReturnType<TransactionFactory[K]>>;
60+
publicKeyFactory<K extends Requests<PublicKeyFactory>>(
61+
method: K,
62+
...arguments_: Parameters<PublicKeyFactory[K]>
63+
): Promise<ReturnType<PublicKeyFactory[K]>>;
4464
}
4565

4666
export interface WorkerPool {
4767
boot(): Promise<void>;
48-
shutdown(): Promise<void>;
49-
getWorker(): Promise<Worker>;
68+
dispose(): Promise<void>;
69+
getWorker(): Worker;
5070
}

packages/contracts/source/contracts/evm/worker.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,19 @@
11
import type { CommitHandler } from "../crypto/index.js";
22
import type { EventListener } from "../kernel/index.js";
3-
import type { Subprocess } from "../kernel/ipc.js";
43
import type { KeyValuePair } from "../types/index.js";
54

65
export type WorkerFlags = KeyValuePair;
76

87
export interface WorkerScriptHandler {
98
boot(flags: WorkerFlags): Promise<void>;
9+
dispose(): Promise<void>;
1010
setPeerCount(peerCount: number): Promise<void>;
1111
commit(blockNumber: number): Promise<void>;
1212
start(blockNumber: number): Promise<void>;
1313
}
1414

1515
export type WorkerFactory = () => Worker;
1616

17-
export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;
18-
19-
export type WorkerSubprocessFactory = () => WorkerSubprocess;
20-
2117
export interface Worker extends Omit<WorkerScriptHandler, "commit">, CommitHandler, EventListener {
2218
getQueueSize(): number;
2319
kill(): Promise<number>;

packages/contracts/source/contracts/kernel/ipc.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export type ErrorReply = {
2222

2323
export type Event = {
2424
event: string;
25-
data: string;
25+
data: unknown;
2626
};
2727

2828
export type Reply<T = unknown> = SuccessReply<T> | ErrorReply;
@@ -39,9 +39,14 @@ export interface Handler<T extends object> {
3939
handleRequest<K extends Requests<T>>(method: K): void;
4040
}
4141

42-
export interface Subprocess<T> {
42+
export interface Subprocess {
43+
dispose(): Promise<number>;
44+
drain(): Promise<void>;
4345
getQueueSize(): number;
46+
isStopped(): boolean;
4447
kill(): Promise<number>;
4548
sendRequest<T>(method: string, ...arguments_: unknown[]): Promise<T>;
4649
registerEventHandler<T>(event: string, callback: EventCallback<T>): void;
4750
}
51+
52+
export type SubprocessFactory = () => Subprocess;

packages/contracts/source/contracts/transaction-pool/worker.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import type { CommitHandler } from "../crypto/index.js";
22
import type { EventListener } from "../kernel/index.js";
3-
import type { EventCallback, Subprocess } from "../kernel/ipc.js";
3+
import type { EventCallback } from "../kernel/ipc.js";
44
import type { KeyValuePair } from "../types/index.js";
55
import type { GetBatchResult, GetBatchOptions } from "./selector.js";
66

77
export type WorkerFlags = KeyValuePair;
88

99
export interface WorkerScriptHandler {
1010
boot(flags: WorkerFlags): Promise<void>;
11+
dispose(): Promise<void>;
1112
getTransactions(options: GetBatchOptions): Promise<GetBatchResult>;
1213
removeTransaction(address: string, id: string): Promise<void>;
1314
commit(height: number, sendersAddresses: string[], consumedGas: number, isSyncing: boolean): Promise<void>;
@@ -19,10 +20,6 @@ export interface WorkerScriptHandler {
1920

2021
export type WorkerFactory = () => Worker;
2122

22-
export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;
23-
24-
export type WorkerSubprocessFactory = () => WorkerSubprocess;
25-
2623
export interface Worker extends Omit<WorkerScriptHandler, "commit">, CommitHandler, EventListener {
2724
getQueueSize(): number;
2825
kill(): Promise<number>;

packages/core/bin/config/devnet/core/.env

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,4 @@ MAINSAIL_API_TRANSACTION_POOL_DISABLED=
2828
MAINSAIL_API_TRANSACTION_POOL_HOST=0.0.0.0
2929
MAINSAIL_API_TRANSACTION_POOL_PORT=4007
3030

31-
MAINSAIL_CRYPTO_WORKER_COUNT=2
3231
MAINSAIL_CRYPTO_WORKER_LOGGING_ENABLED=

packages/crypto-messages/source/factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export class Factory implements Contracts.Crypto.MessageFactory {
2525
keyPair: Contracts.Crypto.KeyPair,
2626
context: Contracts.Crypto.SignatureMessageContext,
2727
): Promise<Contracts.Crypto.Message> {
28-
const worker = await this.workerPool.getWorker();
28+
const worker = this.workerPool.getWorker();
2929

3030
const bytes = await this.serializer.serializeMessageForSignature(data, context);
3131
const signature = await worker.consensusSignature("sign", bytes, Buffer.from(keyPair.privateKey, "hex"));

packages/crypto-proposal/source/factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export class Factory implements Contracts.Crypto.ProposalFactory {
3131
data: Contracts.Crypto.ProposalDataSerializableUnsigned,
3232
keyPair: Contracts.Crypto.KeyPair,
3333
): Promise<Contracts.Crypto.Proposal> {
34-
const worker = await this.workerPool.getWorker();
34+
const worker = this.workerPool.getWorker();
3535

3636
this.#verifySchema("proposalUnsigned", data);
3737

packages/crypto-transaction/source/factory.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ export class TransactionFactory implements Contracts.Crypto.TransactionFactory {
122122
try {
123123
const { data: transaction } = await this.deserializer.deserialize(serialized);
124124

125-
const worker = this.workerPool ? await this.workerPool.getWorker() : undefined;
125+
const worker = this.workerPool ? this.workerPool.getWorker() : undefined;
126126
const cryptoData = worker
127127
? await worker.transactionFactory("computeCryptoData", transaction)
128128
: await this.computeCryptoData(transaction);

0 commit comments

Comments
 (0)