@@ -112,7 +112,7 @@ export class RunsReplicationService {
112112 private _lastAcknowledgedLsn : string | null = null ;
113113 private _acknowledgeInterval : NodeJS . Timeout | null = null ;
114114 private _leaderElectionRetryTimer : NodeJS . Timeout | null = null ;
115- private _leaderElectionRetryIntervalMs : number ;
115+ private _leaderElectionRetryCount = 0 ;
116116 // Retry configuration
117117 private _insertMaxRetries : number ;
118118 private _insertBaseDelayMs : number ;
@@ -259,10 +259,16 @@ export class RunsReplicationService {
259259 this . logger . error ( "Replication client error" , {
260260 error,
261261 } ) ;
262+
263+ if ( this . _replicationClient . isStopped && ! this . _isShuttingDown ) {
264+ this . _isSubscribed = false ;
265+ this . #scheduleLeaderElectionRetry( ) ;
266+ }
262267 } ) ;
263268
264269 this . _replicationClient . events . on ( "start" , ( ) => {
265270 this . logger . info ( "Replication client started" ) ;
271+ this . _isSubscribed = true ;
266272 } ) ;
267273
268274 this . _replicationClient . events . on ( "acknowledge" , ( { lsn } : { lsn : string } ) => {
@@ -271,25 +277,20 @@ export class RunsReplicationService {
271277
272278 this . _replicationClient . events . on ( "leaderElection" , ( isLeader : boolean ) => {
273279 this . logger . info ( "Leader election" , { isLeader } ) ;
274-
280+ this . _isSubscribed = isLeader ;
281+
275282 // Forward the event to external listeners
276283 this . events . emit ( "leaderElection" , isLeader ) ;
277-
278- if ( ! isLeader ) {
279- // Failed to acquire leadership - reset subscription flag and schedule retry
280- this . _isSubscribed = false ;
284+
285+ if ( ! isLeader && ! this . _isShuttingDown ) {
281286 this . #scheduleLeaderElectionRetry( ) ;
282- } else {
283- // Successfully acquired leadership - clear any pending retries
284- this . #clearLeaderElectionRetry( ) ;
285287 }
286288 } ) ;
287289
288290 // Initialize retry configuration
289291 this . _insertMaxRetries = options . insertMaxRetries ?? 3 ;
290292 this . _insertBaseDelayMs = options . insertBaseDelayMs ?? 100 ;
291293 this . _insertMaxDelayMs = options . insertMaxDelayMs ?? 2000 ;
292- this . _leaderElectionRetryIntervalMs = options . leaderElectionRetryIntervalMs ?? 5_000 ;
293294 }
294295
295296 public async shutdown ( ) {
@@ -299,10 +300,10 @@ export class RunsReplicationService {
299300
300301 this . logger . info ( "Initiating shutdown of runs replication service" ) ;
301302
302- // Clear any pending leader election retries
303- this . #clearLeaderElectionRetry ( ) ;
304-
305- this . _isSubscribed = false ;
303+ if ( this . _leaderElectionRetryTimer ) {
304+ clearTimeout ( this . _leaderElectionRetryTimer ) ;
305+ this . _leaderElectionRetryTimer = null ;
306+ }
306307
307308 if ( ! this . _currentTransaction ) {
308309 this . logger . info ( "No transaction to commit, shutting down immediately" ) ;
@@ -334,30 +335,93 @@ export class RunsReplicationService {
334335
335336 await this . _replicationClient . subscribe ( this . _latestCommitEndLsn ?? undefined ) ;
336337
338+ if ( this . _isSubscribed ) {
339+ this . #startAcknowledgeInterval( ) ;
340+ this . _concurrentFlushScheduler . start ( ) ;
341+ }
342+ }
343+
#startAcknowledgeInterval() {
  // Replace any previously running acknowledge loop so at most one
  // interval is ever active for this service instance.
  if (this._acknowledgeInterval) {
    clearInterval(this._acknowledgeInterval);
  }

  // Acknowledge the latest processed transaction once per second.
  this._acknowledgeInterval = setInterval(() => this.#acknowledgeLatestTransaction(), 1000);
}
350+
#scheduleLeaderElectionRetry() {
  // Never schedule retries once shutdown has begun or completed.
  if (this._isShuttingDown || this._isShutDownComplete) {
    return;
  }

  // Keep at most one pending retry timer.
  if (this._leaderElectionRetryTimer) {
    clearTimeout(this._leaderElectionRetryTimer);
  }

  // Exponential backoff. Base delay is one leader-lock timeout plus slack so a
  // live leader's lock can expire before we contend again; capped at 5 minutes.
  const baseDelayMs = (this.options.leaderLockTimeoutMs ?? 30_000) + 5_000;
  const maxDelayMs = 5 * 60 * 1_000; // 5 minutes max
  const retryDelayMs = Math.min(baseDelayMs * Math.pow(2, this._leaderElectionRetryCount), maxDelayMs);

  this._leaderElectionRetryCount++;

  this.logger.info("Scheduling leader election retry", {
    retryDelayMs,
    retryCount: this._leaderElectionRetryCount,
  });

  this._leaderElectionRetryTimer = setTimeout(async () => {
    // Re-check state at fire time: shutdown may have started, or leadership
    // may have been acquired through another path while we were waiting.
    if (this._isShuttingDown || this._isShutDownComplete || this._isSubscribed) {
      return;
    }

    try {
      const hasLeader = await this._replicationClient.hasLeader();

      if (hasLeader) {
        // Someone else is leading — back off further and check again later.
        this.logger.info("Leader exists, skipping retry", {
          retryCount: this._leaderElectionRetryCount,
        });
        this.#scheduleLeaderElectionRetry();
        return;
      }

      this.logger.info("No leader found, attempting to become leader", {
        retryCount: this._leaderElectionRetryCount,
      });

      await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined);

      // subscribe() flips _isSubscribed via the client's "start"/"leaderElection"
      // events; only resume processing if we actually became leader.
      if (this._isSubscribed) {
        this._leaderElectionRetryCount = 0; // reset backoff on success
        this.#startAcknowledgeInterval();
        this._concurrentFlushScheduler.start();
      }
    } catch (error) {
      // BUGFIX: without this catch, a rejected hasLeader()/subscribe() is an
      // unhandled promise rejection inside the setTimeout callback AND silently
      // kills the retry chain, leaving the service idle forever. Log and keep
      // retrying with backoff instead.
      this.logger.error("Error during leader election retry", { error });
      this.#scheduleLeaderElectionRetry();
    }
  }, retryDelayMs);
}
340399
341400 async stop ( ) {
342401 this . logger . info ( "Stopping replication client" ) ;
343402
344- // Clear any pending leader election retries
345- this . #clearLeaderElectionRetry ( ) ;
346-
347- this . _isSubscribed = false ;
403+ if ( this . _leaderElectionRetryTimer ) {
404+ clearTimeout ( this . _leaderElectionRetryTimer ) ;
405+ this . _leaderElectionRetryTimer = null ;
406+ }
348407
349408 await this . _replicationClient . stop ( ) ;
350409
351410 if ( this . _acknowledgeInterval ) {
352411 clearInterval ( this . _acknowledgeInterval ) ;
353412 }
413+
414+ this . _isSubscribed = false ;
415+ this . _leaderElectionRetryCount = 0 ;
354416 }
355417
356418 async teardown ( ) {
357419 this . logger . info ( "Teardown replication client" ) ;
358420
359- // Clear any pending leader election retries
360- this . #clearLeaderElectionRetry( ) ;
421+ if ( this . _leaderElectionRetryTimer ) {
422+ clearTimeout ( this . _leaderElectionRetryTimer ) ;
423+ this . _leaderElectionRetryTimer = null ;
424+ }
361425
362426 this . _isSubscribed = false ;
363427
@@ -1005,43 +1069,6 @@ export class RunsReplicationService {
10051069
10061070 return { data : parsedData } ;
10071071 }
1008-
1009- #scheduleLeaderElectionRetry( ) {
1010- // Clear any existing retry timer
1011- this . #clearLeaderElectionRetry( ) ;
1012-
1013- // Don't retry if we're shutting down or already shut down
1014- if ( this . _isShuttingDown || this . _isShutDownComplete ) {
1015- return ;
1016- }
1017-
1018- this . logger . info ( "Scheduling leader election retry" , {
1019- retryIntervalMs : this . _leaderElectionRetryIntervalMs ,
1020- } ) ;
1021-
1022- this . _leaderElectionRetryTimer = setTimeout ( async ( ) => {
1023- if ( this . _isShuttingDown || this . _isShutDownComplete ) {
1024- return ;
1025- }
1026-
1027- this . logger . info ( "Retrying leader election after failed attempt" ) ;
1028-
1029- try {
1030- await this . start ( ) ;
1031- } catch ( error ) {
1032- this . logger . error ( "Error during leader election retry" , { error } ) ;
1033- // If start() fails, the leaderElection event will be emitted with false,
1034- // which will trigger another retry via #scheduleLeaderElectionRetry
1035- }
1036- } , this . _leaderElectionRetryIntervalMs ) ;
1037- }
1038-
1039- #clearLeaderElectionRetry( ) {
1040- if ( this . _leaderElectionRetryTimer ) {
1041- clearTimeout ( this . _leaderElectionRetryTimer ) ;
1042- this . _leaderElectionRetryTimer = null ;
1043- }
1044- }
10451072}
10461073
10471074export type ConcurrentFlushSchedulerConfig < T > = {
0 commit comments