Skip to content

Commit 4fbdf26

Browse files
committed
chore: wip
1 parent 64cf43d commit 4fbdf26

6 files changed

Lines changed: 994 additions & 26 deletions

File tree

src/index.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
11
export { BatchProcessor } from './batch'
2-
export { type CleanupOptions, CleanupService } from './cleanup'
3-
export * from './config'
4-
export { type CronJobOptions, CronScheduler } from './cron-scheduler'
52
export { DeadLetterQueue } from './dead-letter-queue'
6-
export { DistributedLock, type LockOptions } from './distributed-lock'
3+
export { DistributedLock } from './distributed-lock'
74
export { JobEvents } from './events'
85
export { QueueGroup } from './group'
96
export { Job } from './job'
7+
export { LeaderElection } from './leader-election'
108
export { createLogger } from './logger'
11-
export { Metrics } from './metrics'
129
export { QueueObservable } from './observable'
1310
export { PriorityQueue } from './priority-queue'
1411
export { Queue } from './queue'
1512
export { RateLimiter } from './rate-limiter'
16-
export type { RateLimitResult } from './rate-limiter'
17-
export { StalledJobChecker } from './stalled-checker'
1813
export * from './types'
19-
export type { DeadLetterQueueOptions } from './types'
20-
export * from './utils'
14+
export { WorkCoordinator } from './work-coordinator'
2115
export { Worker } from './worker'

