Skip to content

Commit 5f548c9

Browse files
committed
feat: Enhance ClusterManager with dynamic scaling, restart policies, and improved shutdown handling
1 parent 601c22b commit 5f548c9

5 files changed

Lines changed: 286 additions & 42 deletions

File tree

src/cluster/cluster-manager.ts

Lines changed: 165 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ export interface WorkerInfo {
4747
lastRestartTime: number
4848
/** Flag indicating worker is in shutdown process */
4949
isExiting: boolean
50+
/**
51+
* Timestamps of recent restarts used to enforce respawn threshold within a time window
52+
* Old entries are pruned automatically on access
53+
*/
54+
restartTimestamps: number[]
5055
}
5156

5257
/**
@@ -68,6 +73,12 @@ export class ClusterManager {
6873
private isShuttingDown = false
6974
/** Path to worker script for spawning processes */
7075
private workerScript: string
76+
/** Guard to prevent double start */
77+
private started = false
78+
/** Bound signal handlers to allow proper removal */
79+
private boundSigint?: () => void
80+
private boundSigterm?: () => void
81+
private boundSigusr2?: () => void
7182

7283
/**
7384
* Initialize cluster manager with configuration and dependencies
@@ -106,54 +117,70 @@ export class ClusterManager {
106117
return
107118
}
108119

120+
if (this.started) {
121+
this.logger?.warn('Cluster already started; ignoring subsequent start()')
122+
return
123+
}
124+
this.started = true
125+
109126
this.logger?.info(`Starting cluster with ${this.config.workers} workers`)
110127

111128
// Configure signal handlers for graceful lifecycle management
112-
process.on('SIGINT', this.gracefulShutdown.bind(this))
113-
process.on('SIGTERM', this.gracefulShutdown.bind(this))
114-
process.on('SIGUSR2', this.restartAllWorkers.bind(this))
129+
this.boundSigint = this.gracefulShutdown.bind(this)
130+
this.boundSigterm = this.gracefulShutdown.bind(this)
131+
this.boundSigusr2 = this.restartAllWorkers.bind(this)
132+
process.on('SIGINT', this.boundSigint)
133+
process.on('SIGTERM', this.boundSigterm)
134+
process.on('SIGUSR2', this.boundSigusr2)
135+
136+
// Calculate number of workers, ensuring at least one
137+
const maxWorkers = Math.max(1, this.config.workers || cpus().length)
115138

116139
// Spawn workers
117-
for (let i = 0; i < this.config.workers!; i++) {
118-
await this.spawnWorker()
140+
for (let i = 0; i < maxWorkers; i++) {
141+
try {
142+
await this.spawnWorker()
143+
} catch (error) {
144+
this.logger?.error(`Failed to spawn worker ${i}:`, error as Error)
145+
}
119146
}
120147

121148
this.logger?.info('Cluster started successfully')
122149
}
123150

124-
private async spawnWorker(): Promise<void> {
125-
const workerId = this.nextWorkerId++
151+
private async spawnWorker(workerId?: number): Promise<void> {
152+
const id = workerId ?? this.nextWorkerId++
126153

127154
try {
128155
const worker = spawn({
129156
cmd: [process.execPath, this.workerScript],
130157
env: {
131158
...process.env,
132159
CLUSTER_WORKER: 'true',
133-
CLUSTER_WORKER_ID: workerId.toString(),
160+
CLUSTER_WORKER_ID: id.toString(),
134161
},
135162
stdio: ['inherit', 'inherit', 'inherit'],
136163
})
137164

138165
const workerInfo: WorkerInfo = {
139-
id: workerId,
166+
id,
140167
process: worker,
141168
restarts: 0,
142169
lastRestartTime: 0,
143170
isExiting: false,
171+
restartTimestamps: [],
144172
}
145173

146-
this.workers.set(workerId, workerInfo)
174+
this.workers.set(id, workerInfo)
147175

148176
// Handle worker exit
149177
worker.exited.then((exitCode) => {
150178
this.handleWorkerExit(workerInfo, exitCode)
151179
})
152180

153-
this.logger?.info(`Worker ${workerId} started (PID: ${worker.pid})`)
181+
this.logger?.info(`Worker ${id} started (PID: ${worker.pid})`)
154182
} catch (error) {
155-
this.logger?.error(`Failed to spawn worker ${workerId}:`, error as Error)
156-
throw error
183+
this.logger?.error(`Failed to spawn worker ${id}:`, error as Error)
157184
}
158185
}
159186

@@ -174,16 +201,25 @@ export class ClusterManager {
174201
// Check if we should restart the worker
175202
if (this.config.restartWorkers && this.shouldRestartWorker(workerInfo)) {
176203
this.logger?.info(`Restarting worker ${id}`)
204+
// Track restart metrics
205+
const now = Date.now()
177206
workerInfo.restarts++
178-
workerInfo.lastRestartTime = Date.now()
179-
180-
// Add restart delay
181-
await new Promise((resolve) =>
182-
setTimeout(resolve, this.config.restartDelay),
207+
workerInfo.lastRestartTime = now
208+
workerInfo.restartTimestamps.push(now)
209+
// Apply exponential backoff with jitter based on restarts count
210+
const base = Math.max(0, this.config.restartDelay ?? 1000)
211+
const attempt = Math.max(1, workerInfo.restarts)
212+
const maxDelay = 30000 // cap at 30s to avoid unbounded waits
213+
const backoff = Math.min(
214+
maxDelay,
215+
base * Math.pow(2, Math.min(5, attempt - 1)),
183216
)
217+
const jitter = Math.floor(Math.random() * Math.floor(base / 2))
218+
const delay = Math.min(maxDelay, backoff + jitter)
219+
await new Promise((resolve) => setTimeout(resolve, delay))
184220

185221
try {
186-
await this.spawnWorker()
222+
await this.spawnWorker(id)
187223
} catch (error) {
188224
this.logger?.error(`Failed to restart worker ${id}:`, error as Error)
189225
}
@@ -195,43 +231,48 @@ export class ClusterManager {
195231
}
196232

197233
private shouldRestartWorker(workerInfo: WorkerInfo): boolean {
198-
const { restarts, lastRestartTime } = workerInfo
234+
const { restarts } = workerInfo
199235
const now = Date.now()
200236

201-
// Check max restarts
202-
if (restarts >= this.config.maxRestarts!) {
237+
// Check max restarts (lifetime)
238+
if (
239+
typeof this.config.maxRestarts === 'number' &&
240+
restarts >= (this.config.maxRestarts ?? 10)
241+
) {
203242
return false
204243
}
205244

206-
// Check respawn threshold
207-
if (this.config.respawnThreshold && this.config.respawnThresholdTime) {
208-
const timeSinceLastRestart = now - lastRestartTime
209-
if (
210-
timeSinceLastRestart < this.config.respawnThresholdTime &&
211-
restarts >= this.config.respawnThreshold
212-
) {
213-
return false
214-
}
245+
// Enforce respawn threshold within time window using sliding window timestamps
246+
const threshold = this.config.respawnThreshold ?? 5
247+
const windowMs = this.config.respawnThresholdTime ?? 60000
248+
// Prune old timestamps
249+
workerInfo.restartTimestamps = workerInfo.restartTimestamps.filter(
250+
(t) => now - t <= windowMs,
251+
)
252+
if (workerInfo.restartTimestamps.length >= threshold) {
253+
return false
215254
}
216255

217256
return true
218257
}
219258

220259
private async restartAllWorkers(): Promise<void> {
221-
this.logger?.info('Restarting all workers')
260+
this.logger?.info('Rolling restart of all workers initiated')
222261

223262
const workerIds = Array.from(this.workers.keys())
224263

225264
for (const workerId of workerIds) {
226-
const workerInfo = this.workers.get(workerId)
227-
if (workerInfo) {
228-
this.logger?.info(`Restarting worker ${workerId}`)
229-
workerInfo.isExiting = true
230-
workerInfo.process.kill('SIGTERM')
231-
232-
// Wait a bit before starting the next restart
233-
await new Promise((resolve) => setTimeout(resolve, 1000))
234-
}
265+
const current = this.workers.get(workerId)
266+
if (!current) continue
267+
// Spawn a replacement first (uses a new worker id) to minimize downtime
268+
await this.spawnWorker()
269+
// Give the new worker a brief moment to initialize
270+
await new Promise((resolve) => setTimeout(resolve, 250))
271+
this.logger?.info(`Stopping old worker ${workerId} (rolling restart)`)
272+
current.isExiting = true
273+
current.process.kill('SIGTERM')
274+
// Small spacing to avoid thundering herd
275+
await new Promise((resolve) => setTimeout(resolve, 250))
235276
}
236277
}
237278

@@ -291,7 +332,13 @@ export class ClusterManager {
291332
await Promise.race([shutdownPromise, timeoutPromise])
292333

293334
this.logger?.info('Cluster shutdown complete')
294-
process.exit(0)
335+
// Remove signal handlers to avoid memory leaks when embedding in long-running processes/tests
336+
if (this.boundSigint) process.off('SIGINT', this.boundSigint)
337+
if (this.boundSigterm) process.off('SIGTERM', this.boundSigterm)
338+
if (this.boundSigusr2) process.off('SIGUSR2', this.boundSigusr2)
339+
if (this.config.exitOnShutdown ?? true) {
340+
process.exit(0)
341+
}
295342
}
296343

297344
getWorkerCount(): number {
@@ -301,4 +348,80 @@ export class ClusterManager {
301348
getWorkerInfo(): WorkerInfo[] {
302349
return Array.from(this.workers.values())
303350
}
351+
352+
/** Whether the cluster has been started and not shut down */
353+
isRunning(): boolean {
354+
return this.started && !this.isShuttingDown
355+
}
356+
357+
/**
358+
* Dynamically scale the number of workers.
359+
* Increases spawns or gracefully stops excess workers to match target.
360+
*/
361+
async scaleTo(target: number): Promise<void> {
362+
if (!this.config.enabled) return
363+
const desired = Math.max(1, Math.floor(target))
364+
const current = this.workers.size
365+
if (desired === current) return
366+
if (desired > current) {
367+
const toAdd = desired - current
368+
this.logger?.info(`Scaling up workers: +${toAdd}`)
369+
for (let i = 0; i < toAdd; i++) {
370+
await this.spawnWorker()
371+
}
372+
} else {
373+
const toRemove = current - desired
374+
this.logger?.info(`Scaling down workers: -${toRemove}`)
375+
const ids = Array.from(this.workers.keys()).slice(0, toRemove)
376+
for (const id of ids) {
377+
const info = this.workers.get(id)
378+
if (!info) continue
379+
info.isExiting = true
380+
info.process.kill('SIGTERM')
381+
}
382+
}
383+
this.config.workers = desired
384+
}
385+
386+
/** Convenience: increase workers by N (default 1) */
387+
scaleUp(by = 1): Promise<void> {
388+
return this.scaleTo((this.workers.size || 0) + Math.max(1, by))
389+
}
390+
391+
/** Convenience: decrease workers by N (default 1) */
392+
scaleDown(by = 1): Promise<void> {
393+
return this.scaleTo(Math.max(1, this.workers.size - Math.max(1, by)))
394+
}
395+
396+
/** Broadcast a POSIX signal to all workers (e.g., 'SIGTERM', 'SIGHUP'). */
397+
broadcastSignal(signal: NodeJS.Signals = 'SIGHUP'): void {
398+
for (const [id, info] of this.workers) {
399+
this.logger?.debug(`Sending ${signal} to worker ${id}`)
400+
try {
401+
info.process.kill(signal)
402+
} catch (err) {
403+
this.logger?.warn(
404+
`Failed sending ${signal} to worker ${id}: ${(err as Error).message}`,
405+
)
406+
}
407+
}
408+
}
409+
410+
/** Send a signal to a single worker by id. */
411+
sendSignalToWorker(
412+
workerId: number,
413+
signal: NodeJS.Signals = 'SIGHUP',
414+
): boolean {
415+
const info = this.workers.get(workerId)
416+
if (!info) return false
417+
try {
418+
info.process.kill(signal)
419+
return true
420+
} catch (err) {
421+
this.logger?.warn(
422+
`Failed sending ${signal} to worker ${workerId}: ${(err as Error).message}`,
423+
)
424+
return false
425+
}
426+
}
304427
}

