Skip to content

Commit 3ededbb

Browse files
committed
feat(cluster): Allow multiple worker processes with cluster module
This adds the ability for the API to use multiple worker processes with Node.js's built-in [`cluster`](https://nodejs.org/api/cluster.html) module. Historically, I've been opposed to adding this functionality, preferring that Pelias users manage parallelism on their own such as with Kubernetes or something else. But there are enough use cases where that sort of orchestration isn't worth the complexity, and you want more than one CPU's worth of API requests. This implementation is essentially lifted from the one in Placeholder, with mostly cosmetic changes only. The biggest difference is the default. For backwards compatibility, without specifying the new `CPUS` environment variable, the API will _not_ use the cluster module and operate as a single process just like before. _With_ the `CPUS` variable set to a number, that many worker processes will be launched, up to the number of CPUs detected on the machine.
1 parent 1f921d5 commit 3ededbb

2 files changed

Lines changed: 73 additions & 9 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ Most Pelias configuration is done through pelias-config, however the API has add
220220
| --- | --- | --- |
221221
| HOST | `undefined` | The network interface the Pelias API will bind to. Defaults to whatever the current Node.js default is, which is currently to listen on all interfaces. See the [Node.js Net documentation](https://nodejs.org/api/net.html#net_server_listen_port_host_backlog_callback) for more info. |
222222
| PORT | 3100 | The TCP port the Pelias API will use for incoming network connections. |
223+
| CPUS | 1 | The maximum number of worker processes to be launched. Will not launch more than one process per detected CPU. |
223224

224225
## Contributing
225226

index.js

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,88 @@
1+
const os = require('os');
2+
const cluster = require('cluster');
3+
14
const logger = require('pelias-logger').get('api');
25
const type_mapping = require('./helper/type_mapping');
36

47
const app = require('./app'),
58
port = ( process.env.PORT || 3100 ),
69
host = ( process.env.HOST || undefined );
710

11+
// determine the number of processes to launch
12+
// by default launch only a single process,
13+
// but if the CPUS environment variable is set, launch up to one process per CPU detected
14+
const envCpus = parseInt( process.env.CPUS, 10 );
15+
const cpus = Math.min( envCpus || 1 , os.availableParallelism() );
16+
817
let server;
18+
let terminating = false;
19+
20+
logger.info('Starting Pelias API using %d CPUs', cpus);
21+
22+
// simple case where cluster module is disabled with CPUS=1
23+
// or if this is a worker
24+
if ( cpus === 1 || cluster.isWorker ) {
25+
startServer();
26+
// if using the cluster module, do more work to set up all the workers
27+
} else if ( cluster.isMaster ) {
28+
// listen to worker ready message and print a message
29+
cluster.on('online', (worker) => {
30+
if (Object.keys(cluster.workers).length === cpus) {
31+
logger.info( `pelias is now running on http://${host || `::`}:${port}` );
32+
}
33+
});
34+
35+
// set up worker exit event that prints error message
36+
cluster.on('exit', (worker, code, signal) => {
37+
if (!terminating) {
38+
logger.error('[master] worker died', worker.process.pid);
39+
}
40+
});
941

10-
// load Elasticsearch type mappings before starting web server
11-
type_mapping.load(() => {
12-
server = app.listen( port, host, () => {
13-
// ask server for the actual address and port its listening on
14-
const listenAddress = server.address();
15-
logger.info( `pelias is now running on http://${listenAddress.address}:${listenAddress.port}` );
42+
// create a handler that prints when a new worker is created via fork
43+
cluster.on('fork', (worker, code, signal) => {
44+
logger.info('[master] worker forked', worker.process.pid);
1645
});
17-
});
1846

47+
// call fork to create the desired number of workers
48+
for( let c = 0; c < cpus; c++ ){
49+
cluster.fork();
50+
}
51+
}
52+
53+
// an exit handler that either closes the local Express server
54+
// or, if using the cluster module, forwards the signal to all workers
1955
function exitHandler() {
20-
logger.info('Pelias API shutting down');
56+
terminating = true;
57+
58+
if (cluster.isPrimary) {
59+
logger.info('Pelias API shutting down');
60+
for (const id in cluster.workers) {
61+
cluster.workers[id].send('shutdown');
62+
cluster.workers[id].disconnect();
63+
}
64+
}
2165

22-
server.close();
66+
if (server) {
67+
server.close();
68+
}
69+
}
70+
71+
function startServer() {
72+
// load Elasticsearch type_mapping before starting the web server
73+
// This has to be run on each forked worker because unlike "real"
74+
// unix `fork`, these forks don't share memory with other workers
75+
type_mapping.load(() => {
76+
server = app.listen( port, host, () => {
77+
// ask server for the actual address and port its listening on
78+
const listenAddress = server.address();
79+
if (cluster.isMaster) {
80+
logger.info( `pelias is now running on http://${listenAddress.address}:${listenAddress.port}` );
81+
} else {
82+
logger.info( `pelias worker ${process.pid} online` );
83+
}
84+
});
85+
});
2386
}
2487

2588
process.on('SIGINT', exitHandler);

0 commit comments

Comments
 (0)