src/leader-election.ts

Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
import type { RedisClient } from 'bun'
2+
import { createLogger } from './logger'
3+
import { generateId } from './utils'
4+
5+
export interface LeaderElectionOptions {
6+
/**
7+
* Key prefix for leader election
8+
*/
9+
keyPrefix?: string
10+
11+
/**
12+
* Duration (in ms) for which leadership is valid (default: 10000)
13+
*/
14+
heartbeatInterval?: number
15+
16+
/**
17+
* Duration (in ms) for how long a node can be unreachable before leadership is transferred
18+
*/
19+
leaderTimeout?: number
20+
21+
/**
22+
* Callback when instance becomes the leader
23+
*/
24+
onBecomeLeader?: () => void
25+
26+
/**
27+
* Callback when instance loses leadership
28+
*/
29+
onLeadershipLost?: () => void
30+
31+
/**
32+
* Callback when leader changes
33+
*/
34+
onLeaderChanged?: (leaderId: string) => void
35+
36+
/**
37+
* Instance ID (defaults to auto-generated)
38+
*/
39+
instanceId?: string
40+
}
41+
42+
/**
43+
* Implements leader election for distributed queue instances
44+
* using Redis for coordination
45+
*/
46+
export class LeaderElection {
47+
private readonly redisClient: RedisClient
48+
private readonly keyPrefix: string
49+
private readonly instanceId: string
50+
private readonly logger = createLogger('leader')
51+
private readonly heartbeatInterval: number
52+
private readonly leaderTimeout: number
53+
private readonly options: LeaderElectionOptions
54+
private heartbeatTimer: NodeJS.Timeout | null = null
55+
private watchdogTimer: NodeJS.Timeout | null = null
56+
private isLeader = false
57+
private currentLeaderId: string | null = null
58+
59+
constructor(redisClient: RedisClient, options: LeaderElectionOptions = {}) {
60+
this.redisClient = redisClient
61+
this.keyPrefix = options.keyPrefix || 'bun-queue:leader'
62+
this.instanceId = options.instanceId || generateId()
63+
this.heartbeatInterval = options.heartbeatInterval || 10000 // 10 seconds default
64+
this.leaderTimeout = options.leaderTimeout || this.heartbeatInterval * 3 // 3x heartbeat by default
65+
this.options = options
66+
67+
this.logger.debug(`Leader election initialized for instance ${this.instanceId}`)
68+
}
69+
70+
/**
71+
* Start the leader election process
72+
*/
73+
async start(): Promise<void> {
74+
this.logger.info(`Starting leader election for instance ${this.instanceId}`)
75+
76+
// Start the watchdog to monitor leader
77+
this.startWatchdog()
78+
79+
// Try to elect ourselves as the leader
80+
await this.electLeader()
81+
}
82+
83+
/**
84+
* Stop participating in leader election
85+
*/
86+
async stop(): Promise<void> {
87+
this.logger.info(`Stopping leader election for instance ${this.instanceId}`)
88+
89+
// Clear timers
90+
if (this.heartbeatTimer) {
91+
clearInterval(this.heartbeatTimer)
92+
this.heartbeatTimer = null
93+
}
94+
95+
if (this.watchdogTimer) {
96+
clearInterval(this.watchdogTimer)
97+
this.watchdogTimer = null
98+
}
99+
100+
// If we're the leader, gracefully step down
101+
if (this.isLeader) {
102+
await this.stepDown()
103+
}
104+
}
105+
106+
/**
107+
* Check if this instance is the current leader
108+
*/
109+
isCurrentLeader(): boolean {
110+
return this.isLeader
111+
}
112+
113+
/**
114+
* Get the ID of the current leader
115+
*/
116+
async getCurrentLeader(): Promise<string | null> {
117+
try {
118+
const leaderKey = `${this.keyPrefix}:current`
119+
const leaderInfo = await this.redisClient.get(leaderKey)
120+
121+
if (!leaderInfo) {
122+
return null
123+
}
124+
125+
// Parse the leader info (format: "instanceId:timestamp")
126+
const [leaderId, timestampStr] = leaderInfo.split(':')
127+
const timestamp = Number.parseInt(timestampStr, 10)
128+
129+
// Check if the leader has timed out
130+
if (Date.now() - timestamp > this.leaderTimeout) {
131+
this.logger.debug(`Leader ${leaderId} has timed out, no current leader`)
132+
return null
133+
}
134+
135+
return leaderId
136+
}
137+
catch (err) {
138+
this.logger.error(`Error getting current leader: ${(err as Error).message}`)
139+
return null
140+
}
141+
}
142+
143+
/**
144+
* Try to elect ourselves as the leader
145+
*/
146+
private async electLeader(): Promise<void> {
147+
try {
148+
const leaderKey = `${this.keyPrefix}:current`
149+
const now = Date.now()
150+
const newLeaderInfo = `${this.instanceId}:${now}`
151+
152+
// Check if there's an existing leader
153+
const currentLeader = await this.getCurrentLeader()
154+
155+
if (currentLeader) {
156+
// There's already a valid leader
157+
if (currentLeader !== this.instanceId && this.currentLeaderId !== currentLeader) {
158+
this.logger.info(`Instance ${currentLeader} is the current leader`)
159+
this.currentLeaderId = currentLeader
160+
161+
if (this.options.onLeaderChanged) {
162+
this.options.onLeaderChanged(currentLeader)
163+
}
164+
}
165+
166+
// If we're already the leader, just send a heartbeat
167+
if (currentLeader === this.instanceId && !this.isLeader) {
168+
this.becomeLeader()
169+
}
170+
return
171+
}
172+
173+
// Try to become the leader using SET NX
174+
const result = await this.redisClient.send('SET', [
175+
leaderKey,
176+
newLeaderInfo,
177+
'NX',
178+
'PX',
179+
this.leaderTimeout.toString(),
180+
])
181+
182+
if (result === 'OK') {
183+
// We became the leader
184+
this.becomeLeader()
185+
}
186+
else {
187+
// Someone else beat us to it, check who it is
188+
await this.checkLeadership()
189+
}
190+
}
191+
catch (err) {
192+
this.logger.error(`Error in leader election: ${(err as Error).message}`)
193+
}
194+
}
195+
196+
/**
197+
* Become the leader and start sending heartbeats
198+
*/
199+
private becomeLeader(): void {
200+
if (this.isLeader)
201+
return
202+
203+
this.isLeader = true
204+
this.currentLeaderId = this.instanceId
205+
this.logger.info(`Instance ${this.instanceId} became the leader`)
206+
207+
// Call the handler if provided
208+
if (this.options.onBecomeLeader) {
209+
this.options.onBecomeLeader()
210+
}
211+
212+
// Start sending heartbeats
213+
this.startHeartbeat()
214+
}
215+
216+
/**
217+
* Start sending heartbeats to maintain leadership
218+
*/
219+
private startHeartbeat(): void {
220+
if (this.heartbeatTimer) {
221+
clearInterval(this.heartbeatTimer)
222+
}
223+
224+
this.heartbeatTimer = setInterval(async () => {
225+
try {
226+
if (!this.isLeader) {
227+
clearInterval(this.heartbeatTimer as NodeJS.Timeout)
228+
this.heartbeatTimer = null
229+
return
230+
}
231+
232+
const leaderKey = `${this.keyPrefix}:current`
233+
const now = Date.now()
234+
const leaderInfo = `${this.instanceId}:${now}`
235+
236+
// Update the leader key with fresh timestamp
237+
const result = await this.redisClient.send('SET', [
238+
leaderKey,
239+
leaderInfo,
240+
'PX',
241+
this.leaderTimeout.toString(),
242+
])
243+
244+
if (result !== 'OK') {
245+
this.logger.warn(`Failed to send heartbeat, may have lost leadership`)
246+
// We'll check leadership in the next watchdog cycle
247+
}
248+
else {
249+
this.logger.debug(`Sent leader heartbeat`)
250+
}
251+
}
252+
catch (err) {
253+
this.logger.error(`Error sending heartbeat: ${(err as Error).message}`)
254+
}
255+
}, Math.max(1000, this.heartbeatInterval / 3)) // At least once per second but no more than 1/3 of timeout
256+
}
257+
258+
/**
259+
* Start the watchdog to monitor leadership and trigger elections
260+
*/
261+
private startWatchdog(): void {
262+
if (this.watchdogTimer) {
263+
clearInterval(this.watchdogTimer)
264+
}
265+
266+
this.watchdogTimer = setInterval(async () => {
267+
try {
268+
await this.checkLeadership()
269+
}
270+
catch (err) {
271+
this.logger.error(`Error in watchdog: ${(err as Error).message}`)
272+
}
273+
}, this.heartbeatInterval)
274+
}
275+
276+
/**
277+
* Check leadership status and trigger election if needed
278+
*/
279+
private async checkLeadership(): Promise<void> {
280+
const currentLeader = await this.getCurrentLeader()
281+
282+
// If we think we're the leader but we're not
283+
if (this.isLeader && currentLeader !== this.instanceId) {
284+
this.loseLeadership()
285+
}
286+
287+
// If there's no leader, try to become one
288+
if (!currentLeader) {
289+
await this.electLeader()
290+
return
291+
}
292+
293+
// If leader changed, trigger the callback
294+
if (currentLeader !== this.currentLeaderId) {
295+
this.currentLeaderId = currentLeader
296+
this.logger.info(`Leader changed to ${currentLeader}`)
297+
298+
if (this.options.onLeaderChanged) {
299+
this.options.onLeaderChanged(currentLeader)
300+
}
301+
}
302+
}
303+
304+
/**
305+
* Handle losing leadership
306+
*/
307+
private loseLeadership(): void {
308+
if (!this.isLeader)
309+
return
310+
311+
this.isLeader = false
312+
this.logger.info(`Instance ${this.instanceId} lost leadership`)
313+
314+
// Stop heartbeats
315+
if (this.heartbeatTimer) {
316+
clearInterval(this.heartbeatTimer)
317+
this.heartbeatTimer = null
318+
}
319+
320+
// Call the handler if provided
321+
if (this.options.onLeadershipLost) {
322+
this.options.onLeadershipLost()
323+
}
324+
}
325+
326+
/**
327+
* Voluntarily step down as leader
328+
*/
329+
private async stepDown(): Promise<void> {
330+
if (!this.isLeader)
331+
return
332+
333+
try {
334+
const leaderKey = `${this.keyPrefix}:current`
335+
336+
// Only delete if we're still the leader
337+
const currentValue = await this.redisClient.get(leaderKey)
338+
if (currentValue && currentValue.startsWith(this.instanceId)) {
339+
await this.redisClient.del(leaderKey)
340+
}
341+
342+
this.loseLeadership()
343+
this.logger.info(`Instance ${this.instanceId} stepped down as leader`)
344+
}
345+
catch (err) {
346+
this.logger.error(`Error stepping down as leader: ${(err as Error).message}`)
347+
}
348+
}
349+
}

0 commit comments

Comments
 (0)