Skip to content

Commit 4fbb171

Browse files
committed
child_process: watch child_process stdin pipe peer close event
Fixes: #25131
1 parent 052aec7 commit 4fbb171

File tree

5 files changed

+182
-32
lines changed

5 files changed

+182
-32
lines changed

lib/internal/child_process.js

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ const handleConversion = {
176176
// waiting for the NODE_HANDLE_ACK of the current passing handle.
177177
assert(!target._pendingMessage);
178178
target._pendingMessage =
179-
{ callback, message, handle, options, retransmissions: 0 };
179+
{ callback, message, handle, options, retransmissions: 0 };
180180
} else {
181181
handle.close();
182182
}
@@ -332,8 +332,15 @@ function flushStdio(subprocess) {
332332
}
333333

334334

335-
function createSocket(pipe, readable) {
336-
return net.Socket({ handle: pipe, readable });
335+
function createSocket(pipe, readable, watchPeerClose) {
336+
const sock = net.Socket({ handle: pipe, readable });
337+
if (watchPeerClose &&
338+
process.platform !== 'win32' &&
339+
typeof pipe?.watchPeerClose === 'function') {
340+
pipe.watchPeerClose(() => sock.destroy());
341+
sock.once('close', () => pipe.unwatchPeerClose?.());
342+
}
343+
return sock;
337344
}
338345

339346

@@ -368,7 +375,7 @@ ChildProcess.prototype.spawn = function spawn(options) {
368375

369376

370377
validateOneOf(options.serialization, 'options.serialization',
371-
[undefined, 'json', 'advanced']);
378+
[undefined, 'json', 'advanced']);
372379
const serialization = options.serialization || 'json';
373380

374381
if (ipc !== undefined) {
@@ -380,7 +387,7 @@ ChildProcess.prototype.spawn = function spawn(options) {
380387

381388
ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`);
382389
ArrayPrototypePush(options.envPairs,
383-
`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
390+
`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
384391
}
385392

386393
validateString(options.file, 'options.file');
@@ -401,10 +408,10 @@ ChildProcess.prototype.spawn = function spawn(options) {
401408

402409
// Run-time errors should emit an error, not throw an exception.
403410
if (err === UV_EACCES ||
404-
err === UV_EAGAIN ||
405-
err === UV_EMFILE ||
406-
err === UV_ENFILE ||
407-
err === UV_ENOENT) {
411+
err === UV_EAGAIN ||
412+
err === UV_EMFILE ||
413+
err === UV_ENFILE ||
414+
err === UV_ENOENT) {
408415
if (childProcessSpawn.hasSubscribers) {
409416
childProcessSpawn.error.publish({
410417
process: this,
@@ -472,9 +479,9 @@ ChildProcess.prototype.spawn = function spawn(options) {
472479

473480
if (stream.handle) {
474481
stream.socket = createSocket(this.pid !== 0 ?
475-
stream.handle : null, i > 0);
482+
stream.handle : null, i > 0, i == 0);
476483

477-
if (i > 0 && this.pid !== 0) {
484+
if (i > 0 && this.pid !== 0) {
478485
this._closesNeeded++;
479486
stream.socket.on('close', () => {
480487
maybeClose(this);
@@ -494,7 +501,7 @@ ChildProcess.prototype.spawn = function spawn(options) {
494501

495502
for (i = 0; i < stdio.length; i++)
496503
ArrayPrototypePush(this.stdio,
497-
stdio[i].socket === undefined ? null : stdio[i].socket);
504+
stdio[i].socket === undefined ? null : stdio[i].socket);
498505

499506
// Add .send() method and start listening for IPC data
500507
if (ipc !== undefined) setupChannel(this, ipc, serialization);
@@ -660,16 +667,16 @@ function setupChannel(target, channel, serializationMode) {
660667
target.on('internalMessage', function(message, handle) {
661668
// Once acknowledged - continue sending handles.
662669
if (message.cmd === 'NODE_HANDLE_ACK' ||
663-
message.cmd === 'NODE_HANDLE_NACK') {
670+
message.cmd === 'NODE_HANDLE_NACK') {
664671

665672
if (target._pendingMessage) {
666673
if (message.cmd === 'NODE_HANDLE_ACK') {
667674
closePendingHandle(target);
668675
} else if (target._pendingMessage.retransmissions++ ===
669-
MAX_HANDLE_RETRANSMISSIONS) {
676+
MAX_HANDLE_RETRANSMISSIONS) {
670677
closePendingHandle(target);
671678
process.emitWarning('Handle did not reach the receiving process ' +
672-
'correctly', 'SentHandleNotReceivedWarning');
679+
'correctly', 'SentHandleNotReceivedWarning');
673680
}
674681
}
675682

@@ -679,9 +686,9 @@ function setupChannel(target, channel, serializationMode) {
679686

680687
if (target._pendingMessage) {
681688
target._send(target._pendingMessage.message,
682-
target._pendingMessage.handle,
683-
target._pendingMessage.options,
684-
target._pendingMessage.callback);
689+
target._pendingMessage.handle,
690+
target._pendingMessage.options,
691+
target._pendingMessage.callback);
685692
}
686693

687694
for (let i = 0; i < queue.length; i++) {
@@ -778,9 +785,9 @@ function setupChannel(target, channel, serializationMode) {
778785
// will result in error message that is weakly consumable.
779786
// So perform a final check on message prior to sending.
780787
if (typeof message !== 'string' &&
781-
typeof message !== 'object' &&
782-
typeof message !== 'number' &&
783-
typeof message !== 'boolean') {
788+
typeof message !== 'object' &&
789+
typeof message !== 'number' &&
790+
typeof message !== 'boolean') {
784791
throw new ERR_INVALID_ARG_TYPE(
785792
'message', ['string', 'object', 'number', 'boolean'], message);
786793
}
@@ -841,8 +848,8 @@ function setupChannel(target, channel, serializationMode) {
841848
handle.setSimultaneousAccepts(true);
842849
}
843850
} else if (this._handleQueue &&
844-
!(message && (message.cmd === 'NODE_HANDLE_ACK' ||
845-
message.cmd === 'NODE_HANDLE_NACK'))) {
851+
!(message && (message.cmd === 'NODE_HANDLE_ACK' ||
852+
message.cmd === 'NODE_HANDLE_NACK'))) {
846853
// Queue request anyway to avoid out-of-order messages.
847854
ArrayPrototypePush(this._handleQueue, {
848855
callback: callback,
@@ -979,11 +986,11 @@ function setupChannel(target, channel, serializationMode) {
979986
const INTERNAL_PREFIX = 'NODE_';
980987
function isInternal(message) {
981988
return (message !== null &&
982-
typeof message === 'object' &&
983-
typeof message.cmd === 'string' &&
984-
message.cmd.length > INTERNAL_PREFIX.length &&
985-
StringPrototypeSlice(message.cmd, 0, INTERNAL_PREFIX.length) ===
986-
INTERNAL_PREFIX);
989+
typeof message === 'object' &&
990+
typeof message.cmd === 'string' &&
991+
message.cmd.length > INTERNAL_PREFIX.length &&
992+
StringPrototypeSlice(message.cmd, 0, INTERNAL_PREFIX.length) ===
993+
INTERNAL_PREFIX);
987994
}
988995

989996
const nop = FunctionPrototype;
@@ -1021,7 +1028,7 @@ function getValidStdio(stdio, sync) {
10211028
if (stdio === 'ignore') {
10221029
ArrayPrototypePush(acc, { type: 'ignore' });
10231030
} else if (stdio === 'pipe' || stdio === 'overlapped' ||
1024-
(typeof stdio === 'number' && stdio < 0)) {
1031+
(typeof stdio === 'number' && stdio < 0)) {
10251032
const a = {
10261033
type: stdio === 'overlapped' ? 'overlapped' : 'pipe',
10271034
readable: i === 0,
@@ -1061,7 +1068,7 @@ function getValidStdio(stdio, sync) {
10611068
fd: typeof stdio === 'number' ? stdio : stdio.fd,
10621069
});
10631070
} else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) ||
1064-
getHandleWrapType(stdio._handle)) {
1071+
getHandleWrapType(stdio._handle)) {
10651072
const handle = getHandleWrapType(stdio) ?
10661073
stdio :
10671074
getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle;

src/pipe_wrap.cc

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "handle_wrap.h"
2929
#include "node.h"
3030
#include "node_buffer.h"
31+
#include "node_errors.h"
3132
#include "node_external_reference.h"
3233
#include "stream_base-inl.h"
3334
#include "stream_wrap.h"
@@ -80,6 +81,8 @@ void PipeWrap::Initialize(Local<Object> target,
8081
SetProtoMethod(isolate, t, "listen", Listen);
8182
SetProtoMethod(isolate, t, "connect", Connect);
8283
SetProtoMethod(isolate, t, "open", Open);
84+
SetProtoMethod(isolate, t, "watchPeerClose", WatchPeerClose);
85+
SetProtoMethod(isolate, t, "unwatchPeerClose", UnwatchPeerClose);
8386

8487
#ifdef _WIN32
8588
SetProtoMethod(isolate, t, "setPendingInstances", SetPendingInstances);
@@ -110,6 +113,8 @@ void PipeWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
110113
registry->Register(Listen);
111114
registry->Register(Connect);
112115
registry->Register(Open);
116+
registry->Register(WatchPeerClose);
117+
registry->Register(UnwatchPeerClose);
113118
#ifdef _WIN32
114119
registry->Register(SetPendingInstances);
115120
#endif
@@ -159,6 +164,11 @@ PipeWrap::PipeWrap(Environment* env,
159164
// Suggestion: uv_pipe_init() returns void.
160165
}
161166

167+
PipeWrap::~PipeWrap() {
168+
peer_close_watching_ = false;
169+
peer_close_cb_.Reset();
170+
}
171+
162172
void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
163173
PipeWrap* wrap;
164174
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
@@ -213,6 +223,96 @@ void PipeWrap::Open(const FunctionCallbackInfo<Value>& args) {
213223
args.GetReturnValue().Set(err);
214224
}
215225

226+
void PipeWrap::WatchPeerClose(const FunctionCallbackInfo<Value>& args) {
227+
PipeWrap* wrap;
228+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
229+
230+
if (!wrap->IsAlive()) {
231+
return args.GetReturnValue().Set(UV_EBADF);
232+
}
233+
234+
if (wrap->peer_close_watching_) {
235+
return args.GetReturnValue().Set(0);
236+
}
237+
238+
CHECK_GT(args.Length(), 0);
239+
CHECK(args[0]->IsFunction());
240+
241+
Environment* env = wrap->env();
242+
Isolate* isolate = env->isolate();
243+
244+
// Store the JS callback securely so it isn't garbage collected.
245+
wrap->peer_close_cb_.Reset(isolate, args[0].As<Function>());
246+
wrap->peer_close_watching_ = true;
247+
248+
// Start reading to detect EOF/ECONNRESET from the peer.
249+
// We use our custom allocator and reader, ignoring actual data.
250+
int err = uv_read_start(wrap->stream(), PeerCloseAlloc, PeerCloseRead);
251+
if (err != 0) {
252+
wrap->peer_close_watching_ = false;
253+
wrap->peer_close_cb_.Reset();
254+
}
255+
args.GetReturnValue().Set(err);
256+
}
257+
258+
void PipeWrap::UnwatchPeerClose(const FunctionCallbackInfo<Value>& args) {
259+
PipeWrap* wrap;
260+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
261+
262+
if (!wrap->peer_close_watching_) {
263+
wrap->peer_close_cb_.Reset();
264+
return args.GetReturnValue().Set(0);
265+
}
266+
267+
// Stop listening and release the JS callback to prevent memory leaks.
268+
wrap->peer_close_watching_ = false;
269+
wrap->peer_close_cb_.Reset();
270+
args.GetReturnValue().Set(uv_read_stop(wrap->stream()));
271+
}
272+
273+
void PipeWrap::PeerCloseAlloc(uv_handle_t* handle,
274+
size_t suggested_size,
275+
uv_buf_t* buf) {
276+
// We only care about EOF, not the actual data.
277+
// Using a static 1-byte buffer avoids dynamic memory allocation overhead.
278+
static char scratch;
279+
*buf = uv_buf_init(&scratch, 1);
280+
}
281+
282+
void PipeWrap::PeerCloseRead(uv_stream_t* stream,
283+
ssize_t nread,
284+
const uv_buf_t* buf) {
285+
PipeWrap* wrap = static_cast<PipeWrap*>(stream->data);
286+
if (wrap == nullptr || !wrap->peer_close_watching_) return;
287+
288+
// Ignore actual data reads or EAGAIN (0). We only watch for disconnects.
289+
if (nread > 0 || nread == 0) return;
290+
291+
// Wait specifically for EOF or connection reset (peer closed).
292+
if (nread != UV_EOF && nread != UV_ECONNRESET) return;
293+
294+
// Peer has closed the connection. Stop reading immediately.
295+
wrap->peer_close_watching_ = false;
296+
uv_read_stop(stream);
297+
298+
if (wrap->peer_close_cb_.IsEmpty()) return;
299+
Environment* env = wrap->env();
300+
Isolate* isolate = env->isolate();
301+
302+
// Set up V8 context and handles to safely execute the JS callback.
303+
v8::HandleScope handle_scope(isolate);
304+
v8::Context::Scope context_scope(env->context());
305+
Local<Function> cb = wrap->peer_close_cb_.Get(isolate);
306+
// Reset before calling to prevent re-entrancy issues
307+
wrap->peer_close_cb_.Reset();
308+
309+
errors::TryCatchScope try_catch(env);
310+
try_catch.SetVerbose(true);
311+
312+
// MakeCallback properly tracks AsyncHooks context and flushes microtasks.
313+
wrap->MakeCallback(cb, 0, nullptr);
314+
}
315+
216316
void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
217317
Environment* env = Environment::GetCurrent(args);
218318

@@ -252,7 +352,6 @@ void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
252352

253353
args.GetReturnValue().Set(err);
254354
}
255-
256355
} // namespace node
257356

258357
NODE_BINDING_CONTEXT_AWARE_INTERNAL(pipe_wrap, node::PipeWrap::Initialize)

src/pipe_wrap.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
5454
SET_SELF_SIZE(PipeWrap)
5555

5656
private:
57+
~PipeWrap() override;
5758
PipeWrap(Environment* env,
5859
v8::Local<v8::Object> object,
5960
ProviderType provider,
@@ -64,12 +65,23 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
6465
static void Listen(const v8::FunctionCallbackInfo<v8::Value>& args);
6566
static void Connect(const v8::FunctionCallbackInfo<v8::Value>& args);
6667
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
68+
static void WatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
69+
static void UnwatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
70+
static void PeerCloseAlloc(uv_handle_t* handle,
71+
size_t suggested_size,
72+
uv_buf_t* buf);
73+
static void PeerCloseRead(uv_stream_t* stream,
74+
ssize_t nread,
75+
const uv_buf_t* buf);
6776

6877
#ifdef _WIN32
6978
static void SetPendingInstances(
7079
const v8::FunctionCallbackInfo<v8::Value>& args);
7180
#endif
7281
static void Fchmod(const v8::FunctionCallbackInfo<v8::Value>& args);
82+
83+
bool peer_close_watching_ = false;
84+
v8::Global<v8::Function> peer_close_cb_;
7385
};
7486

7587

test/async-hooks/test-pipewrap.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const processwrap = processes[0];
3535
const pipe1 = pipes[0];
3636
const pipe2 = pipes[1];
3737
const pipe3 = pipes[2];
38+
const pipe1ExpectedInvocations = process.platform === 'win32' ? 1 : 2;
3839

3940
assert.strictEqual(processwrap.type, 'PROCESSWRAP');
4041
assert.strictEqual(processwrap.triggerAsyncId, 1);
@@ -83,7 +84,11 @@ function onexit() {
8384
// Usually it is just one event, but it can be more.
8485
assert.ok(ioEvents >= 3, `at least 3 stdout io events, got ${ioEvents}`);
8586

86-
checkInvocations(pipe1, { init: 1, before: 1, after: 1 },
87+
checkInvocations(pipe1, {
88+
init: 1,
89+
before: pipe1ExpectedInvocations,
90+
after: pipe1ExpectedInvocations,
91+
},
8792
'pipe wrap when sleep.spawn was called');
8893
checkInvocations(pipe2, { init: 1, before: ioEvents, after: ioEvents },
8994
'pipe wrap when sleep.spawn was called');
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { spawn } = require('child_process');
6+
7+
if (common.isWindows) {
8+
common.skip('Not applicable on Windows');
9+
}
10+
11+
const child = spawn(process.execPath, [
12+
'-e',
13+
'require("fs").closeSync(0); setTimeout(() => {}, 2000)',
14+
], { stdio: ['pipe', 'ignore', 'ignore'] });
15+
16+
const timeout = setTimeout(() => {
17+
assert.fail('stdin close event was not emitted');
18+
}, 1000);
19+
20+
child.stdin.on('close', common.mustCall(() => {
21+
clearTimeout(timeout);
22+
child.kill();
23+
}));
24+
25+
child.on('exit', common.mustCall(() => {
26+
clearTimeout(timeout);
27+
}));

0 commit comments

Comments
 (0)