Skip to content

Commit 06abf84

Browse files
Unitechclaude
andcommitted
Surface broken IPC channel + bound send queue (regression from ebd629e)
Commit ebd629e ("Silence cluster worker events to prevent boot crashes") fixed a real boot crash but made every broken-IPC-channel failure silently swallowed, which is the mechanism behind #6101 (RAM grows until reset) and #6111 (Node 24 EBADF masked into a silent symptom). - IPCTransport.send(): surface every failure path via a listener-count guarded 'error' event with a console.error fallback (emitting 'error' with no listener would itself crash the process; only services/actions attaches a 'data' listener, none for 'error'). - IPCTransport.send(): bound in-flight un-acked sends to IPC_MAX_INFLIGHT (new constant) so a saturated-but-connected pipe cannot grow retained callbacks / queued messages unbounded; excess dropped, surfaced once edge-triggered. - ProcessContainer.js / ProcessContainerBun.js: keep a cluster.worker 'error' handler registered (boot-crash protection preserved) but forward it via the existing process:exception path + console.error instead of discarding it silently. Adds modules/pm2-io-bpm/test/transports/IPCTransport.spec.js (TDD): 3 surface cases + 1 back-pressure bound case. Full pm2-io-bpm suite green; 2-instance cluster boots online with 0 restarts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ebd629e commit 06abf84

5 files changed

Lines changed: 226 additions & 6 deletions

File tree

lib/ProcessContainer.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,25 @@
1111

1212
var cluster = require('cluster');
1313
if (cluster.isWorker) {
14-
cluster.worker.on('error', function () {});
14+
// A handler must stay registered so a worker channel 'error' never becomes
15+
// an unhandled throw that crashes the process during boot. But it must not
16+
// be silently discarded (regression: hides EBADF / channel faults) — forward
17+
// it to the daemon via the existing process:exception path and log it.
18+
cluster.worker.on('error', function (err) {
19+
try {
20+
if (typeof process.send === 'function' && process.connected) {
21+
process.send({
22+
type: 'process:exception',
23+
data: {
24+
message: err && err.message,
25+
stack: err && err.stack,
26+
syscall: 'cluster.worker'
27+
}
28+
});
29+
}
30+
} catch (e) { /* channel already gone */ }
31+
console.error('[PM2][cluster] worker channel error:', err && (err.stack || err.message || err));
32+
});
1533
}
1634

1735
var p = require('path');

lib/ProcessContainerBun.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,25 @@
66

77
var cluster = require('cluster');
88
if (cluster.isWorker) {
9-
cluster.worker.on('error', function () {});
9+
// A handler must stay registered so a worker channel 'error' never becomes
10+
// an unhandled throw that crashes the process during boot. But it must not
11+
// be silently discarded (regression: hides EBADF / channel faults) — forward
12+
// it to the daemon via the existing process:exception path and log it.
13+
cluster.worker.on('error', function (err) {
14+
try {
15+
if (typeof process.send === 'function' && process.connected) {
16+
process.send({
17+
type: 'process:exception',
18+
data: {
19+
message: err && err.message,
20+
stack: err && err.stack,
21+
syscall: 'cluster.worker'
22+
}
23+
});
24+
}
25+
} catch (e) { /* channel already gone */ }
26+
console.error('[PM2][cluster] worker channel error:', err && (err.stack || err.message || err));
27+
});
1028
}
1129

1230
var p = require('path');

modules/pm2-io-bpm/constants.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
'use strict'
22

33
module.exports = {
4-
METRIC_INTERVAL: 990
4+
METRIC_INTERVAL: 990,
5+
// Max number of in-flight (un-acked) IPC sends before back-pressure kicks
6+
// in and further messages are dropped. Bounds retained callbacks / queued
7+
// messages so a saturated-but-connected pipe cannot grow memory unbounded.
8+
IPC_MAX_INFLIGHT: 100
59
}
610

