@@ -13,17 +13,30 @@ const EventEmitter = require('events').EventEmitter;
1313const makeSelector = {
1414 RR ( ) {
1515 let index = 0 ;
16- return clusterIds => clusterIds [ index ++ % clusterIds . length ] ;
16+ return ( clusterIds ) => clusterIds [ index ++ % clusterIds . length ] ;
1717 } ,
1818 RANDOM ( ) {
19- return clusterIds =>
19+ return ( clusterIds ) =>
2020 clusterIds [ Math . floor ( Math . random ( ) * clusterIds . length ) ] ;
2121 } ,
2222 ORDER ( ) {
23- return clusterIds => clusterIds [ 0 ] ;
23+ return ( clusterIds ) => clusterIds [ 0 ] ;
2424 }
2525} ;
2626
27+ const getMonotonicMilliseconds = function ( ) {
28+ let ms ;
29+
30+ if ( typeof process . hrtime === 'function' ) {
31+ ms = process . hrtime ( ) ;
32+ ms = ms [ 0 ] * 1e3 + ms [ 1 ] * 1e-6 ;
33+ } else {
34+ ms = process . uptime ( ) * 1000 ;
35+ }
36+
37+ return Math . floor ( ms ) ;
38+ } ;
39+
2740class PoolNamespace {
2841 constructor ( cluster , pattern , selector ) {
2942 this . _cluster = cluster ;
@@ -34,15 +47,28 @@ class PoolNamespace {
3447 getConnection ( cb ) {
3548 const clusterNode = this . _getClusterNode ( ) ;
3649 if ( clusterNode === null ) {
37- return cb ( new Error ( 'Pool does Not exists.' ) ) ;
50+ let err = new Error ( 'Pool does Not exist.' ) ;
51+ err . code = 'POOL_NOEXIST' ;
52+
53+ if ( this . _cluster . _findNodeIds ( this . _pattern , true ) . length !== 0 ) {
54+ err = new Error ( 'Pool does Not have online node.' ) ;
55+ err . code = 'POOL_NONEONLINE' ;
56+ }
57+
58+ return cb ( err ) ;
3859 }
3960 return this . _cluster . _getConnection ( clusterNode , ( err , connection ) => {
4061 if ( err ) {
62+ if (
63+ this . _cluster . _canRetry &&
64+ this . _cluster . _findNodeIds ( this . _pattern ) . length !== 0
65+ ) {
66+ this . _cluster . emit ( 'warn' , err ) ;
67+ return this . getConnection ( cb ) ;
68+ }
69+
4170 return cb ( err ) ;
4271 }
43- if ( connection === 'retry' ) {
44- return this . getConnection ( cb ) ;
45- }
4672 return cb ( null , connection ) ;
4773 } ) ;
4874 }
@@ -79,9 +105,9 @@ class PoolNamespace {
79105
80106 /**
81107 * pool cluster execute
82- * @param {* } sql
83- * @param {* } values
84- * @param {* } cb
108+ * @param {* } sql
109+ * @param {* } values
110+ * @param {* } cb
85111 */
86112 execute ( sql , values , cb ) {
87113 if ( typeof values === 'function' ) {
@@ -123,6 +149,7 @@ class PoolCluster extends EventEmitter {
123149 this . _canRetry =
124150 typeof config . canRetry === 'undefined' ? true : config . canRetry ;
125151 this . _removeNodeErrorCount = config . removeNodeErrorCount || 5 ;
152+ this . _restoreNodeTimeout = config . restoreNodeTimeout || 0 ;
126153 this . _defaultSelector = config . defaultSelector || 'RR' ;
127154 this . _closed = false ;
128155 this . _lastId = 0 ;
@@ -155,13 +182,26 @@ class PoolCluster extends EventEmitter {
155182 this . _nodes [ id ] = {
156183 id : id ,
157184 errorCount : 0 ,
158- pool : new Pool ( { config : new PoolConfig ( config ) } )
185+ pool : new Pool ( { config : new PoolConfig ( config ) } ) ,
186+ _offlineUntil : 0
159187 } ;
160188 this . _serviceableNodeIds . push ( id ) ;
161189 this . _clearFindCaches ( ) ;
162190 }
163191 }
164192
193+ remove ( pattern ) {
194+ const foundNodeIds = this . _findNodeIds ( pattern , true ) ;
195+
196+ for ( let i = 0 ; i < foundNodeIds . length ; i ++ ) {
197+ const node = this . _getNode ( foundNodeIds [ i ] ) ;
198+
199+ if ( node ) {
200+ this . _removeNode ( node ) ;
201+ }
202+ }
203+ }
204+
165205 getConnection ( pattern , selector , cb ) {
166206 let namespace ;
167207 if ( typeof pattern === 'function' ) {
@@ -181,7 +221,7 @@ class PoolCluster extends EventEmitter {
181221 const cb =
182222 callback !== undefined
183223 ? callback
184- : err => {
224+ : ( err ) => {
185225 if ( err ) {
186226 throw err ;
187227 }
@@ -190,11 +230,12 @@ class PoolCluster extends EventEmitter {
190230 process . nextTick ( cb ) ;
191231 return ;
192232 }
233+
193234 this . _closed = true ;
194235
195236 let calledBack = false ;
196237 let waitingClose = 0 ;
197- const onEnd = err => {
238+ const onEnd = ( err ) => {
198239 if ( ! calledBack && ( err || -- waitingClose <= 0 ) ) {
199240 calledBack = true ;
200241 return cb ( err ) ;
@@ -205,67 +246,98 @@ class PoolCluster extends EventEmitter {
205246 waitingClose ++ ;
206247 this . _nodes [ id ] . pool . end ( onEnd ) ;
207248 }
249+
208250 if ( waitingClose === 0 ) {
209251 process . nextTick ( onEnd ) ;
210252 }
211253 }
212254
213- _findNodeIds ( pattern ) {
214- if ( typeof this . _findCaches [ pattern ] !== 'undefined' ) {
215- return this . _findCaches [ pattern ] ;
216- }
217- let foundNodeIds ;
218- if ( pattern === '*' ) {
219- // all
220- foundNodeIds = this . _serviceableNodeIds ;
221- } else if ( this . _serviceableNodeIds . indexOf ( pattern ) !== - 1 ) {
222- // one
223- foundNodeIds = [ pattern ] ;
224- } else {
225- // wild matching
226- const keyword = pattern . substring ( pattern . length - 1 , 0 ) ;
227- foundNodeIds = this . _serviceableNodeIds . filter ( id =>
228- id . startsWith ( keyword )
229- ) ;
255+ _findNodeIds ( pattern , includeOffline ) {
256+ let currentTime = 0 ;
257+ let foundNodeIds = this . _findCaches [ pattern ] ;
258+
259+ if ( typeof this . _findCaches [ pattern ] === 'undefined' ) {
260+ if ( pattern === '*' ) {
261+ // all
262+ foundNodeIds = this . _serviceableNodeIds ;
263+ } else if ( this . _serviceableNodeIds . indexOf ( pattern ) !== - 1 ) {
264+ // one
265+ foundNodeIds = [ pattern ] ;
266+ } else {
267+ // wild matching
268+ const keyword = pattern . substring ( pattern . length - 1 , 0 ) ;
269+ foundNodeIds = this . _serviceableNodeIds . filter ( ( id ) =>
270+ id . startsWith ( keyword )
271+ ) ;
272+ }
230273 }
274+
231275 this . _findCaches [ pattern ] = foundNodeIds ;
232- return foundNodeIds ;
276+
277+ if ( includeOffline ) {
278+ return foundNodeIds ;
279+ }
280+
281+ return foundNodeIds . filter ( ( nodeId ) => {
282+ const node = this . _getNode ( nodeId ) ;
283+
284+ if ( ! node . _offlineUntil ) {
285+ return true ;
286+ }
287+
288+ if ( ! currentTime ) {
289+ currentTime = getMonotonicMilliseconds ( ) ;
290+ }
291+
292+ return node . _offlineUntil <= currentTime ;
293+ } ) ;
233294 }
234295
235296 _getNode ( id ) {
236297 return this . _nodes [ id ] || null ;
237298 }
238299
239300 _increaseErrorCount ( node ) {
240- if ( ++ node . errorCount >= this . _removeNodeErrorCount ) {
241- const index = this . _serviceableNodeIds . indexOf ( node . id ) ;
242- if ( index !== - 1 ) {
243- this . _serviceableNodeIds . splice ( index , 1 ) ;
244- delete this . _nodes [ node . id ] ;
245- this . _clearFindCaches ( ) ;
246- node . pool . end ( ) ;
247- this . emit ( 'remove' , node . id ) ;
248- }
301+ const errorCount = ++ node . errorCount ;
302+
303+ if ( this . _removeNodeErrorCount > errorCount ) {
304+ return ;
249305 }
306+
307+ if ( this . _restoreNodeTimeout > 0 ) {
308+ node . _offlineUntil =
309+ getMonotonicMilliseconds ( ) + this . _restoreNodeTimeout ;
310+ this . emit ( 'offline' , node . id ) ;
311+ return ;
312+ }
313+
314+ this . _removeNode ( node ) ;
315+ this . emit ( 'remove' , node . id ) ;
250316 }
251317
252318 _decreaseErrorCount ( node ) {
253- if ( node . errorCount > 0 ) {
254- -- node . errorCount ;
319+ let errorCount = node . errorCount ;
320+
321+ if ( errorCount > this . _removeNodeErrorCount ) {
322+ errorCount = this . _removeNodeErrorCount ;
323+ }
324+
325+ if ( errorCount < 1 ) {
326+ errorCount = 1 ;
327+ }
328+
329+ node . errorCount = errorCount - 1 ;
330+
331+ if ( node . _offlineUntil ) {
332+ node . _offlineUntil = 0 ;
333+ this . emit ( 'online' , node . id ) ;
255334 }
256335 }
257336
258337 _getConnection ( node , cb ) {
259338 node . pool . getConnection ( ( err , connection ) => {
260339 if ( err ) {
261340 this . _increaseErrorCount ( node ) ;
262- if ( this . _canRetry ) {
263- // REVIEW: this seems wrong?
264- this . emit ( 'warn' , err ) ;
265- // eslint-disable-next-line no-console
266- console . warn ( `[Error] PoolCluster : ${ err } ` ) ;
267- return cb ( null , 'retry' ) ;
268- }
269341 return cb ( err ) ;
270342 }
271343 this . _decreaseErrorCount ( node ) ;
@@ -275,6 +347,16 @@ class PoolCluster extends EventEmitter {
275347 } ) ;
276348 }
277349
350+ _removeNode ( node ) {
351+ const index = this . _serviceableNodeIds . indexOf ( node . id ) ;
352+ if ( index !== - 1 ) {
353+ this . _serviceableNodeIds . splice ( index , 1 ) ;
354+ delete this . _nodes [ node . id ] ;
355+ this . _clearFindCaches ( ) ;
356+ node . pool . end ( ) ;
357+ }
358+ }
359+
278360 _clearFindCaches ( ) {
279361 this . _findCaches = { } ;
280362 }
0 commit comments