Skip to content

Commit ba915fd

Browse files
authored
Merge pull request #3 from BackendStack21/enhance-cluster-manager-implementation
feat: Enhance ClusterManager with dynamic scaling, restart policies, …
2 parents 601c22b + ed19353 commit ba915fd

7 files changed

Lines changed: 380 additions & 58 deletions

File tree

README.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,78 @@ await gateway.listen(3000)
242242
console.log('Cluster started with 4 workers')
243243
```
244244

245+
#### Advanced usage: Cluster lifecycle and operations
246+
247+
Bungate’s cluster manager powers zero-downtime restarts, dynamic scaling, and safe shutdowns in production. You can control it via signals or programmatically.
248+
249+
- Zero-downtime rolling restart: send `SIGUSR2` to the master process
250+
- The manager spawns a replacement worker first, then gracefully stops the old one
251+
- Graceful shutdown: send `SIGTERM` or `SIGINT`
252+
- Workers receive `SIGTERM` and are given up to `shutdownTimeout` to exit before being force-killed
253+
254+
Programmatic controls (available when using the `ClusterManager` directly):
255+
256+
```ts
257+
import { ClusterManager, BunGateLogger } from 'bungate'
258+
259+
const logger = new BunGateLogger({ level: 'info' })
260+
261+
const cluster = new ClusterManager(
262+
{
263+
enabled: true,
264+
workers: 4,
265+
restartWorkers: true,
266+
restartDelay: 1000, // base delay used for exponential backoff with jitter
267+
maxRestarts: 10, // lifetime cap per worker
268+
respawnThreshold: 5, // sliding window cap
269+
respawnThresholdTime: 60_000, // within this time window
270+
shutdownTimeout: 30_000,
271+
// Set to false when embedding in tests to avoid process.exit(0)
272+
exitOnShutdown: true,
273+
},
274+
logger,
275+
'./gateway.ts', // worker entry (executed with Bun)
276+
)
277+
278+
await cluster.start()
279+
280+
// Dynamic scaling
281+
await cluster.scaleUp(2) // add 2 workers
282+
await cluster.scaleDown(1) // remove 1 worker
283+
await cluster.scaleTo(6) // set exact worker count
284+
285+
// Operational visibility
286+
console.log(cluster.getWorkerCount())
287+
console.log(cluster.getWorkerInfo()) // includes id, restarts, pid, etc.
288+
289+
// Broadcast a POSIX signal to all workers (e.g., for log-level reloads)
290+
cluster.broadcastSignal('SIGHUP')
291+
292+
// Target a single worker
293+
cluster.sendSignalToWorker(1, 'SIGHUP')
294+
295+
// Graceful shutdown (will exit process if exitOnShutdown !== false)
296+
// await (cluster as any).gracefulShutdown() // internal in gateway use; prefer SIGTERM
297+
```
298+
299+
Notes:
300+
301+
- Each worker receives `CLUSTER_WORKER=true` and `CLUSTER_WORKER_ID=<n>` environment variables.
302+
- Restart policy uses exponential backoff with jitter and a sliding window threshold to prevent flapping.
303+
- Defaults: `shutdownTimeout` 30s, `respawnThreshold` 5 within 60s, `restartDelay` 1s, `maxRestarts` 10.
304+
305+
Configuration reference (cluster):
306+
307+
- `enabled` (boolean): enable multi-process mode
308+
- `workers` (number): worker process count (defaults to CPU cores)
309+
- `restartWorkers` (boolean): auto-respawn crashed workers
310+
- `restartDelay` (ms): base delay for backoff
311+
- `maxRestarts` (number): lifetime restarts per worker
312+
- `respawnThreshold` (number): max restarts within time window
313+
- `respawnThresholdTime` (ms): sliding window size
314+
- `shutdownTimeout` (ms): grace period before force-kill
315+
- `exitOnShutdown` (boolean): if true (default), master exits after shutdown; set false in tests/embedded
316+
245317
### 🔄 **Advanced Load Balancing**
246318

247319
Distribute traffic intelligently across multiple backends:

src/cluster/cluster-manager.ts

Lines changed: 165 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ export interface WorkerInfo {
4747
lastRestartTime: number
4848
/** Flag indicating worker is in shutdown process */
4949
isExiting: boolean
50+
/**
51+
* Timestamps of recent restarts used to enforce respawn threshold within a time window
52+
* Old entries are pruned automatically on access
53+
*/
54+
restartTimestamps: number[]
5055
}
5156

5257
/**
@@ -68,6 +73,12 @@ export class ClusterManager {
6873
private isShuttingDown = false
6974
/** Path to worker script for spawning processes */
7075
private workerScript: string
76+
/** Guard to prevent double start */
77+
private started = false
78+
/** Bound signal handlers to allow proper removal */
79+
private boundSigint?: () => void
80+
private boundSigterm?: () => void
81+
private boundSigusr2?: () => void
7182

7283
/**
7384
* Initialize cluster manager with configuration and dependencies
@@ -106,54 +117,70 @@ export class ClusterManager {
106117
return
107118
}
108119

120+
if (this.started) {
121+
this.logger?.warn('Cluster already started; ignoring subsequent start()')
122+
return
123+
}
124+
this.started = true
125+
109126
this.logger?.info(`Starting cluster with ${this.config.workers} workers`)
110127

111128
// Configure signal handlers for graceful lifecycle management
112-
process.on('SIGINT', this.gracefulShutdown.bind(this))
113-
process.on('SIGTERM', this.gracefulShutdown.bind(this))
114-
process.on('SIGUSR2', this.restartAllWorkers.bind(this))
129+
this.boundSigint = this.gracefulShutdown.bind(this)
130+
this.boundSigterm = this.gracefulShutdown.bind(this)
131+
this.boundSigusr2 = this.restartAllWorkers.bind(this)
132+
process.on('SIGINT', this.boundSigint)
133+
process.on('SIGTERM', this.boundSigterm)
134+
process.on('SIGUSR2', this.boundSigusr2)
135+
136+
// Calculate number of workers, ensuring at least one
137+
const maxWorkers = Math.max(1, this.config.workers || cpus().length)
115138

116139
// Spawn workers
117-
for (let i = 0; i < this.config.workers!; i++) {
118-
await this.spawnWorker()
140+
for (let i = 0; i < maxWorkers; i++) {
141+
try {
142+
await this.spawnWorker()
143+
} catch (error) {
144+
this.logger?.error(`Failed to spawn worker ${i}:`, error as Error)
145+
}
119146
}
120147

121148
this.logger?.info('Cluster started successfully')
122149
}
123150

124-
private async spawnWorker(): Promise<void> {
125-
const workerId = this.nextWorkerId++
151+
private async spawnWorker(workerId?: number): Promise<void> {
152+
const id = workerId ?? this.nextWorkerId++
126153

127154
try {
128155
const worker = spawn({
129156
cmd: [process.execPath, this.workerScript],
130157
env: {
131158
...process.env,
132159
CLUSTER_WORKER: 'true',
133-
CLUSTER_WORKER_ID: workerId.toString(),
160+
CLUSTER_WORKER_ID: id.toString(),
134161
},
135162
stdio: ['inherit', 'inherit', 'inherit'],
136163
})
137164

138165
const workerInfo: WorkerInfo = {
139-
id: workerId,
166+
id,
140167
process: worker,
141168
restarts: 0,
142169
lastRestartTime: 0,
143170
isExiting: false,
171+
restartTimestamps: [],
144172
}
145173

146-
this.workers.set(workerId, workerInfo)
174+
this.workers.set(id, workerInfo)
147175

148176
// Handle worker exit
149177
worker.exited.then((exitCode) => {
150178
this.handleWorkerExit(workerInfo, exitCode)
151179
})
152180

153-
this.logger?.info(`Worker ${workerId} started (PID: ${worker.pid})`)
181+
this.logger?.info(`Worker ${id} started (PID: ${worker.pid})`)
154182
} catch (error) {
155-
this.logger?.error(`Failed to spawn worker ${workerId}:`, error as Error)
156-
throw error
183+
this.logger?.error(`Failed to spawn worker ${id}:`, error as Error)
157184
}
158185
}
159186

@@ -174,16 +201,25 @@ export class ClusterManager {
174201
// Check if we should restart the worker
175202
if (this.config.restartWorkers && this.shouldRestartWorker(workerInfo)) {
176203
this.logger?.info(`Restarting worker ${id}`)
204+
// Track restart metrics
205+
const now = Date.now()
177206
workerInfo.restarts++
178-
workerInfo.lastRestartTime = Date.now()
179-
180-
// Add restart delay
181-
await new Promise((resolve) =>
182-
setTimeout(resolve, this.config.restartDelay),
207+
workerInfo.lastRestartTime = now
208+
workerInfo.restartTimestamps.push(now)
209+
// Apply exponential backoff with jitter based on restarts count
210+
const base = Math.max(0, this.config.restartDelay ?? 1000)
211+
const attempt = Math.max(1, workerInfo.restarts)
212+
const maxDelay = 30000 // cap at 30s to avoid unbounded waits
213+
const backoff = Math.min(
214+
maxDelay,
215+
base * Math.pow(2, Math.min(5, attempt - 1)),
183216
)
217+
const jitter = Math.floor(Math.random() * Math.floor(base / 2))
218+
const delay = Math.min(maxDelay, backoff + jitter)
219+
await new Promise((resolve) => setTimeout(resolve, delay))
184220

185221
try {
186-
await this.spawnWorker()
222+
await this.spawnWorker(id)
187223
} catch (error) {
188224
this.logger?.error(`Failed to restart worker ${id}:`, error as Error)
189225
}
@@ -195,43 +231,48 @@ export class ClusterManager {
195231
}
196232

197233
private shouldRestartWorker(workerInfo: WorkerInfo): boolean {
198-
const { restarts, lastRestartTime } = workerInfo
234+
const { restarts } = workerInfo
199235
const now = Date.now()
200236

201-
// Check max restarts
202-
if (restarts >= this.config.maxRestarts!) {
237+
// Check max restarts (lifetime)
238+
if (
239+
typeof this.config.maxRestarts === 'number' &&
240+
restarts >= (this.config.maxRestarts ?? 10)
241+
) {
203242
return false
204243
}
205244

206-
// Check respawn threshold
207-
if (this.config.respawnThreshold && this.config.respawnThresholdTime) {
208-
const timeSinceLastRestart = now - lastRestartTime
209-
if (
210-
timeSinceLastRestart < this.config.respawnThresholdTime &&
211-
restarts >= this.config.respawnThreshold
212-
) {
213-
return false
214-
}
245+
// Enforce respawn threshold within time window using sliding window timestamps
246+
const threshold = this.config.respawnThreshold ?? 5
247+
const windowMs = this.config.respawnThresholdTime ?? 60000
248+
// Prune old timestamps
249+
workerInfo.restartTimestamps = workerInfo.restartTimestamps.filter(
250+
(t) => now - t <= windowMs,
251+
)
252+
if (workerInfo.restartTimestamps.length >= threshold) {
253+
return false
215254
}
216255

217256
return true
218257
}
219258

220259
private async restartAllWorkers(): Promise<void> {
221-
this.logger?.info('Restarting all workers')
260+
this.logger?.info('Rolling restart of all workers initiated')
222261

223262
const workerIds = Array.from(this.workers.keys())
224263

225264
for (const workerId of workerIds) {
226-
const workerInfo = this.workers.get(workerId)
227-
if (workerInfo) {
228-
this.logger?.info(`Restarting worker ${workerId}`)
229-
workerInfo.isExiting = true
230-
workerInfo.process.kill('SIGTERM')
231-
232-
// Wait a bit before starting the next restart
233-
await new Promise((resolve) => setTimeout(resolve, 1000))
234-
}
265+
const current = this.workers.get(workerId)
266+
if (!current) continue
267+
// Spawn a replacement first (uses a new worker id) to minimize downtime
268+
await this.spawnWorker()
269+
// Give the new worker a brief moment to initialize
270+
await new Promise((resolve) => setTimeout(resolve, 250))
271+
this.logger?.info(`Stopping old worker ${workerId} (rolling restart)`)
272+
current.isExiting = true
273+
current.process.kill('SIGTERM')
274+
// Small spacing to avoid thundering herd
275+
await new Promise((resolve) => setTimeout(resolve, 250))
235276
}
236277
}
237278

@@ -291,7 +332,13 @@ export class ClusterManager {
291332
await Promise.race([shutdownPromise, timeoutPromise])
292333

293334
this.logger?.info('Cluster shutdown complete')
294-
process.exit(0)
335+
// Remove signal handlers to avoid memory leaks when embedding in long-running processes/tests
336+
if (this.boundSigint) process.off('SIGINT', this.boundSigint)
337+
if (this.boundSigterm) process.off('SIGTERM', this.boundSigterm)
338+
if (this.boundSigusr2) process.off('SIGUSR2', this.boundSigusr2)
339+
if (this.config.exitOnShutdown ?? true) {
340+
process.exit(0)
341+
}
295342
}
296343

297344
getWorkerCount(): number {
@@ -301,4 +348,80 @@ export class ClusterManager {
301348
getWorkerInfo(): WorkerInfo[] {
302349
return Array.from(this.workers.values())
303350
}
351+
352+
/** Whether the cluster has been started and not shut down */
353+
isRunning(): boolean {
354+
return this.started && !this.isShuttingDown
355+
}
356+
357+
/**
358+
* Dynamically scale the number of workers.
359+
* Increases spawns or gracefully stops excess workers to match target.
360+
*/
361+
async scaleTo(target: number): Promise<void> {
362+
if (!this.config.enabled) return
363+
const desired = Math.max(1, Math.floor(target))
364+
const current = this.workers.size
365+
if (desired === current) return
366+
if (desired > current) {
367+
const toAdd = desired - current
368+
this.logger?.info(`Scaling up workers: +${toAdd}`)
369+
for (let i = 0; i < toAdd; i++) {
370+
await this.spawnWorker()
371+
}
372+
} else {
373+
const toRemove = current - desired
374+
this.logger?.info(`Scaling down workers: -${toRemove}`)
375+
const ids = Array.from(this.workers.keys()).slice(0, toRemove)
376+
for (const id of ids) {
377+
const info = this.workers.get(id)
378+
if (!info) continue
379+
info.isExiting = true
380+
info.process.kill('SIGTERM')
381+
}
382+
}
383+
this.config.workers = desired
384+
}
385+
386+
/** Convenience: increase workers by N (default 1) */
387+
scaleUp(by = 1): Promise<void> {
388+
return this.scaleTo((this.workers.size || 0) + Math.max(1, by))
389+
}
390+
391+
/** Convenience: decrease workers by N (default 1) */
392+
scaleDown(by = 1): Promise<void> {
393+
return this.scaleTo(Math.max(1, this.workers.size - Math.max(1, by)))
394+
}
395+
396+
/** Broadcast a POSIX signal to all workers (e.g., 'SIGTERM', 'SIGHUP'). */
397+
broadcastSignal(signal: NodeJS.Signals = 'SIGHUP'): void {
398+
for (const [id, info] of this.workers) {
399+
this.logger?.debug(`Sending ${signal} to worker ${id}`)
400+
try {
401+
info.process.kill(signal)
402+
} catch (err) {
403+
this.logger?.warn(
404+
`Failed sending ${signal} to worker ${id}: ${(err as Error).message}`,
405+
)
406+
}
407+
}
408+
}
409+
410+
/** Send a signal to a single worker by id. */
411+
sendSignalToWorker(
412+
workerId: number,
413+
signal: NodeJS.Signals = 'SIGHUP',
414+
): boolean {
415+
const info = this.workers.get(workerId)
416+
if (!info) return false
417+
try {
418+
info.process.kill(signal)
419+
return true
420+
} catch (err) {
421+
this.logger?.warn(
422+
`Failed sending ${signal} to worker ${workerId}: ${(err as Error).message}`,
423+
)
424+
return false
425+
}
426+
}
304427
}

0 commit comments

Comments
 (0)