Skip to content

Commit 9a38601

Browse files
authored
feat(PoolCluster): restoreNodeTimeout implementation (#3218)
1 parent 2e62d46 commit 9a38601

File tree

10 files changed

+520
-71
lines changed

10 files changed

+520
-71
lines changed

lib/pool_cluster.js

Lines changed: 131 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,30 @@ const EventEmitter = require('events').EventEmitter;
1313
const makeSelector = {
1414
RR() {
1515
let index = 0;
16-
return clusterIds => clusterIds[index++ % clusterIds.length];
16+
return (clusterIds) => clusterIds[index++ % clusterIds.length];
1717
},
1818
RANDOM() {
19-
return clusterIds =>
19+
return (clusterIds) =>
2020
clusterIds[Math.floor(Math.random() * clusterIds.length)];
2121
},
2222
ORDER() {
23-
return clusterIds => clusterIds[0];
23+
return (clusterIds) => clusterIds[0];
2424
}
2525
};
2626

27+
const getMonotonicMilliseconds = function () {
28+
let ms;
29+
30+
if (typeof process.hrtime === 'function') {
31+
ms = process.hrtime();
32+
ms = ms[0] * 1e3 + ms[1] * 1e-6;
33+
} else {
34+
ms = process.uptime() * 1000;
35+
}
36+
37+
return Math.floor(ms);
38+
};
39+
2740
class PoolNamespace {
2841
constructor(cluster, pattern, selector) {
2942
this._cluster = cluster;
@@ -34,15 +47,28 @@ class PoolNamespace {
3447
getConnection(cb) {
3548
const clusterNode = this._getClusterNode();
3649
if (clusterNode === null) {
37-
return cb(new Error('Pool does Not exists.'));
50+
let err = new Error('Pool does Not exist.');
51+
err.code = 'POOL_NOEXIST';
52+
53+
if (this._cluster._findNodeIds(this._pattern, true).length !== 0) {
54+
err = new Error('Pool does Not have online node.');
55+
err.code = 'POOL_NONEONLINE';
56+
}
57+
58+
return cb(err);
3859
}
3960
return this._cluster._getConnection(clusterNode, (err, connection) => {
4061
if (err) {
62+
if (
63+
this._cluster._canRetry &&
64+
this._cluster._findNodeIds(this._pattern).length !== 0
65+
) {
66+
this._cluster.emit('warn', err);
67+
return this.getConnection(cb);
68+
}
69+
4170
return cb(err);
4271
}
43-
if (connection === 'retry') {
44-
return this.getConnection(cb);
45-
}
4672
return cb(null, connection);
4773
});
4874
}
@@ -79,9 +105,9 @@ class PoolNamespace {
79105

80106
/**
81107
* pool cluster execute
82-
* @param {*} sql
83-
* @param {*} values
84-
* @param {*} cb
108+
* @param {*} sql
109+
* @param {*} values
110+
* @param {*} cb
85111
*/
86112
execute(sql, values, cb) {
87113
if (typeof values === 'function') {
@@ -123,6 +149,7 @@ class PoolCluster extends EventEmitter {
123149
this._canRetry =
124150
typeof config.canRetry === 'undefined' ? true : config.canRetry;
125151
this._removeNodeErrorCount = config.removeNodeErrorCount || 5;
152+
this._restoreNodeTimeout = config.restoreNodeTimeout || 0;
126153
this._defaultSelector = config.defaultSelector || 'RR';
127154
this._closed = false;
128155
this._lastId = 0;
@@ -155,13 +182,26 @@ class PoolCluster extends EventEmitter {
155182
this._nodes[id] = {
156183
id: id,
157184
errorCount: 0,
158-
pool: new Pool({ config: new PoolConfig(config) })
185+
pool: new Pool({ config: new PoolConfig(config) }),
186+
_offlineUntil: 0
159187
};
160188
this._serviceableNodeIds.push(id);
161189
this._clearFindCaches();
162190
}
163191
}
164192

193+
remove(pattern) {
194+
const foundNodeIds = this._findNodeIds(pattern, true);
195+
196+
for (let i = 0; i < foundNodeIds.length; i++) {
197+
const node = this._getNode(foundNodeIds[i]);
198+
199+
if (node) {
200+
this._removeNode(node);
201+
}
202+
}
203+
}
204+
165205
getConnection(pattern, selector, cb) {
166206
let namespace;
167207
if (typeof pattern === 'function') {
@@ -181,7 +221,7 @@ class PoolCluster extends EventEmitter {
181221
const cb =
182222
callback !== undefined
183223
? callback
184-
: err => {
224+
: (err) => {
185225
if (err) {
186226
throw err;
187227
}
@@ -190,11 +230,12 @@ class PoolCluster extends EventEmitter {
190230
process.nextTick(cb);
191231
return;
192232
}
233+
193234
this._closed = true;
194235

195236
let calledBack = false;
196237
let waitingClose = 0;
197-
const onEnd = err => {
238+
const onEnd = (err) => {
198239
if (!calledBack && (err || --waitingClose <= 0)) {
199240
calledBack = true;
200241
return cb(err);
@@ -205,67 +246,98 @@ class PoolCluster extends EventEmitter {
205246
waitingClose++;
206247
this._nodes[id].pool.end(onEnd);
207248
}
249+
208250
if (waitingClose === 0) {
209251
process.nextTick(onEnd);
210252
}
211253
}
212254

213-
_findNodeIds(pattern) {
214-
if (typeof this._findCaches[pattern] !== 'undefined') {
215-
return this._findCaches[pattern];
216-
}
217-
let foundNodeIds;
218-
if (pattern === '*') {
219-
// all
220-
foundNodeIds = this._serviceableNodeIds;
221-
} else if (this._serviceableNodeIds.indexOf(pattern) !== -1) {
222-
// one
223-
foundNodeIds = [pattern];
224-
} else {
225-
// wild matching
226-
const keyword = pattern.substring(pattern.length - 1, 0);
227-
foundNodeIds = this._serviceableNodeIds.filter(id =>
228-
id.startsWith(keyword)
229-
);
255+
_findNodeIds(pattern, includeOffline) {
256+
let currentTime = 0;
257+
let foundNodeIds = this._findCaches[pattern];
258+
259+
if (typeof this._findCaches[pattern] === 'undefined') {
260+
if (pattern === '*') {
261+
// all
262+
foundNodeIds = this._serviceableNodeIds;
263+
} else if (this._serviceableNodeIds.indexOf(pattern) !== -1) {
264+
// one
265+
foundNodeIds = [pattern];
266+
} else {
267+
// wild matching
268+
const keyword = pattern.substring(pattern.length - 1, 0);
269+
foundNodeIds = this._serviceableNodeIds.filter((id) =>
270+
id.startsWith(keyword)
271+
);
272+
}
230273
}
274+
231275
this._findCaches[pattern] = foundNodeIds;
232-
return foundNodeIds;
276+
277+
if (includeOffline) {
278+
return foundNodeIds;
279+
}
280+
281+
return foundNodeIds.filter((nodeId) => {
282+
const node = this._getNode(nodeId);
283+
284+
if (!node._offlineUntil) {
285+
return true;
286+
}
287+
288+
if (!currentTime) {
289+
currentTime = getMonotonicMilliseconds();
290+
}
291+
292+
return node._offlineUntil <= currentTime;
293+
});
233294
}
234295

235296
_getNode(id) {
236297
return this._nodes[id] || null;
237298
}
238299

239300
_increaseErrorCount(node) {
240-
if (++node.errorCount >= this._removeNodeErrorCount) {
241-
const index = this._serviceableNodeIds.indexOf(node.id);
242-
if (index !== -1) {
243-
this._serviceableNodeIds.splice(index, 1);
244-
delete this._nodes[node.id];
245-
this._clearFindCaches();
246-
node.pool.end();
247-
this.emit('remove', node.id);
248-
}
301+
const errorCount = ++node.errorCount;
302+
303+
if (this._removeNodeErrorCount > errorCount) {
304+
return;
249305
}
306+
307+
if (this._restoreNodeTimeout > 0) {
308+
node._offlineUntil =
309+
getMonotonicMilliseconds() + this._restoreNodeTimeout;
310+
this.emit('offline', node.id);
311+
return;
312+
}
313+
314+
this._removeNode(node);
315+
this.emit('remove', node.id);
250316
}
251317

252318
_decreaseErrorCount(node) {
253-
if (node.errorCount > 0) {
254-
--node.errorCount;
319+
let errorCount = node.errorCount;
320+
321+
if (errorCount > this._removeNodeErrorCount) {
322+
errorCount = this._removeNodeErrorCount;
323+
}
324+
325+
if (errorCount < 1) {
326+
errorCount = 1;
327+
}
328+
329+
node.errorCount = errorCount - 1;
330+
331+
if (node._offlineUntil) {
332+
node._offlineUntil = 0;
333+
this.emit('online', node.id);
255334
}
256335
}
257336

258337
_getConnection(node, cb) {
259338
node.pool.getConnection((err, connection) => {
260339
if (err) {
261340
this._increaseErrorCount(node);
262-
if (this._canRetry) {
263-
// REVIEW: this seems wrong?
264-
this.emit('warn', err);
265-
// eslint-disable-next-line no-console
266-
console.warn(`[Error] PoolCluster : ${err}`);
267-
return cb(null, 'retry');
268-
}
269341
return cb(err);
270342
}
271343
this._decreaseErrorCount(node);
@@ -275,6 +347,16 @@ class PoolCluster extends EventEmitter {
275347
});
276348
}
277349

350+
_removeNode(node) {
351+
const index = this._serviceableNodeIds.indexOf(node.id);
352+
if (index !== -1) {
353+
this._serviceableNodeIds.splice(index, 1);
354+
delete this._nodes[node.id];
355+
this._clearFindCaches();
356+
node.pool.end();
357+
}
358+
}
359+
278360
_clearFindCaches() {
279361
this._findCaches = {};
280362
}

promise.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class PromisePoolCluster extends EventEmitter {
6060
super();
6161
this.poolCluster = poolCluster;
6262
this.Promise = thePromise || Promise;
63-
inheritEvents(poolCluster, this, ['warn', 'remove']);
63+
inheritEvents(poolCluster, this, ['warn', 'remove' , 'online', 'offline']);
6464
}
6565

6666
getConnection(pattern, selector) {
@@ -156,7 +156,7 @@ class PromisePoolCluster extends EventEmitter {
156156
})(func);
157157
}
158158
}
159-
})(['add']);
159+
})(['add', 'remove']);
160160

161161
function createPromisePoolCluster(opts) {
162162
const corePoolCluster = createPoolCluster(opts);

test/esm/integration/pool-cluster/test-promise-wrapper.test.mjs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,40 @@ const { createPoolCluster } = require('../../../../promise.js');
4343

4444
poolCluster.poolCluster.emit('remove');
4545
});
46+
47+
await test(async () => {
48+
const poolCluster = createPoolCluster();
49+
50+
poolCluster.once('offline', async function () {
51+
await new Promise((resolve) => {
52+
assert.equal(
53+
// eslint-disable-next-line no-invalid-this
54+
this,
55+
poolCluster,
56+
'should propagate offline event to promise wrapper',
57+
);
58+
resolve(true);
59+
});
60+
});
61+
62+
poolCluster.poolCluster.emit('offline');
63+
});
64+
65+
await test(async () => {
66+
const poolCluster = createPoolCluster();
67+
68+
poolCluster.once('online', async function () {
69+
await new Promise((resolve) => {
70+
assert.equal(
71+
// eslint-disable-next-line no-invalid-this
72+
this,
73+
poolCluster,
74+
'should propagate online event to promise wrapper',
75+
);
76+
resolve(true);
77+
});
78+
});
79+
80+
poolCluster.poolCluster.emit('online');
81+
});
4682
})();
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { mysql } from '../../index.test.js';
2+
3+
const poolCluster = mysql.createPoolCluster();
4+
5+
// Overload: poolCluster.add(group, connectionUri);
6+
poolCluster.remove('cluster1');

0 commit comments

Comments
 (0)