711
module.exports.canUseInspector = function canUseInspector () {
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
'use strict'
2+
3+
/**
4+
* Regression tests for commit ebd629ef ("Silence cluster worker events to
5+
* prevent boot crashes").
6+
*
7+
* That commit rewrote IPCTransport.send() so that every broken-channel
8+
* failure path is silently swallowed:
9+
*
10+
* - process.connected === false -> `return -1` (was: console.error + exit)
11+
* - process.send(msg, cb) async -> debug logger only (was: synchronous send)
12+
* - process.send() throws -> debug logger only (was: re-thrown / exit)
13+
*
14+
* Result: when the IPC pipe to the PM2 daemon breaks, BPM keeps emitting
15+
* metrics into the void with zero observable signal. This is the mechanism
16+
* that aggravates the unbounded-memory growth (#6101) and masks the EBADF
17+
* channel fault into a confusing silent symptom (#6111).
18+
*
19+
* Contract under test: IPCTransport is an EventEmitter. A send failure on a
20+
* broken channel MUST be surfaced to observers via an 'error' event — not
21+
* swallowed, and not by killing the whole process (process.exit was itself
22+
* the boot-crash the commit tried to fix). These tests are expected to FAIL
23+
* against the current implementation (TDD).
24+
*/
25+
26+
const assert = require('assert')
27+
const { IPCTransport } = require('../../transports/IPCTransport')
28+
const { IPC_MAX_INFLIGHT } = require('../../constants')
29+
30+
describe('IPCTransport — broken channel must surface, not swallow (regression #6101/#6111)', function () {
31+
let originalSend
32+
let connectedDescriptor
33+
34+
beforeEach(() => {
35+
originalSend = process.send
36+
connectedDescriptor = Object.getOwnPropertyDescriptor(process, 'connected')
37+
})
38+
39+
afterEach(() => {
40+
process.send = originalSend
41+
if (connectedDescriptor) {
42+
Object.defineProperty(process, 'connected', connectedDescriptor)
43+
} else {
44+
delete process.connected
45+
}
46+
})
47+
48+
const stubConnected = (value) => {
49+
Object.defineProperty(process, 'connected', {
50+
value,
51+
configurable: true,
52+
writable: true
53+
})
54+
}
55+
56+
it('emits an "error" event when the async process.send callback fails', function () {
57+
stubConnected(true)
58+
// Synchronous callback so the test owns no async work past its lifetime.
59+
process.send = function (_msg, cb) {
60+
cb(new Error('channel closed (EPIPE)'))
61+
}
62+
63+
const transport = new IPCTransport()
64+
let surfaced = null
65+
transport.on('error', (err) => { surfaced = err })
66+
67+
transport.send('axm:monitor', { cpu: 1 })
68+
69+
// Current code only calls this.logger(...) in the callback — no 'error'
70+
// event is ever emitted, so `surfaced` stays null and this fails.
71+
assert.ok(surfaced instanceof Error,
72+
'async send failure must be surfaced via an "error" event, not swallowed')
73+
})
74+
75+
it('emits an "error" event when the channel is disconnected instead of silently dropping', function () {
76+
stubConnected(false)
77+
process.send = function () { /* present but channel is dead */ }
78+
79+
const transport = new IPCTransport()
80+
let surfaced = null
81+
transport.on('error', (err) => { surfaced = err })
82+
83+
transport.send('axm:monitor', { cpu: 1 })
84+
85+
// The contract is observability — a disconnected channel must surface an
86+
// 'error' event. The -1 return value is an intentionally retained sentinel
87+
// (callers like setOptions() depend on it); it is not asserted here.
88+
assert.ok(surfaced instanceof Error,
89+
'disconnected channel must be observable via an "error" event')
90+
})
91+
92+
it('emits an "error" event when process.send throws synchronously', function () {
93+
stubConnected(true)
94+
process.send = function () {
95+
throw new Error('Channel closed')
96+
}
97+
98+
const transport = new IPCTransport()
99+
let surfaced = null
100+
transport.on('error', (err) => { surfaced = err })
101+
102+
transport.send('axm:monitor', { cpu: 1 })
103+
104+
// Current code swallows the throw into this.logger(...) only.
105+
assert.ok(surfaced instanceof Error,
106+
'thrown send error must be surfaced via an "error" event, not swallowed')
107+
})
108+
109+
it('bounds in-flight sends under back-pressure instead of growing unbounded (#6101)', function () {
110+
stubConnected(true)
111+
let sendCalls = 0
112+
// Saturated-but-connected pipe: process.send accepts the message but the
113+
// ack callback never fires (libuv write buffer full). The retained
114+
// callbacks must NOT accumulate without bound.
115+
process.send = function () { sendCalls++ }
116+
117+
const transport = new IPCTransport()
118+
const surfaced = []
119+
transport.on('error', (err) => { surfaced.push(err) })
120+
121+
const attempts = IPC_MAX_INFLIGHT + 5
122+
for (let i = 0; i < attempts; i++) {
123+
transport.send('axm:monitor', { i })
124+
}
125+
126+
assert.strictEqual(sendCalls, IPC_MAX_INFLIGHT,
127+
`at most ${IPC_MAX_INFLIGHT} messages may be in flight; excess must be dropped, not queued`)
128+
assert.strictEqual(transport._inflight, IPC_MAX_INFLIGHT,
129+
'in-flight counter must be capped at IPC_MAX_INFLIGHT')
130+
assert.strictEqual(transport._dropped, attempts - IPC_MAX_INFLIGHT,
131+
'every over-cap message must be counted as dropped')
132+
assert.strictEqual(surfaced.length, 1,
133+
'back-pressure must be surfaced exactly once (edge-triggered), not per dropped message')
134+
assert.ok(surfaced[0] instanceof Error,
135+
'back-pressure must be surfaced as an Error')
136+
})
137+
})

modules/pm2-io-bpm/transports/IPCTransport.js

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
const cluster = require('cluster')
44
const Debug = require('debug')
55
const EventEmitter = require('events').EventEmitter
6+
const { IPC_MAX_INFLIGHT } = require('../constants')
67

78
class IPCTransport extends EventEmitter {
89
constructor () {
@@ -11,6 +12,20 @@ class IPCTransport extends EventEmitter {
1112
this.logger = Debug('axm:transport:ipc')
1213
this.onMessage = undefined
1314
this.autoExitHandle = undefined
15+
this._inflight = 0
16+
this._dropped = 0
17+
this._saturated = false
18+
}
19+
20+
// A broken IPC channel must be observable, never silently swallowed
21+
// (regression #6101/#6111). Emitting 'error' with no listener would itself
22+
// crash the process, so guard with a listener count and fall back to stderr.
23+
_surface (err) {
24+
if (this.listenerCount('error') > 0) {
25+
this.emit('error', err)
26+
} else {
27+
console.error('[pm2-io-bpm] IPC transport: ' + (err && err.message))
28+
}
1429
}
1530

1631
init (config) {
@@ -85,16 +100,44 @@ class IPCTransport extends EventEmitter {
85100
}
86101

87102
send (channel, payload) {
88-
if (typeof process.send !== 'function') return -1
89-
if (process.connected === false) return -1
103+
if (typeof process.send !== 'function') {
104+
this._surface(new Error('IPC send unavailable: process.send is not a function'))
105+
return -1
106+
}
107+
if (process.connected === false) {
108+
this._surface(new Error('IPC channel disconnected (process.connected === false)'))
109+
return -1
110+
}
111+
112+
// Back-pressure guard: bound retained callbacks / queued messages so a
113+
// saturated-but-connected pipe cannot grow memory unbounded (#6101).
114+
if (this._inflight >= IPC_MAX_INFLIGHT) {
115+
this._dropped++
116+
if (this._saturated === false) {
117+
this._saturated = true
118+
this._surface(new Error(`IPC back-pressure: channel saturated (${this._inflight} in-flight), dropping messages`))
119+
}
120+
return -1
121+
}
90122

91123
this.logger(`Send on channel ${channel}`)
124+
this._inflight++
92125
try {
93126
process.send({ type: channel, data: payload }, (err) => {
94-
if (err) this.logger('async send failed: %s', err && err.code)
127+
this._inflight--
128+
if (this._saturated === true && this._inflight < IPC_MAX_INFLIGHT) {
129+
this._saturated = false
130+
this.logger('IPC back-pressure cleared (dropped %d total)', this._dropped)
131+
}
132+
if (err) {
133+
this.logger('async send failed: %s', err && err.code)
134+
this._surface(err)
135+
}
95136
})
96137
} catch (err) {
138+
this._inflight--
97139
this.logger('Process disconnected from parent: %s', err && err.message)
140+
this._surface(err)
98141
}
99142
}
100143

0 commit comments

Comments
 (0)