Skip to content

Commit 2da0a8f

Browse files
committed
Add tests for destroy() after post-response ClientRequest errors
Issue: HD-4352
1 parent 1876d19 commit 2da0a8f

1 file changed

Lines changed: 311 additions & 0 deletions

File tree

tests/unit/hdSocketCleanup.spec.ts

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
'use strict';
2+
3+
/**
4+
* Regression tests: after the HTTP response callback has run, Node may still emit
5+
* ClientRequest 'error' (e.g. ERR_SOCKET_TIMEOUT on keep-alive / agent sockets).
6+
* HDProxydClient must call request.destroy() so FDs do not accumulate under retries.
7+
*
8+
*/
9+
10+
import * as assert from 'assert';
11+
import * as sinon from 'sinon';
12+
import * as stream from 'stream';
13+
import * as http from 'http';
14+
import { AddressInfo } from 'net';
15+
import { HDProxydClient, HDProxydOptions } from '../../src/hdcontroller';
16+
17+
describe('HDProxydClient — outbound socket cleanup after response', () => {
18+
let sandbox: sinon.SinonSandbox;
19+
let server: http.Server;
20+
21+
beforeEach(() => {
22+
sandbox = sinon.createSandbox();
23+
server = http.createServer((req, res) => {
24+
if (req.url?.startsWith('/store/')) {
25+
res.writeHead(200, { 'scal-key': 'test-key' });
26+
res.end('ok');
27+
return;
28+
}
29+
// eslint-disable-next-line no-param-reassign
30+
res.statusCode = 404;
31+
res.end();
32+
});
33+
});
34+
35+
afterEach(done => {
36+
sandbox.restore();
37+
const s = server as http.Server & { closeAllConnections?: () => void };
38+
if (typeof s.closeAllConnections === 'function') {
39+
s.closeAllConnections();
40+
}
41+
server.close(() => done());
42+
});
43+
44+
function listenServer(cb: (port: number) => void): void {
45+
server.listen(0, '127.0.0.1', () => {
46+
const addr = server.address() as AddressInfo;
47+
cb(addr.port);
48+
});
49+
}
50+
51+
type DestroyCapture = {
52+
destroyCalled: boolean;
53+
destroyCallCount: number;
54+
lastDestroyArg?: unknown;
55+
firstDestroyArgFromStream?: unknown;
56+
};
57+
58+
59+
function stubRequestAndCaptureDestroy(options: {
60+
assignCapture: (cap: DestroyCapture) => void;
61+
postResponseError?: {
62+
code: 'ERR_SOCKET_TIMEOUT' | 'ECONNRESET';
63+
message: string;
64+
};
65+
}): void {
66+
const origRequest = http.request;
67+
sandbox.stub(http, 'request').callsFake(((opts, cb) => {
68+
const req = origRequest(
69+
opts as string | http.RequestOptions,
70+
cb as (res: http.IncomingMessage) => void,
71+
);
72+
const destroyCapture: DestroyCapture = { destroyCalled: false, destroyCallCount: 0 };
73+
options.assignCapture(destroyCapture);
74+
const innerDestroy = req.destroy.bind(req);
75+
req.destroy = (error?: Error) => {
76+
destroyCapture.destroyCalled = true;
77+
destroyCapture.destroyCallCount += 1;
78+
destroyCapture.lastDestroyArg = error;
79+
if (destroyCapture.firstDestroyArgFromStream === undefined) {
80+
destroyCapture.firstDestroyArgFromStream = error;
81+
}
82+
return innerDestroy();
83+
};
84+
if (options.postResponseError) {
85+
req.once('response', () => {
86+
process.nextTick(() => {
87+
const err = new Error(options.postResponseError!.message) as NodeJS.ErrnoException;
88+
err.code = options.postResponseError!.code;
89+
req.emit('error', err);
90+
});
91+
});
92+
}
93+
return req;
94+
}) as typeof http.request);
95+
}
96+
97+
function countAgentSockets(agent: unknown): number {
98+
const a = agent as {
99+
sockets?: Record<string, unknown>;
100+
freeSockets?: Record<string, unknown>;
101+
};
102+
const countPool = (pool?: Record<string, unknown>) => {
103+
if (!pool) {
104+
return 0;
105+
}
106+
return (Object.values(pool) as unknown[]).reduce<number>((sum, entry) => {
107+
if (Array.isArray(entry)) {
108+
return sum + entry.length;
109+
}
110+
return sum + 1;
111+
}, 0);
112+
};
113+
return countPool(a.sockets) + countPool(a.freeSockets);
114+
}
115+
116+
it('calls ClientRequest.destroy when ERR_SOCKET_TIMEOUT fires after response', done => {
117+
let destroyCapture: DestroyCapture | undefined;
118+
let finished = false;
119+
const finish = (e?: Error) => {
120+
if (finished) {return;}
121+
finished = true;
122+
done(e);
123+
};
124+
stubRequestAndCaptureDestroy({
125+
postResponseError: {
126+
code: 'ERR_SOCKET_TIMEOUT',
127+
message: 'simulated idle timeout',
128+
},
129+
assignCapture: (capture: DestroyCapture) => { destroyCapture = capture; },
130+
});
131+
132+
listenServer(port => {
133+
const h = new HDProxydClient({
134+
bootstrap: [`127.0.0.1:${port}`],
135+
} as HDProxydOptions);
136+
h.get('anykey', [], '', (err, st) => {
137+
assert.ifError(err);
138+
assert.ok(st);
139+
const body = st as stream.Readable;
140+
body.on('error', () => {});
141+
body.resume();
142+
setImmediate(() => {
143+
try {
144+
assert.ok(destroyCapture!, 'capture should be set');
145+
assert.ok(destroyCapture!.destroyCalled, 'request.destroy must run for post-response socket error');
146+
assert.strictEqual(destroyCapture!.destroyCallCount, 1, 'request.destroy should run once');
147+
const arg0 = destroyCapture!.lastDestroyArg as NodeJS.ErrnoException;
148+
assert.strictEqual(arg0?.code, 'ERR_SOCKET_TIMEOUT');
149+
h.destroy();
150+
finish();
151+
} catch (e) {
152+
h.destroy();
153+
finish(e as Error);
154+
}
155+
});
156+
});
157+
});
158+
});
159+
160+
it('calls ClientRequest.destroy when a non-timeout error fires after response', done => {
161+
let destroyCapture: DestroyCapture | undefined;
162+
let finished = false;
163+
const finish = (e?: Error) => {
164+
if (finished) { return; }
165+
finished = true;
166+
done(e);
167+
};
168+
stubRequestAndCaptureDestroy({
169+
postResponseError: {
170+
code: 'ECONNRESET',
171+
message: 'simulated reset',
172+
},
173+
assignCapture: (capture: DestroyCapture) => { destroyCapture = capture; },
174+
});
175+
176+
listenServer(port => {
177+
const h = new HDProxydClient({
178+
bootstrap: [`127.0.0.1:${port}`],
179+
} as HDProxydOptions);
180+
h.get('anykey', [], '', (err, st) => {
181+
assert.ifError(err);
182+
assert.ok(st);
183+
const body = st as stream.Readable;
184+
body.on('error', () => {});
185+
body.resume();
186+
setImmediate(() => {
187+
try {
188+
assert.ok(destroyCapture, 'capture should be set');
189+
assert.ok(destroyCapture!.destroyCalled, 'request.destroy must run');
190+
assert.strictEqual(destroyCapture!.destroyCallCount, 1, 'request.destroy should run once');
191+
const arg0 = destroyCapture!.lastDestroyArg as NodeJS.ErrnoException;
192+
assert.strictEqual(arg0?.code, 'ECONNRESET');
193+
h.destroy();
194+
finish();
195+
} catch (e) {
196+
h.destroy();
197+
finish(e as Error);
198+
}
199+
});
200+
});
201+
});
202+
});
203+
204+
it('calls ClientRequest.destroy when upload stream errors mid-transfer (PUT stream path)', done => {
205+
let destroyCapture: DestroyCapture | undefined;
206+
let finished = false;
207+
const finish = (e?: Error) => {
208+
if (finished) {return;}
209+
finished = true;
210+
done(e);
211+
};
212+
const injectedError = new Error('simulated upload stream failure');
213+
stubRequestAndCaptureDestroy({
214+
assignCapture: (capture: DestroyCapture) => { destroyCapture = capture; },
215+
});
216+
217+
listenServer(port => {
218+
const h = new HDProxydClient({
219+
bootstrap: [`127.0.0.1:${port}`],
220+
} as HDProxydOptions);
221+
const upload = new stream.Readable({
222+
read() {
223+
this.push(Buffer.from('hello'));
224+
this.destroy(injectedError);
225+
},
226+
});
227+
h.put(upload, 5, {
228+
bucketName: 'test',
229+
owner: 'test',
230+
namespace: 'zenko',
231+
}, '', () => {
232+
setImmediate(() => {
233+
try {
234+
assert.ok(destroyCapture, 'capture should be set');
235+
assert.ok(destroyCapture!.destroyCalled, 'request.destroy must run for upload stream error');
236+
// Node may issue a second destroy from its internal pipe error handling;
237+
// we are allowing both while requiring our stream error to trigger first.
238+
assert.ok(
239+
destroyCapture!.destroyCallCount === 1 || destroyCapture!.destroyCallCount === 2,
240+
`request.destroy should run once (or twice at most if http request also errors);
241+
callCount=${destroyCapture!.destroyCallCount}`,
242+
);
243+
assert.strictEqual(destroyCapture!.firstDestroyArgFromStream, injectedError);
244+
h.destroy();
245+
finish();
246+
} catch (e) {
247+
h.destroy();
248+
finish(e as Error);
249+
}
250+
});
251+
});
252+
});
253+
});
254+
255+
it('does not accumulate agent sockets across repeated post-response ECONNRESET errors', done => {
256+
let finished = false;
257+
const finish = (e?: Error) => {
258+
if (finished) {return;}
259+
finished = true;
260+
done(e);
261+
};
262+
stubRequestAndCaptureDestroy({
263+
postResponseError: {
264+
code: 'ECONNRESET',
265+
message: 'simulated reset for leak regression',
266+
},
267+
assignCapture: () => {},
268+
});
269+
270+
listenServer(port => {
271+
const h = new HDProxydClient({
272+
bootstrap: [`127.0.0.1:${port}`],
273+
} as HDProxydOptions);
274+
const baselineSockets = countAgentSockets((h as unknown as { httpAgent: unknown }).httpAgent);
275+
let remaining = 20;
276+
277+
const runOnce = () => {
278+
h.get('anykey', [], '', (err, st) => {
279+
if (err) {
280+
h.destroy();
281+
return finish(err);
282+
}
283+
assert.ok(st);
284+
const body = st as stream.Readable;
285+
body.on('error', () => {});
286+
body.resume();
287+
return setImmediate(() => {
288+
remaining -= 1;
289+
if (remaining === 0) {
290+
const afterSockets = countAgentSockets((h as unknown as { httpAgent: unknown }).httpAgent);
291+
try {
292+
assert.ok(
293+
afterSockets <= baselineSockets + 2,
294+
`socket count should stay bounded (baseline=${baselineSockets},
295+
after=${afterSockets})`,
296+
);
297+
h.destroy();
298+
return finish();
299+
} catch (e) {
300+
h.destroy();
301+
return finish(e as Error);
302+
}
303+
}
304+
return runOnce();
305+
});
306+
});
307+
};
308+
runOnce();
309+
});
310+
});
311+
});

0 commit comments

Comments
 (0)