Skip to content

Commit

Permalink
Sockets: allow changing types of workers while the server is running
Browse files Browse the repository at this point in the history
  • Loading branch information
Morfent committed Jun 7, 2017
1 parent 23879fb commit 2b71e64
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 55 deletions.
5 changes: 0 additions & 5 deletions sockets-workers.js
Original file line number Diff line number Diff line change
Expand Up @@ -527,16 +527,11 @@ if (cluster.isWorker) {

server.installHandlers(app, {});
app.listen(Config.port, Config.bindaddress);
console.log(`Worker ${cluster.worker.id} now listening on ${Config.bindaddress}:${Config.port}`);

if (appssl) {
server.installHandlers(appssl, {});
appssl.listen(Config.ssl.port, Config.bindaddress);
console.log(`Worker ${cluster.worker.id} now listening for SSL on port ${Config.ssl.port}`);
}

console.log(`Test your server at http://${Config.bindaddress === '0.0.0.0' ? 'localhost' : Config.bindaddress}:${Config.port}`);

require('./repl').start(
'sockets-',
`${cluster.worker.id}-${process.pid}`,
Expand Down
115 changes: 89 additions & 26 deletions sockets.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,25 @@ const DELIM = '\x03';
* them accordingly.
*/
class WorkerWrapper {
/** @param {any} worker */
constructor(worker) {
this.id = worker.id;
/**
* @param {any} worker
* @param {number} id
*/
constructor(worker, id) {
this.worker = worker;
this.id = id;
this.isTrustedProxyIp = Dnsbl.checker(Config.proxyip);

worker.once('listening', () => this.onListen());
worker.on('message',
/** @param {string} data */
data => this.onMessage(data)
);
worker.on('error', () => {});
worker.on('error', () => {
// Handle on disconnect.
});
worker.once('disconnect',
/** @param {Error?} err */
/** @param {Error=} err */
err => this.onDisconnect(err)
);
}
Expand Down Expand Up @@ -179,6 +185,18 @@ class WorkerWrapper {
return ip;
}

/**
* @description 'listening' event handler for the worker. Logs which
* hostname and worker ID is listening to console.
*/
onListen() {
console.log(`Worker ${this.id} now listening on ${Config.bindaddress}:${Config.port}`);
if (Config.ssl) {
console.log(`Worker ${this.id} now listening for SSL on port ${Config.ssl.port}`);
}
console.log(`Test your server at http://${Config.bindaddress === '0.0.0.0' ? 'localhost' : Config.bindaddress}:${Config.port}`);
}

/**
* @description 'message' event handler for the worker. Parses which type
* of command the incoming IPC message uses, then parses its parametres and
Expand All @@ -194,24 +212,26 @@ class WorkerWrapper {
case '*':
let [socketid, ip, header, protocol] = this.parseParams(params, 4);
ip = this.pluckUntrustedIp(ip, header);
Users.socketConnect(this.worker, this.id, socketid, ip, protocol);
Users.socketConnect(this, this.id, socketid, ip, protocol);
break;
case '!':
Users.socketDisconnect(this.worker, this.id, params);
Users.socketDisconnect(this, this.id, params);
break;
case '<':
Users.socketReceive(this.worker, this.id, ...this.parseParams(params, 2));
Users.socketReceive(this, this.id, ...this.parseParams(params, 2));
break;
}
}

/**
* @description Worker 'disconnect' event handler.
* @param {Error?} err
* @param {Error=} err
*/
onDisconnect(err) {
require('./crashlogger')(new Error(`Worker ${this.id} abruptly died with the following stack trace: ${err && err.stack}`), 'The main process');
console.error(`${Users.socketDisconnectAll(this.worker)} connections were lost.`);
if (err) {
require('./crashlogger')(new Error(`Worker ${this.id} abruptly died with the following stack trace: ${err.stack}`), 'The main process');
console.error(`${Users.socketDisconnectAll(this)} connections were lost.`);
}
spawnWorker();
}
}
Expand All @@ -234,24 +254,25 @@ class GoWorker extends EventEmitter {

/** @type {string[]} */
this.buffer = [];
/** @type {Error?} */
this.error = null;
/** @type {Error | void} */
this.error = undefined;

this.process = null;
this.connection = null;
this.server = require('net').createServer();
this.server.once('connection', connection => this.onChildConnect(connection));
this.server.once('close', () => this.emit('disconnect'));
this.server.on('error', () => {});
this.server.listen(() => this.spawnChild());
this.server.listen(() => process.nextTick(() => this.spawnChild()));
}

/**
* @description Worker#kill mock
* @param {string} signal
*/
kill(signal = 'SIGTERM') {
if (this.isConnected()) this.connection.close();
if (this.process && !this.isDead()) this.process.kill(signal);
if (this.connection) this.connection.destroy();
if (this.process) this.process.kill(signal);
this.server.close();
}

Expand Down Expand Up @@ -296,7 +317,7 @@ class GoWorker extends EventEmitter {
* @return {boolean}
*/
isDead() {
return !this.process || this.connection.exitCode !== null || this.connection.statusCode !== null;
return this.connection && !this.connection.destroyed;
}

/**
Expand All @@ -321,14 +342,23 @@ class GoWorker extends EventEmitter {
}
);

this.process.once('exit', () => {
if (this.server._eventsCount <= 2) {
// The child process died before ever opening the IPC
// connection and sending any messages over it. Let's avoid
// getting trapped in an endless loop of respawns and crashes
// if it crashed.
if (this.error) throw this.error;
}
});

this.process.stderr.setEncoding('utf8');
this.process.stderr.once('data',
/** @param {string} data */
data => {
this.error = new Error(data);
}
);
this.process.once('exit', () => this.emit('disconnect', this.error));
}

/**
Expand All @@ -349,28 +379,62 @@ class GoWorker extends EventEmitter {
}
);
this.connection.on('error', () => {});

this.emit('listening');
}
}

/**
* @description Map of worker IDs to worker processes.
* @description Map of worker IDs to workers.
* @type {Map<number, WorkerWrapper>}
*/
const workers = new Map();

/**
* @description Worker ID counter used for Go workers.
* @description Worker ID counter. We don't use cluster's internal counter so
* Config.golang can be freely changed while the server is still running.
* @type {number}
*/
let nextWorkerid = 0;
let nextWorkerid = 1;

/**
* @description Config.golang cache. Checked when spawning new workers to
* ensure that Node and Go workers will not try to run at the same time.
* @type {boolean}
*/
let golangCache = !!Config.golang;

/**
* @description Spawns a new worker process.
* @description Spawns a new worker.
* @return {WorkerWrapper}
*/
function spawnWorker() {
if (golangCache === !Config.golang) {
// Config settings were changed. Make sure none of the wrong kind of
// worker is already listening.
let workerType = Config.golang ? GoWorker : cluster.Worker;
for (let [workerid, worker] of workers) {
if (worker.isConnected() && !(worker.worker instanceof workerType)) {
let oldType = golangCache ? 'Go' : 'Node';
let newType = Config.golang ? 'Go' : 'Node';
throw new Error(
`Sockets: worker of ID ${workerid} is a ${oldType} worker, but config was changed to spawn ${newType} ones!
Set Config.golang back to ${golangCache} or kill all active workers before attempting to spawn more.`
);
}
}
golangCache = !!Config.golang;
} else if (golangCache) {
// Prevent spawning multiple Go child processes by accident.
for (let [workerid, worker] of workers) { // eslint-disable-line no-unused-vars
if (worker.worker instanceof GoWorker) {
throw new Error('Sockets: multiple Go child processes cannot be spawned!');
}
}
}

let worker;
if (Config.golang) {
if (golangCache) {
worker = new GoWorker(nextWorkerid);
} else {
worker = cluster.fork({
Expand All @@ -380,9 +444,8 @@ function spawnWorker() {
});
}

let wrapper = new WorkerWrapper(worker);
let wrapper = new WorkerWrapper(worker, nextWorkerid++);
workers.set(wrapper.id, wrapper);
nextWorkerid++;
return wrapper;
}

Expand Down Expand Up @@ -413,7 +476,7 @@ function listen(port, bindAddress, workerCount) {
// serving like Node.js workers do. Workers are instead used to limit the
// number of concurrent requests that can be handled at once in the child
// process.
if (Config.golang) {
if (golangCache) {
spawnWorker();
return;
}
Expand Down
30 changes: 6 additions & 24 deletions sockets/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"crypto/tls"
"fmt"
"io/ioutil"
"log"
"net"
Expand Down Expand Up @@ -77,11 +76,6 @@ func main() {

// Begin serving over HTTP.
go func(ba string, port string) {
srv := &http.Server{
Handler: r,
Addr: ba + port,
}

addr, err := net.ResolveTCPAddr("tcp4", ba+port)
if err != nil {
log.Fatalf("Sockets: failed to resolve the TCP address of the parent's server: %v", err)
Expand All @@ -90,20 +84,11 @@ func main() {
ln, err := net.ListenTCP("tcp4", addr)
defer ln.Close()
if err != nil {
log.Fatalf("Sockets: failed to listen on %v over HTTP", srv.Addr)
}

fmt.Printf("Go workers now listening on %v%v\n", ba, port)

if ba == "0.0.0.0" {
fmt.Printf("Test your server at http://localhost%v/\n", port)
} else {
fmt.Printf("Test your server at http://%v%v/\n", ba, port)
log.Fatalf("Sockets: failed to listen over HTTP: %v", err)
}

if err = http.Serve(ln, r); err != nil {
log.Fatalf("Sockets: HTTP server failed with error: %v", err)
}
err = http.Serve(ln, r)
log.Fatalf("Sockets: HTTP server failed: %v", err)
}(config.BindAddress, config.Port)

// Begin serving over HTTPS if configured to do so.
Expand All @@ -124,14 +109,11 @@ func main() {
ln, err = tls.Listen("tcp4", srv.Addr, srv.TLSConfig)
defer ln.Close()
if err != nil {
log.Fatalf("Sockets: failed to listen on %v over HTTPS", srv.Addr)
log.Fatalf("Sockets: failed to listen over HTTPS: %v", err)
}

fmt.Printf("Go workers now listening for SSL on port %v\n", port)

if err := http.Serve(ln, r); err != nil {
log.Fatalf("Sockets: HTTPS server failed: %v", err)
}
err = http.Serve(ln, r)
log.Fatalf("Sockets: HTTPS server failed: %v", err)
}(config.BindAddress, config.SSL.Port, config.SSL.Options.Cert, config.SSL.Options.Key)
}

Expand Down

0 comments on commit 2b71e64

Please sign in to comment.