Skip to content
This repository was archived by the owner on Apr 22, 2023. It is now read-only.

Commit 25f5793

Browse files
committed
added disconnect method and event
1 parent 29647e8 commit 25f5793

1 file changed

Lines changed: 134 additions & 6 deletions

File tree

lib/cluster.js

Lines changed: 134 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,13 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
4646
// Used in the master:
4747
var masterStarted = false;
4848
var ids = 0;
49-
var servers = {};
49+
var serverHandlers = {};
5050
var workerFilename;
5151
var workerArgs;
5252
var workerTotal;
5353

5454
// Used in the worker:
55+
var serverLisenters = {};
5556
var queryIds = 0;
5657
var queryCallbacks = {};
5758

@@ -91,6 +92,7 @@ cluster.setupMaster = function(options) {
9192
workerArgs = options.args || process.argv.slice(2);
9293
workerTotal = options.workers || os.cpus().length;
9394

95+
/*
9496
//This is really bad
9597
process.on('uncaughtException', function(e) {
9698
// Quickly try to kill all the workers.
@@ -105,7 +107,7 @@ cluster.setupMaster = function(options) {
105107
console.error('Please report this bug.');
106108
process.exit(1);
107109
});
108-
110+
*/
109111
};
110112

111113
// Check if a message is internal only
@@ -161,6 +163,8 @@ function handleMessage(message, handle, worker) {
161163
}
162164
}
163165
}
166+
167+
//Messages to the master will be handled using this methods
164168
if (cluster.isMaster) {
165169

166170
//Handle online messages from workers
@@ -183,10 +187,10 @@ if (cluster.isMaster) {
183187
var key = args.join(':');
184188
var handler;
185189

186-
if (servers.hasOwnProperty(key)) {
187-
handler = servers[key];
190+
if (serverHandlers.hasOwnProperty(key)) {
191+
handler = serverHandlers[key];
188192
} else {
189-
handler = servers[key] = net._createServerHandle.apply(net, args);
193+
handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
190194
}
191195

192196
//echo callback id, with the fd handler associated with it
@@ -219,6 +223,49 @@ if (cluster.isMaster) {
219223
worker.send(internalMessage({}, message));
220224
}
221225
};
226+
227+
//Handle disconnect messages from workers
228+
messageHandingObject.disconnect = function(message, worker) {
229+
230+
if (message.state === "setState") {
231+
worker.state = "disconnect";
232+
worker.suicide = true;
233+
234+
//Send echo if requested
235+
if (requireEcho(message)) {
236+
worker.send(internalMessage({}, message));
237+
}
238+
}
239+
240+
else if (message.state === "done") {
241+
//Send echo if requested before closing channel
242+
if (requireEcho(message)) {
243+
worker.send(internalMessage({}, message));
244+
}
245+
246+
worker.process._channel.close();
247+
worker.emit("disconnect", worker);
248+
cluster.emit("disconnect", worker);
249+
}
250+
251+
};
252+
253+
}
254+
255+
//Messages to a worker will be handled using this methods
256+
else if (cluster.isWorker) {
257+
258+
//Handle disconnect messages from master
259+
messageHandingObject.disconnect = function(message, worker) {
260+
//Run disconnect
261+
worker.disconnect();
262+
263+
//Send echo if requested
264+
if (requireEcho(message)) {
265+
worker.send(internalMessage({}, message));
266+
}
267+
};
268+
222269
}
223270

224271
// Create a worker call there works both for master and worker
@@ -292,6 +339,25 @@ function Worker(env) {
292339
self.emit('exit', self);
293340
cluster.emit('death', self);
294341
});
342+
343+
//Handle disconnect
344+
self.on('disconnect', function () {
345+
346+
//set state to disconnect
347+
self.disconnect = 'disconnect';
348+
349+
//Make suicide a boolean
350+
self.suicide = !!self.suicide;
351+
352+
//Remove from workers in the master
353+
if (cluster.isMaster) {
354+
delete cluster.workers[self.workerID];
355+
}
356+
357+
//Emit exit and death
358+
self.emit('exit', self);
359+
cluster.emit('death', self);
360+
});
295361

296362
}
297363
util.inherits(Worker, EventEmitter);
@@ -338,6 +404,64 @@ Worker.prototype.kill = function() {
338404
}
339405
};
340406

407+
// Kill the worker without restarting
408+
Worker.prototype.disconnect = function() {
409+
var self = this;
410+
411+
if (cluster.isMaster) {
412+
//Inform worker that is should disconnect from the master
413+
this.send(internalMessage({cmd: 'disconnect'}));
414+
} else {
415+
//Inform master that about state and suicide and make it emit disconnect
416+
this.send(internalMessage({cmd: 'disconnect', state: "setState"}), function () {
417+
var item;
418+
419+
//Predefine closeState
420+
var closeState = {master: false};
421+
for (item in serverLisenters) {
422+
if (serverLisenters.hasOwnProperty(item)) {
423+
closeState[item] = false;
424+
}
425+
}
426+
427+
//Check closeState
428+
var setState = function (name) {
429+
return function () {
430+
//Set State
431+
closeState[name] = true;
432+
433+
//Check all closeStates
434+
var state;
435+
for (state in closeState) {
436+
if (closeState.hasOwnProperty(state) && closeState === false) {
437+
return undefined;
438+
}
439+
}
440+
441+
//Emit a disconnect if all closeState are true
442+
self.emit('disconnect', self);
443+
};
444+
};
445+
446+
//Close TCP connections
447+
for (item in serverLisenters) {
448+
if (serverLisenters.hasOwnProperty(item)) {
449+
450+
//Close TCP connection and set closeState when done
451+
serverLisenters[item].once('end', setState(item));
452+
serverLisenters[item].close();
453+
}
454+
}
455+
456+
//Make master emit a disconnect event
457+
self.send(internalMessage({cmd: 'disconnect', state: "done"}), function () {
458+
//Close connection to the master
459+
setState('master')();
460+
});
461+
});
462+
}
463+
};
464+
341465
// Fork a new worker
342466
cluster.fork = function(env) {
343467
// This can only be called from the master.
@@ -402,7 +526,11 @@ cluster._setupWorker = function() {
402526
cluster._getServer = function(tcpSelf, address, port, addressType, cb) {
403527
// This can only be called from a worker.
404528
assert(cluster.isWorker);
405-
529+
530+
//Store tcp instance for later use
531+
var key = [address, port, addressType].join(":");
532+
serverLisenters[key] = tcpSelf;
533+
406534
//Send a listening message to the master
407535
tcpSelf.once('listening', function() {
408536
cluster.worker.state = 'listening';

0 commit comments

Comments
 (0)