Skip to content

Commit 38bd66d

Browse files
fix: redirect worker logs to main process (#988)
* add workerMode to pino logger * redirect worker stdout * style: resolve style guide violations * use Set * expose isValidLevel * adjust format * Rename instance --------- Co-authored-by: sebastijankuzner <sebastijan.kuzner@outlook.com>
1 parent ba21b69 commit 38bd66d

13 files changed

Lines changed: 99 additions & 22 deletions

File tree

packages/contracts/source/contracts/kernel/log.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ export interface Logger {
1515

1616
debug(message: string): void;
1717

18+
isValidLevel(level: string): boolean;
19+
1820
suppressConsoleOutput(suppress: boolean): void;
1921

2022
dispose(): Promise<void>;

packages/crypto-worker/source/service-provider.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@ export class ServiceProvider extends Providers.ServiceProvider {
2525
this.app
2626
.bind<() => Ipc.Subprocess<any>>(Identifiers.CryptoWorker.WorkerSubprocess.Factory)
2727
.toFactory(() => () => {
28-
const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, {});
29-
return new Ipc.Subprocess(subprocess);
28+
const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, {
29+
stderr: true,
30+
stdout: true,
31+
});
32+
return new Ipc.Subprocess(this.app, subprocess);
3033
});
3134
}
3235

