Skip to content

Commit 3d97fda

Browse files
committed
child-process: watch pipe peer close event
1 parent 052aec7 commit 3d97fda

File tree

4 files changed

+146
-1
lines changed

4 files changed

+146
-1
lines changed

lib/internal/child_process.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,14 @@ function flushStdio(subprocess) {
333333

334334

335335
function createSocket(pipe, readable) {
336-
return net.Socket({ handle: pipe, readable });
336+
const socket = net.Socket({ handle: pipe, readable });
337+
if (!readable &&
338+
process.platform !== 'win32' &&
339+
typeof pipe?.watchPeerClose === 'function') {
340+
pipe.watchPeerClose(() => socket.destroy());
341+
socket.once('close', () => pipe.unwatchPeerClose?.());
342+
}
343+
return socket;
337344
}
338345

339346

src/pipe_wrap.cc

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "handle_wrap.h"
2929
#include "node.h"
3030
#include "node_buffer.h"
31+
#include "node_errors.h"
3132
#include "node_external_reference.h"
3233
#include "stream_base-inl.h"
3334
#include "stream_wrap.h"
@@ -80,6 +81,8 @@ void PipeWrap::Initialize(Local<Object> target,
8081
SetProtoMethod(isolate, t, "listen", Listen);
8182
SetProtoMethod(isolate, t, "connect", Connect);
8283
SetProtoMethod(isolate, t, "open", Open);
84+
SetProtoMethod(isolate, t, "watchPeerClose", WatchPeerClose);
85+
SetProtoMethod(isolate, t, "unwatchPeerClose", UnwatchPeerClose);
8386

8487
#ifdef _WIN32
8588
SetProtoMethod(isolate, t, "setPendingInstances", SetPendingInstances);
@@ -110,6 +113,8 @@ void PipeWrap::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
110113
registry->Register(Listen);
111114
registry->Register(Connect);
112115
registry->Register(Open);
116+
registry->Register(WatchPeerClose);
117+
registry->Register(UnwatchPeerClose);
113118
#ifdef _WIN32
114119
registry->Register(SetPendingInstances);
115120
#endif
@@ -159,6 +164,10 @@ PipeWrap::PipeWrap(Environment* env,
159164
// Suggestion: uv_pipe_init() returns void.
160165
}
161166

167+
PipeWrap::~PipeWrap() {
168+
peer_close_watching_ = false;
169+
peer_close_cb_.Reset();
170+
162171
void PipeWrap::Bind(const FunctionCallbackInfo<Value>& args) {
163172
PipeWrap* wrap;
164173
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
@@ -213,6 +222,96 @@ void PipeWrap::Open(const FunctionCallbackInfo<Value>& args) {
213222
args.GetReturnValue().Set(err);
214223
}
215224

225+
void PipeWrap::WatchPeerClose(const FunctionCallbackInfo<Value>& args) {
226+
PipeWrap* wrap;
227+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
228+
229+
if (!wrap->IsAlive()) {
230+
return args.GetReturnValue().Set(UV_EBADF);
231+
}
232+
233+
if (wrap->peer_close_watching_) {
234+
return args.GetReturnValue().Set(0);
235+
}
236+
237+
CHECK_GT(args.Length(), 0);
238+
CHECK(args[0]->IsFunction());
239+
240+
Environment* env = wrap->env();
241+
Isolate* isolate = env->isolate();
242+
243+
// Store the JS callback securely so it isn't garbage collected.
244+
wrap->peer_close_cb_.Reset(isolate, args[0].As<Function>());
245+
wrap->peer_close_watching_ = true;
246+
247+
// Start reading to detect EOF/ECONNRESET from the peer.
248+
// We use our custom allocator and reader, ignoring actual data.
249+
int err = uv_read_start(wrap->stream(), PeerCloseAlloc, PeerCloseRead);
250+
if (err != 0) {
251+
wrap->peer_close_watching_ = false;
252+
wrap->peer_close_cb_.Reset();
253+
}
254+
args.GetReturnValue().Set(err);
255+
}
256+
257+
void PipeWrap::UnwatchPeerClose(const FunctionCallbackInfo<Value>& args) {
258+
PipeWrap* wrap;
259+
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.This());
260+
261+
if (!wrap->peer_close_watching_) {
262+
wrap->peer_close_cb_.Reset();
263+
return args.GetReturnValue().Set(0);
264+
}
265+
266+
// Stop listening and release the JS callback to prevent memory leaks.
267+
wrap->peer_close_watching_ = false;
268+
wrap->peer_close_cb_.Reset();
269+
args.GetReturnValue().Set(uv_read_stop(wrap->stream()));
270+
}
271+
272+
void PipeWrap::PeerCloseAlloc(uv_handle_t* handle,
273+
size_t suggested_size,
274+
uv_buf_t* buf) {
275+
// We only care about EOF, not the actual data.
276+
// Using a static 1-byte buffer avoids dynamic memory allocation overhead.
277+
static char scratch;
278+
*buf = uv_buf_init(&scratch, 1);
279+
}
280+
281+
void PipeWrap::PeerCloseRead(uv_stream_t* stream,
282+
ssize_t nread,
283+
const uv_buf_t* buf) {
284+
PipeWrap* wrap = static_cast<PipeWrap*>(stream->data);
285+
if (wrap == nullptr || !wrap->peer_close_watching_) return;
286+
287+
// Ignore actual data reads or EAGAIN (0). We only watch for disconnects.
288+
if (nread > 0 || nread == 0) return;
289+
290+
// Wait specifically for EOF or connection reset (peer closed).
291+
if (nread != UV_EOF && nread != UV_ECONNRESET) return;
292+
293+
// Peer has closed the connection. Stop reading immediately.
294+
wrap->peer_close_watching_ = false;
295+
uv_read_stop(stream);
296+
297+
if (wrap->peer_close_cb_.IsEmpty()) return;
298+
Environment* env = wrap->env();
299+
Isolate* isolate = env->isolate();
300+
301+
// Set up V8 context and handles to safely execute the JS callback.
302+
v8::HandleScope handle_scope(isolate);
303+
v8::Context::Scope context_scope(env->context());
304+
Local<Function> cb = wrap->peer_close_cb_.Get(isolate);
305+
// Reset before calling to prevent re-entrancy issues
306+
wrap->peer_close_cb_.Reset();
307+
308+
errors::TryCatchScope try_catch(env);
309+
try_catch.SetVerbose(true);
310+
311+
// MakeCallback properly tracks AsyncHooks context and flushes microtasks.
312+
wrap->MakeCallback(cb, 0, nullptr);
313+
}
314+
216315
void PipeWrap::Connect(const FunctionCallbackInfo<Value>& args) {
217316
Environment* env = Environment::GetCurrent(args);
218317

src/pipe_wrap.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
5454
SET_SELF_SIZE(PipeWrap)
5555

5656
private:
57+
~PipeWrap() override;
5758
PipeWrap(Environment* env,
5859
v8::Local<v8::Object> object,
5960
ProviderType provider,
@@ -64,12 +65,23 @@ class PipeWrap : public ConnectionWrap<PipeWrap, uv_pipe_t> {
6465
static void Listen(const v8::FunctionCallbackInfo<v8::Value>& args);
6566
static void Connect(const v8::FunctionCallbackInfo<v8::Value>& args);
6667
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
68+
static void WatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
69+
static void UnwatchPeerClose(const v8::FunctionCallbackInfo<v8::Value>& args);
70+
static void PeerCloseAlloc(uv_handle_t* handle,
71+
size_t suggested_size,
72+
uv_buf_t* buf);
73+
static void PeerCloseRead(uv_stream_t* stream,
74+
ssize_t nread,
75+
const uv_buf_t* buf);
6776

6877
#ifdef _WIN32
6978
static void SetPendingInstances(
7079
const v8::FunctionCallbackInfo<v8::Value>& args);
7180
#endif
7281
static void Fchmod(const v8::FunctionCallbackInfo<v8::Value>& args);
82+
83+
bool peer_close_watching_ = false;
84+
v8::Global<v8::Function> peer_close_cb_;
7385
};
7486

7587

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { spawn } = require('child_process');
6+
7+
if (common.isWindows) {
8+
common.skip('Not applicable on Windows');
9+
}
10+
11+
const child = spawn(process.execPath, [
12+
'-e',
13+
'require("fs").closeSync(0); setTimeout(() => {}, 2000)',
14+
], { stdio: ['pipe', 'ignore', 'ignore'] });
15+
16+
const timeout = setTimeout(() => {
17+
assert.fail('stdin close event was not emitted');
18+
}, 1000);
19+
20+
child.stdin.on('close', common.mustCall(() => {
21+
clearTimeout(timeout);
22+
child.kill();
23+
}));
24+
25+
child.on('exit', common.mustCall(() => {
26+
clearTimeout(timeout);
27+
}));

0 commit comments

Comments
 (0)