src/interfaces/gateway.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ export interface ClusterConfig {
6363
* @default 60000 (1 minute)
6464
*/
6565
respawnThresholdTime?: number
66+
67+
/**
68+
* Exit the master process after graceful shutdown completes.
69+
* Set to false for test environments or embedded runners where exiting the
70+
* process is undesirable.
71+
* @default true
72+
*/
73+
exitOnShutdown?: boolean
6674
}
6775

6876
/**
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { test, expect } from 'bun:test'
2+
import { ClusterManager } from '../../src'
3+
import { BunGateLogger } from '../../src'
4+
5+
const logger = new BunGateLogger({
6+
level: 'error',
7+
enableRequestLogging: false,
8+
})
9+
const workerScript = `${import.meta.dir}/fixtures/worker.ts`
10+
11+
function makeManager(
12+
overrides: Partial<import('../../src').ClusterConfig> = {},
13+
) {
14+
return new ClusterManager(
15+
{
16+
enabled: true,
17+
workers: 2,
18+
restartWorkers: false,
19+
shutdownTimeout: 2000,
20+
exitOnShutdown: false,
21+
...overrides,
22+
},
23+
logger,
24+
workerScript,
25+
)
26+
}
27+
28+
test('ClusterManager > start spawns requested workers and is idempotent', async () => {
29+
const cm = makeManager({ workers: 2 })
30+
await cm.start()
31+
expect(cm.getWorkerCount()).toBe(2)
32+
await cm.start() // idempotent
33+
expect(cm.getWorkerCount()).toBe(2)
34+
await (cm as any).gracefulShutdown()
35+
expect(cm.getWorkerCount()).toBe(0)
36+
})
37+
38+
test('ClusterManager > scale up and down dynamically', async () => {
39+
const cm = makeManager({ workers: 1 })
40+
await cm.start()
41+
expect(cm.getWorkerCount()).toBe(1)
42+
await cm.scaleUp(2)
43+
expect(cm.getWorkerCount()).toBe(3)
44+
await cm.scaleDown(2)
45+
// allow SIGTERM to process
46+
await new Promise((r) => setTimeout(r, 300))
47+
expect(cm.getWorkerCount()).toBe(1)
48+
await (cm as any).gracefulShutdown()
49+
})

0 commit comments

Comments (0)