packages/evm-api-worker/source/service-provider.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ export class ServiceProvider extends Providers.ServiceProvider {
1313

1414
public async register(): Promise<void> {
1515
this.app.bind<() => Ipc.Subprocess<any>>(Identifiers.Evm.WorkerSubprocess.Factory).toFactory(() => () => {
16-
const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, {});
17-
return new Ipc.Subprocess(subprocess);
16+
const subprocess = new Worker(`${new URL(".", import.meta.url).pathname}/worker-script.js`, {
17+
stderr: true,
18+
stdout: true,
19+
});
20+
return new Ipc.Subprocess(this.app, subprocess);
1821
});
1922

2023
this.app.bind(Identifiers.Evm.Worker).toConstantValue(this.app.resolve(WorkerInstance));

packages/evm-service/source/instances/evm.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export class EvmInstance implements Contracts.Evm.Instance, Contracts.Evm.Storag
2121

2222
@postConstruct()
2323
public initialize() {
24-
const logPrefix = `${this.constructor.name}`;
24+
const logPrefix = `evm`;
2525

2626
this.#evm = new Evm({
2727
historySize: 256n,

packages/kernel/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@
4242
"log-process-errors": "12.0.0",
4343
"nanomatch": "1.2.13",
4444
"nsfw": "2.2.5",
45-
"semver": "7.7.1"
45+
"semver": "7.7.1",
46+
"split2": "4.2.0"
4647
},
4748
"devDependencies": {
4849
"@types/capture-console": "1.0.5",
4950
"@types/fs-extra": "11.0.4",
5051
"@types/log-process-errors": "6.3.1",
5152
"@types/semver": "7.5.8",
53+
"@types/split2": "4.2.3",
5254
"@types/tmp": "0.2.6",
5355
"capture-console": "1.0.2",
5456
"moment-timezone": "0.5.47",

packages/kernel/source/ipc/sub-process.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { Contracts } from "@mainsail/contracts";
1+
import { Contracts, Identifiers } from "@mainsail/contracts";
2+
import split from "split2";
23
import { Worker } from "worker_threads";
34

45
export class Subprocess<T extends Record<string, any>> implements Contracts.Kernel.IPC.Subprocess<T> {
@@ -7,10 +8,34 @@ export class Subprocess<T extends Record<string, any>> implements Contracts.Kern
78
private readonly callbacks = new Map<number, Contracts.Kernel.IPC.RequestCallbacks<T>>();
89
private readonly eventHandlers = new Map<string, Contracts.Kernel.IPC.EventCallback<any>>();
910

10-
public constructor(subprocess: Worker) {
11+
public constructor(app: Contracts.Kernel.Application, subprocess: Worker) {
1112
this.subprocess = subprocess;
1213
this.subprocess.on("message", this.onSubprocessMessage.bind(this));
1314
this.subprocess.on("message", this.onEmit.bind(this));
15+
16+
const logger = app.get<Contracts.Kernel.Logger>(Identifiers.Services.Log.Service);
17+
18+
this.subprocess.stdout.pipe(split()).on("data", (line) => {
19+
// [LEVEL] MESSAGE
20+
const match = line.match(/^\[(\w+)]\s+(.*)$/);
21+
if (!match) {
22+
// Fallback to normal console.log if output doesn't match expected format.
23+
// For example, this is the case when worker uses `console.log` directly instead of logger service.
24+
console.log(line);
25+
return;
26+
}
27+
28+
const [, level, message] = match;
29+
if (logger.isValidLevel(level)) {
30+
logger[level](message);
31+
} else {
32+
logger.warning(`[unknown:${level}] ${message}`);
33+
}
34+
});
35+
36+
this.subprocess.stderr.pipe(split()).on("data", (line) => {
37+
logger.error(line);
38+
});
1439
}
1540

1641
public async kill(): Promise<number> {

packages/kernel/source/services/log/drivers/memory.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ export class MemoryLogger implements Contracts.Kernel.Logger {
5858
this.log("debug", message);
5959
}
6060

61+
public isValidLevel(level: string): boolean {
62+
return !!this.levelStyles[level];
63+
}
64+
6165
public suppressConsoleOutput(suppress: boolean): void {
6266
this.silentConsole = suppress;
6367
}

packages/kernel/source/services/log/drivers/null.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ export class NullLogger implements Contracts.Kernel.Logger {
3939
//
4040
}
4141

42+
public isValidLevel(level: string): boolean {
43+
return true;
44+
}
45+
4246
public suppressConsoleOutput(suppress: boolean): void {
4347
//
4448
}

packages/logger-pino/source/driver.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import { inspect } from "util";
1515

1616
@injectable()
1717
export class PinoLogger implements Contracts.Kernel.Logger {
18+
static LOG_LEVELS = new Set(["emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"]);
19+
1820
@inject(Identifiers.Application.Instance)
1921
private readonly app!: Contracts.Kernel.Application;
2022

@@ -39,6 +41,22 @@ export class PinoLogger implements Contracts.Kernel.Logger {
3941

4042
public async make(options?: any): Promise<Contracts.Kernel.Logger> {
4143
this.#stream = new PassThrough();
44+
45+
if (options.workerMode) {
46+
this.#logger = Object.fromEntries(
47+
[...PinoLogger.LOG_LEVELS].map((level) => [
48+
level,
49+
(message: string) => {
50+
process.stdout.write(`[${level}] (${this.app.thread()}) ${message}\n`);
51+
},
52+
]),
53+
) as unknown as pino.Logger<
54+
"alert" | "critical" | "debug" | "emergency" | "error" | "info" | "notice" | "warning"
55+
>;
56+
57+
return this;
58+
}
59+
4260
this.#logger = pino.default(
4361
{
4462
base: null,
@@ -64,7 +82,7 @@ export class PinoLogger implements Contracts.Kernel.Logger {
6482
this.#stream,
6583
);
6684

67-
if (this.#isValidLevel(options.levels.console)) {
85+
if (this.isValidLevel(options.levels.console)) {
6886
pump(
6987
this.#stream,
7088
split(),
@@ -77,7 +95,7 @@ export class PinoLogger implements Contracts.Kernel.Logger {
7795
);
7896
}
7997

80-
if (this.#isValidLevel(options.levels.file)) {
98+
if (this.isValidLevel(options.levels.file)) {
8199
this.#combinedFileStream = new pumpify(
82100
split(),
83101
this.#createPrettyTransport(options.levels.file, { colorize: false }),
@@ -164,14 +182,8 @@ export class PinoLogger implements Contracts.Kernel.Logger {
164182
}
165183

166184
#createPrettyTransport(level: string, prettyOptions?: PrettyOptions): Transform {
167-
const thread = this.app.thread();
168-
const ignore = this.app.isWorker() ? `` : `pid`;
169-
170185
const pinoPretty = prettyFactory({
171-
customPrettifiers: {
172-
pid: () => thread,
173-
},
174-
ignore,
186+
ignore: "pid",
175187
levelFirst: false,
176188
translateTime: "yyyy-mm-dd HH:MM:ss.l",
177189
...prettyOptions,
@@ -229,7 +241,7 @@ export class PinoLogger implements Contracts.Kernel.Logger {
229241
);
230242
}
231243

232-
#isValidLevel(level: string): boolean {
233-
return ["emergency", "alert", "critical", "error", "warning", "notice", "info", "debug"].includes(level);
244+
public isValidLevel(level: string): boolean {
245+
return PinoLogger.LOG_LEVELS.has(level);
234246
}
235247
}

packages/logger-pino/source/service-provider.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,16 @@ export class ServiceProvider extends Providers.ServiceProvider {
1212
Identifiers.Services.Log.Manager,
1313
);
1414

15-
await logManager.extend("pino", async () => this.app.resolve<PinoLogger>(PinoLogger).make(this.config().all()));
15+
await logManager.extend("pino", async () => {
16+
const logger = this.app.resolve<PinoLogger>(PinoLogger);
17+
18+
if (this.app.thread() === "main") {
19+
return logger.make(this.config().all());
20+
}
21+
22+
// Log output from workers is piped to main logger via Ipc.Subprocess.
23+
return logger.make({ workerMode: true });
24+
});
1625

1726
logManager.setDefaultDriver("pino");
1827
}

0 commit comments

Comments
 (0)