Skip to content

Commit d39eb82

Browse files
Merge remote-tracking branch 'upstream/master' into hotfix/fix_3091
# Conflicts: # test/esm/integration/pool-cluster/test-promise-wrapper.test.mjs
2 parents 7c728ec + f703a97 commit d39eb82

13 files changed

Lines changed: 589 additions & 97 deletions

File tree

lib/pool_cluster.js

Lines changed: 128 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,30 @@ const EventEmitter = require('events').EventEmitter;
1313
const makeSelector = {
1414
RR() {
1515
let index = 0;
16-
return clusterIds => clusterIds[index++ % clusterIds.length];
16+
return (clusterIds) => clusterIds[index++ % clusterIds.length];
1717
},
1818
RANDOM() {
19-
return clusterIds =>
19+
return (clusterIds) =>
2020
clusterIds[Math.floor(Math.random() * clusterIds.length)];
2121
},
2222
ORDER() {
23-
return clusterIds => clusterIds[0];
23+
return (clusterIds) => clusterIds[0];
2424
}
2525
};
2626

27+
const getMonotonicMilliseconds = function () {
28+
let ms;
29+
30+
if (typeof process.hrtime === 'function') {
31+
ms = process.hrtime();
32+
ms = ms[0] * 1e3 + ms[1] * 1e-6;
33+
} else {
34+
ms = process.uptime() * 1000;
35+
}
36+
37+
return Math.floor(ms);
38+
};
39+
2740
class PoolNamespace {
2841
constructor(cluster, pattern, selector) {
2942
this._cluster = cluster;
@@ -34,15 +47,28 @@ class PoolNamespace {
3447
getConnection(cb) {
3548
const clusterNode = this._getClusterNode();
3649
if (clusterNode === null) {
37-
return cb(new Error('Pool does Not exists.'));
50+
let err = new Error('Pool does Not exist.');
51+
err.code = 'POOL_NOEXIST';
52+
53+
if (this._cluster._findNodeIds(this._pattern, true).length !== 0) {
54+
err = new Error('Pool does Not have online node.');
55+
err.code = 'POOL_NONEONLINE';
56+
}
57+
58+
return cb(err);
3859
}
3960
return this._cluster._getConnection(clusterNode, (err, connection) => {
4061
if (err) {
62+
if (
63+
this._cluster._canRetry &&
64+
this._cluster._findNodeIds(this._pattern).length !== 0
65+
) {
66+
this._cluster.emit('warn', err);
67+
return this.getConnection(cb);
68+
}
69+
4170
return cb(err);
4271
}
43-
if (connection === 'retry') {
44-
return this.getConnection(cb);
45-
}
4672
return cb(null, connection);
4773
});
4874
}
@@ -123,6 +149,7 @@ class PoolCluster extends EventEmitter {
123149
this._canRetry =
124150
typeof config.canRetry === 'undefined' ? true : config.canRetry;
125151
this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
152+
this._restoreNodeTimeout = config.restoreNodeTimeout || 0;
126153
this._defaultSelector = config.defaultSelector || 'RR';
127154
this._closed = false;
128155
this._lastId = 0;
@@ -155,13 +182,26 @@ class PoolCluster extends EventEmitter {
155182
this._nodes[id] = {
156183
id: id,
157184
errorCount: 0,
158-
pool: new Pool({ config: new PoolConfig(config) })
185+
pool: new Pool({ config: new PoolConfig(config) }),
186+
_offlineUntil: 0
159187
};
160188
this._serviceableNodeIds.push(id);
161189
this._clearFindCaches();
162190
}
163191
}
164192

193+
remove(pattern) {
194+
const foundNodeIds = this._findNodeIds(pattern, true);
195+
196+
for (let i = 0; i < foundNodeIds.length; i++) {
197+
const node = this._getNode(foundNodeIds[i]);
198+
199+
if (node) {
200+
this._removeNode(node);
201+
}
202+
}
203+
}
204+
165205
getConnection(pattern, selector, cb) {
166206
let namespace;
167207
if (typeof pattern === 'function') {
@@ -181,7 +221,7 @@ class PoolCluster extends EventEmitter {
181221
const cb =
182222
callback !== undefined
183223
? callback
184-
: err => {
224+
: (err) => {
185225
if (err) {
186226
throw err;
187227
}
@@ -190,11 +230,12 @@ class PoolCluster extends EventEmitter {
190230
process.nextTick(cb);
191231
return;
192232
}
233+
193234
this._closed = true;
194235

195236
let calledBack = false;
196237
let waitingClose = 0;
197-
const onEnd = err => {
238+
const onEnd = (err) => {
198239
if (!calledBack && (err || --waitingClose <= 0)) {
199240
calledBack = true;
200241
return cb(err);
@@ -205,67 +246,98 @@ class PoolCluster extends EventEmitter {
205246
waitingClose++;
206247
this._nodes[id].pool.end(onEnd);
207248
}
249+
208250
if (waitingClose === 0) {
209251
process.nextTick(onEnd);
210252
}
211253
}
212254

213-
_findNodeIds(pattern) {
214-
if (typeof this._findCaches[pattern] !== 'undefined') {
215-
return this._findCaches[pattern];
216-
}
217-
let foundNodeIds;
218-
if (pattern === '*') {
219-
// all
220-
foundNodeIds = this._serviceableNodeIds;
221-
} else if (this._serviceableNodeIds.indexOf(pattern) !== -1) {
222-
// one
223-
foundNodeIds = [pattern];
224-
} else {
225-
// wild matching
226-
const keyword = pattern.substring(pattern.length - 1, 0);
227-
foundNodeIds = this._serviceableNodeIds.filter(id =>
228-
id.startsWith(keyword)
229-
);
255+
_findNodeIds(pattern, includeOffline) {
256+
let currentTime = 0;
257+
let foundNodeIds = this._findCaches[pattern];
258+
259+
if (typeof this._findCaches[pattern] === 'undefined') {
260+
if (pattern === '*') {
261+
// all
262+
foundNodeIds = this._serviceableNodeIds;
263+
} else if (this._serviceableNodeIds.indexOf(pattern) !== -1) {
264+
// one
265+
foundNodeIds = [pattern];
266+
} else {
267+
// wild matching
268+
const keyword = pattern.substring(pattern.length - 1, 0);
269+
foundNodeIds = this._serviceableNodeIds.filter((id) =>
270+
id.startsWith(keyword)
271+
);
272+
}
230273
}
274+
231275
this._findCaches[pattern] = foundNodeIds;
232-
return foundNodeIds;
276+
277+
if (includeOffline) {
278+
return foundNodeIds;
279+
}
280+
281+
return foundNodeIds.filter((nodeId) => {
282+
const node = this._getNode(nodeId);
283+
284+
if (!node._offlineUntil) {
285+
return true;
286+
}
287+
288+
if (!currentTime) {
289+
currentTime = getMonotonicMilliseconds();
290+
}
291+
292+
return node._offlineUntil <= currentTime;
293+
});
233294
}
234295

235296
_getNode(id) {
236297
return this._nodes[id] || null;
237298
}
238299

239300
_increaseErrorCount(node) {
240-
if (++node.errorCount >= this._removeNodeErrorCount) {
241-
const index = this._serviceableNodeIds.indexOf(node.id);
242-
if (index !== -1) {
243-
this._serviceableNodeIds.splice(index, 1);
244-
delete this._nodes[node.id];
245-
this._clearFindCaches();
246-
node.pool.end();
247-
this.emit('remove', node.id);
248-
}
301+
const errorCount = ++node.errorCount;
302+
303+
if (this._removeNodeErrorCount > errorCount) {
304+
return;
305+
}
306+
307+
if (this._restoreNodeTimeout > 0) {
308+
node._offlineUntil =
309+
getMonotonicMilliseconds() + this._restoreNodeTimeout;
310+
this.emit('offline', node.id);
311+
return;
249312
}
313+
314+
this._removeNode(node);
315+
this.emit('remove', node.id);
250316
}
251317

252318
_decreaseErrorCount(node) {
253-
if (node.errorCount > 0) {
254-
--node.errorCount;
319+
let errorCount = node.errorCount;
320+
321+
if (errorCount > this._removeNodeErrorCount) {
322+
errorCount = this._removeNodeErrorCount;
323+
}
324+
325+
if (errorCount < 1) {
326+
errorCount = 1;
327+
}
328+
329+
node.errorCount = errorCount - 1;
330+
331+
if (node._offlineUntil) {
332+
node._offlineUntil = 0;
333+
this.emit('online', node.id);
255334
}
256335
}
257336

258337
_getConnection(node, cb) {
259338
node.pool.getConnection((err, connection) => {
260339
if (err) {
261340
this._increaseErrorCount(node);
262-
if (this._canRetry) {
263-
// REVIEW: this seems wrong?
264-
this.emit('warn', err);
265-
// eslint-disable-next-line no-console
266-
console.warn(`[Error] PoolCluster : ${err}`);
267-
return cb(null, 'retry');
268-
}
269341
return cb(err);
270342
}
271343
this._decreaseErrorCount(node);
@@ -275,6 +347,16 @@ class PoolCluster extends EventEmitter {
275347
});
276348
}
277349

350+
_removeNode(node) {
351+
const index = this._serviceableNodeIds.indexOf(node.id);
352+
if (index !== -1) {
353+
this._serviceableNodeIds.splice(index, 1);
354+
delete this._nodes[node.id];
355+
this._clearFindCaches();
356+
node.pool.end();
357+
}
358+
}
359+
278360
_clearFindCaches() {
279361
this._findCaches = {};
280362
}

package-lock.json

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

promise.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class PromisePoolCluster extends EventEmitter {
109109
super();
110110
this.poolCluster = poolCluster;
111111
this.Promise = thePromise || Promise;
112-
inheritEvents(poolCluster, this, ['warn', 'remove']);
112+
inheritEvents(poolCluster, this, ['warn', 'remove' , 'online', 'offline']);
113113
}
114114

115115
getConnection(pattern, selector) {
@@ -205,7 +205,7 @@ class PromisePoolCluster extends EventEmitter {
205205
})(func);
206206
}
207207
}
208-
})(['add']);
208+
})(['add', 'remove']);
209209

210210
function createPromisePoolCluster(opts) {
211211
const corePoolCluster = createPoolCluster(opts);

0 commit comments

Comments
 (0)