Skip to content

Commit 4d1c26f

Browse files
killaguclaude
andcommitted
feat: log cluster module IPC via onelogger
Add structured logging on every master<->app worker IPC point in Node.js cluster mode (process mode only). Covers both application-layer messages and the internal NODE_CLUSTER fd dispatch / fd ack. Application layer (always on): - master->app send (AppProcessWorker#send) - master->app sticky-session Socket direct send (master.ts) - master<-app recv (cluster.on 'fork' -> worker.on 'message') - app->master send (static AppProcessWorker.send) - app<-master recv (process.on 'message' in app_worker.ts) Internal cluster layer (opt-in via EGG_CLUSTER_IPC_LOG=1, very verbose): - master<-app internal (worker.process.on 'internalMessage'): online / listening / queryServer / accepted (fd ack) / close - app<-master internal (process.on 'internalMessage'): newconn (fd dispatch, with handle) / disconnect / suicide Users can replace the default console sink via onelogger.setLogger(). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3304051 commit 4d1c26f

5 files changed

Lines changed: 141 additions & 2 deletions

File tree

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
"graceful-process": "^2.0.0",
4444
"sendmessage": "^3.0.1",
4545
"terminal-link": "^2.1.1",
46-
"utility": "^2.2.0"
46+
"utility": "^2.2.0",
47+
"onelogger": "^1.0.1"
4748
},
4849
"devDependencies": {
4950
"@arethetypeswrong/cli": "^0.17.1",

src/app_worker.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { importModule } from '@eggjs/utils';
88
import { BaseAppWorker } from './utils/mode/base/app.js';
99
import { AppThreadWorker } from './utils/mode/impl/worker_threads/app.js';
1010
import { AppProcessWorker } from './utils/mode/impl/process/app.js';
11+
import { ipcLogger, formatIpcMessage, internalIpcLogEnabled } from './utils/ipc_logger.js';
1112

1213
const debug = debuglog('@eggjs/cluster/app_worker');
1314

@@ -38,6 +39,29 @@ async function main() {
3839
AppWorker = AppThreadWorker as any;
3940
} else {
4041
AppWorker = AppProcessWorker as any;
42+
// D. master -> app (recv): log every IPC message delivered to this worker via the cluster channel.
43+
// Handle is present when master forwards a `net.Socket` (sticky-session mode).
44+
// This listener is read-only; other `process.on('message')` listeners (framework, sticky handler,
45+
// etc.) are unaffected.
46+
process.on('message', (msg: any, handle: any) => {
47+
const body = typeof msg === 'string' ? { action: msg } : msg;
48+
ipcLogger.info(formatIpcMessage(`app#${process.pid}<-master`, body, handle));
49+
});
50+
51+
// F. master -> app internal NODE_CLUSTER messages (newconn with fd, disconnect, suicide, ...).
52+
// `internalMessage` is an undocumented but stable Node.js event.
53+
// Opt-in via EGG_CLUSTER_IPC_LOG because `newconn` fires once per HTTP request.
54+
if (internalIpcLogEnabled) {
55+
process.on('internalMessage', (msg: { cmd?: string; act?: string; ack?: number }, handle: unknown) => {
56+
if (!msg || msg.cmd !== 'NODE_CLUSTER') return;
57+
const label = msg.act ? `cluster:${msg.act}` : `cluster:ack#${msg.ack ?? '?'}`;
58+
ipcLogger.info(formatIpcMessage(
59+
`app#${process.pid}<-master`,
60+
{ action: label, data: msg },
61+
handle,
62+
));
63+
});
64+
}
4165
}
4266

4367
const consoleLogger = new ConsoleLogger({

src/master.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
} from './utils/mode/impl/worker_threads/agent.js';
2424
import { AppThreadWorker, AppThreadUtils as WorkerThreadsAppWorker } from './utils/mode/impl/worker_threads/app.js';
2525
import { ClusterWorkerExceptionError } from './error/ClusterWorkerExceptionError.js';
26+
import { ipcLogger, formatIpcMessage } from './utils/ipc_logger.js';
2627

2728
const debug = debuglog('@eggjs/cluster/master');
2829

@@ -265,6 +266,11 @@ export class Master extends ReadyEventEmitter {
265266
connection.destroy();
266267
} else {
267268
const worker = this.stickyWorker(connection.remoteAddress) as AppProcessWorker;
269+
ipcLogger.info(formatIpcMessage(
270+
`master->app#${worker.workerId}`,
271+
{ action: 'sticky-session:connection' },
272+
connection,
273+
));
268274
worker.instance.send('sticky-session:connection', connection);
269275
}
270276
}).listen(this.#realPort, cb);

src/utils/ipc_logger.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import net from 'node:net';
2+
import { getLogger } from 'onelogger';
3+
import type { MessageBody } from './messenger.js';
4+
5+
/**
6+
* onelogger instance for Node.js cluster module IPC traffic between master and app workers.
7+
* Users can override the underlying sink via `onelogger.setLogger()` / `setCustomLogger()`.
8+
*/
9+
export const ipcLogger = getLogger('@eggjs/cluster:ipc');
10+
11+
/**
12+
* Whether internal-level cluster IPC logs (NODE_CLUSTER newconn/accepted/listening/...) are enabled.
13+
* These are very verbose (one `cluster:newconn` per HTTP request), so they are opt-in
14+
* via the `EGG_CLUSTER_IPC_LOG` environment variable (any truthy value enables).
15+
*/
16+
export const internalIpcLogEnabled = !!process.env.EGG_CLUSTER_IPC_LOG;
17+
18+
const MAX_STRING_LEN = 200;
19+
const MAX_TOTAL_LEN = 1024;
20+
21+
function describeHandle(handle: unknown): string {
22+
if (handle == null) return '';
23+
if (handle instanceof net.Socket) {
24+
const fd = (handle as unknown as { _handle?: { fd?: number } })._handle?.fd;
25+
return fd != null ? `<Socket fd=${fd}>` : '<Socket>';
26+
}
27+
if (handle instanceof net.Server) {
28+
return '<Server>';
29+
}
30+
const ctor = (handle as { constructor?: { name?: string } })?.constructor?.name;
31+
return ctor ? `<${ctor}>` : '<handle>';
32+
}
33+
34+
function makeReplacer() {
35+
const seen = new WeakSet<object>();
36+
return function replacer(_key: string, value: unknown) {
37+
if (value && typeof value === 'object') {
38+
if (seen.has(value as object)) return '<Circular>';
39+
seen.add(value as object);
40+
if (value instanceof net.Socket) return describeHandle(value);
41+
if (value instanceof net.Server) return '<Server>';
42+
if (Buffer.isBuffer(value)) return `<Buffer len=${value.length}>`;
43+
}
44+
if (typeof value === 'string' && value.length > MAX_STRING_LEN) {
45+
return `${value.slice(0, MAX_STRING_LEN)}...(truncated)`;
46+
}
47+
return value;
48+
};
49+
}
50+
51+
function stringifyData(data: unknown): string {
52+
let out: string;
53+
try {
54+
out = JSON.stringify(data, makeReplacer());
55+
} catch (err) {
56+
return `<unserializable: ${(err as Error).message}>`;
57+
}
58+
if (out && out.length > MAX_TOTAL_LEN) {
59+
out = `${out.slice(0, MAX_TOTAL_LEN)}...(truncated)`;
60+
}
61+
return out;
62+
}
63+
64+
/**
65+
* Format a single IPC message into a one-line log string.
66+
* @param direction e.g. 'master->app#12345' / 'app#12345<-master'
67+
* @param msg the message body (supports cluster internal msgs via `action: 'cluster:<act>'`)
68+
* @param handle optional handle (net.Socket / net.Server) attached to the IPC message
69+
*/
70+
export function formatIpcMessage(
71+
direction: string,
72+
msg: Partial<MessageBody> & { action?: string; data?: unknown },
73+
handle?: unknown,
74+
): string {
75+
const action = msg?.action ?? '<unknown>';
76+
let out = `[${direction}] action=${action}`;
77+
if (msg && msg.data !== undefined) {
78+
out += ` data=${stringifyData(msg.data)}`;
79+
}
80+
if (handle) {
81+
out += ` +handle=${describeHandle(handle)}`;
82+
}
83+
return out;
84+
}

src/utils/mode/impl/process/app.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { graceful as gracefulExit, type Options as gracefulExitOptions } from 'g
55
import { BaseAppWorker, BaseAppUtils } from '../../base/app.js';
66
import { terminate } from '../../../terminate.js';
77
import type { MessageBody } from '../../../messenger.js';
8+
import { ipcLogger, formatIpcMessage, internalIpcLogEnabled } from '../../../ipc_logger.js';
89

910
export class AppProcessWorker extends BaseAppWorker<ClusterProcessWorker> {
1011
get id() {
@@ -24,6 +25,7 @@ export class AppProcessWorker extends BaseAppWorker<ClusterProcessWorker> {
2425
}
2526

2627
send(message: MessageBody) {
28+
ipcLogger.info(formatIpcMessage(`master->app#${this.workerId}`, message));
2729
sendmessage(this.instance, message);
2830
}
2931

@@ -43,6 +45,7 @@ export class AppProcessWorker extends BaseAppWorker<ClusterProcessWorker> {
4345

4446
static send(message: MessageBody) {
4547
message.senderWorkerId = String(process.pid);
48+
ipcLogger.info(formatIpcMessage(`app#${process.pid}->master`, message));
4649
process.send!(message);
4750
}
4851

@@ -78,16 +81,37 @@ export class AppProcessUtils extends BaseAppUtils {
7881
const appWorker = new AppProcessWorker(worker);
7982
this.emit('worker_forked', appWorker);
8083
appWorker.disableRefork = true;
81-
worker.on('message', msg => {
84+
worker.on('message', (msg, handle) => {
8285
if (typeof msg === 'string') {
8386
msg = {
8487
action: msg,
8588
data: msg,
8689
};
8790
}
8891
msg.from = 'app';
92+
ipcLogger.info(formatIpcMessage(
93+
`master<-app#${worker.process.pid}`,
94+
msg,
95+
handle,
96+
));
8997
this.messenger.send(msg);
9098
});
99+
100+
// cluster internal NODE_CLUSTER messages (listening / online / queryServer / accepted (fd ack) / close / ...)
101+
// Must hook on `worker.process` (ChildProcess) — `cluster.Worker` doesn't forward `internalMessage`.
102+
// Note: `internalMessage` is not a documented Node.js event but has been stable across major versions.
103+
// Opt-in via EGG_CLUSTER_IPC_LOG because this is very verbose under load.
104+
if (internalIpcLogEnabled) {
105+
worker.process.on('internalMessage', (msg: { cmd?: string; act?: string; ack?: number }, handle: unknown) => {
106+
if (!msg || msg.cmd !== 'NODE_CLUSTER') return;
107+
const label = msg.act ? `cluster:${msg.act}` : `cluster:ack#${msg.ack ?? '?'}`;
108+
ipcLogger.info(formatIpcMessage(
109+
`master<-app#${worker.process.pid}`,
110+
{ action: label, data: msg },
111+
handle,
112+
));
113+
});
114+
}
91115
this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j',
92116
appWorker.id, appWorker.workerId, appWorker.state,
93117
Object.keys(cluster.workers!));

0 commit comments

Comments
 (0)