@@ -57,6 +57,7 @@ export type RunsReplicationServiceOptions = {
5757 leaderLockExtendIntervalMs ?: number ;
5858 leaderLockAcquireAdditionalTimeMs ?: number ;
5959 leaderLockRetryIntervalMs ?: number ;
60+ leaderElectionRetryIntervalMs ?: number ;
6061 ackIntervalSeconds ?: number ;
6162 acknowledgeTimeoutMs ?: number ;
6263 logger ?: Logger ;
@@ -85,6 +86,7 @@ export type RunsReplicationServiceEvents = {
8586 batchFlushed : [
8687 { flushId : string ; taskRunInserts : TaskRunInsertArray [ ] ; payloadInserts : PayloadInsertArray [ ] }
8788 ] ;
89+ leaderElection : [ boolean ] ;
8890} ;
8991
9092export class RunsReplicationService {
@@ -109,6 +111,8 @@ export class RunsReplicationService {
109111 private _latestCommitEndLsn : string | null = null ;
110112 private _lastAcknowledgedLsn : string | null = null ;
111113 private _acknowledgeInterval : NodeJS . Timeout | null = null ;
114+ private _leaderElectionRetryTimer : NodeJS . Timeout | null = null ;
115+ private _leaderElectionRetryIntervalMs : number ;
112116 // Retry configuration
113117 private _insertMaxRetries : number ;
114118 private _insertBaseDelayMs : number ;
@@ -231,21 +235,27 @@ export class RunsReplicationService {
231235 tracer : options . tracer ,
232236 } ) ;
233237
234- this . _replicationClient . events . on ( "data" , async ( { lsn, log, parseDuration } ) => {
235- this . #handleData( lsn , log , parseDuration ) ;
236- } ) ;
238+ this . _replicationClient . events . on (
239+ "data" ,
240+ async ( { lsn, log, parseDuration } : { lsn : string ; log : PgoutputMessage ; parseDuration : bigint } ) => {
241+ this . #handleData( lsn , log , parseDuration ) ;
242+ }
243+ ) ;
237244
238- this . _replicationClient . events . on ( "heartbeat" , async ( { lsn, shouldRespond } ) => {
239- if ( this . _isShuttingDown ) return ;
240- if ( this . _isShutDownComplete ) return ;
245+ this . _replicationClient . events . on (
246+ "heartbeat" ,
247+ async ( { lsn, shouldRespond } : { lsn : string ; shouldRespond : boolean } ) => {
248+ if ( this . _isShuttingDown ) return ;
249+ if ( this . _isShutDownComplete ) return ;
241250
242- if ( shouldRespond ) {
243- this . _lastAcknowledgedLsn = lsn ;
244- await this . _replicationClient . acknowledge ( lsn ) ;
251+ if ( shouldRespond ) {
252+ this . _lastAcknowledgedLsn = lsn ;
253+ await this . _replicationClient . acknowledge ( lsn ) ;
254+ }
245255 }
246- } ) ;
256+ ) ;
247257
248- this . _replicationClient . events . on ( "error" , ( error ) => {
258+ this . _replicationClient . events . on ( "error" , ( error : Error ) => {
249259 this . logger . error ( "Replication client error" , {
250260 error,
251261 } ) ;
@@ -255,18 +265,31 @@ export class RunsReplicationService {
255265 this . logger . info ( "Replication client started" ) ;
256266 } ) ;
257267
258- this . _replicationClient . events . on ( "acknowledge" , ( { lsn } ) => {
268+ this . _replicationClient . events . on ( "acknowledge" , ( { lsn } : { lsn : string } ) => {
259269 this . logger . debug ( "Acknowledged" , { lsn } ) ;
260270 } ) ;
261271
262- this . _replicationClient . events . on ( "leaderElection" , ( isLeader ) => {
272+ this . _replicationClient . events . on ( "leaderElection" , ( isLeader : boolean ) => {
263273 this . logger . info ( "Leader election" , { isLeader } ) ;
274+
275+ // Forward the event to external listeners
276+ this . events . emit ( "leaderElection" , isLeader ) ;
277+
278+ if ( ! isLeader ) {
279+ // Failed to acquire leadership - reset subscription flag and schedule retry
280+ this . _isSubscribed = false ;
281+ this . #scheduleLeaderElectionRetry( ) ;
282+ } else {
283+ // Successfully acquired leadership - clear any pending retries
284+ this . #clearLeaderElectionRetry( ) ;
285+ }
264286 } ) ;
265287
266288 // Initialize retry configuration
267289 this . _insertMaxRetries = options . insertMaxRetries ?? 3 ;
268290 this . _insertBaseDelayMs = options . insertBaseDelayMs ?? 100 ;
269291 this . _insertMaxDelayMs = options . insertMaxDelayMs ?? 2000 ;
292+ this . _leaderElectionRetryIntervalMs = options . leaderElectionRetryIntervalMs ?? 5_000 ;
270293 }
271294
272295 public async shutdown ( ) {
@@ -276,6 +299,11 @@ export class RunsReplicationService {
276299
277300 this . logger . info ( "Initiating shutdown of runs replication service" ) ;
278301
302+ // Clear any pending leader election retries
303+ this . #clearLeaderElectionRetry( ) ;
304+
305+ this . _isSubscribed = false ;
306+
279307 if ( ! this . _currentTransaction ) {
280308 this . logger . info ( "No transaction to commit, shutting down immediately" ) ;
281309 await this . _replicationClient . stop ( ) ;
@@ -287,10 +315,23 @@ export class RunsReplicationService {
287315 }
288316
289317 async start ( ) {
318+ // Prevent multiple concurrent start attempts
319+ if ( this . _isSubscribed ) {
320+ this . logger . debug ( "Already subscribed, skipping start" ) ;
321+ return ;
322+ }
323+
324+ if ( this . _isShuttingDown || this . _isShutDownComplete ) {
325+ this . logger . debug ( "Service is shutting down, skipping start" ) ;
326+ return ;
327+ }
328+
290329 this . logger . info ( "Starting replication client" , {
291330 lastLsn : this . _latestCommitEndLsn ,
292331 } ) ;
293332
333+ this . _isSubscribed = true ;
334+
294335 await this . _replicationClient . subscribe ( this . _latestCommitEndLsn ?? undefined ) ;
295336
296337 this . _acknowledgeInterval = setInterval ( this . #acknowledgeLatestTransaction. bind ( this ) , 1000 ) ;
@@ -300,6 +341,11 @@ export class RunsReplicationService {
300341 async stop ( ) {
301342 this . logger . info ( "Stopping replication client" ) ;
302343
344+ // Clear any pending leader election retries
345+ this . #clearLeaderElectionRetry( ) ;
346+
347+ this . _isSubscribed = false ;
348+
303349 await this . _replicationClient . stop ( ) ;
304350
305351 if ( this . _acknowledgeInterval ) {
@@ -310,6 +356,11 @@ export class RunsReplicationService {
310356 async teardown ( ) {
311357 this . logger . info ( "Teardown replication client" ) ;
312358
359+ // Clear any pending leader election retries
360+ this . #clearLeaderElectionRetry( ) ;
361+
362+ this . _isSubscribed = false ;
363+
313364 await this . _replicationClient . teardown ( ) ;
314365
315366 if ( this . _acknowledgeInterval ) {
@@ -955,6 +1006,42 @@ export class RunsReplicationService {
9551006 return { data : parsedData } ;
9561007 }
9571008
/**
 * Schedules a one-shot retry of leader election after a failed attempt.
 *
 * Any previously scheduled retry is cancelled first, so at most one retry
 * timer is ever pending. No-op once shutdown has started or completed.
 * The delay is `_leaderElectionRetryIntervalMs` (option
 * `leaderElectionRetryIntervalMs`, default 5000ms).
 */
#scheduleLeaderElectionRetry() {
  // Only one pending retry at a time — cancel any existing timer first.
  this.#clearLeaderElectionRetry();

  // Don't schedule new work if we're shutting down or already shut down.
  if (this._isShuttingDown || this._isShutDownComplete) {
    return;
  }

  this.logger.info("Scheduling leader election retry", {
    retryIntervalMs: this._leaderElectionRetryIntervalMs,
  });

  this._leaderElectionRetryTimer = setTimeout(async () => {
    // Re-check shutdown state: it may have changed while the timer was pending.
    if (this._isShuttingDown || this._isShutDownComplete) {
      return;
    }

    this.logger.info("Retrying leader election after failed attempt");

    try {
      await this.start();
    } catch (error) {
      this.logger.error("Error during leader election retry", { error });

      // start() sets _isSubscribed = true before awaiting subscribe(); if
      // subscribe() threw, the flag is left stuck and every subsequent
      // start() would short-circuit on its "already subscribed" guard.
      // Reset it so the next attempt can actually proceed.
      this._isSubscribed = false;

      // Don't rely on a "leaderElection" event being emitted on this
      // exception path (e.g. a connection error thrown before any election
      // ran would emit nothing) — reschedule explicitly so the retry loop
      // cannot stall.
      this.#scheduleLeaderElectionRetry();
    }
  }, this._leaderElectionRetryIntervalMs);
}
1038+
/**
 * Cancels a pending leader election retry, if one is scheduled.
 * Safe to call repeatedly or when no retry timer exists.
 */
#clearLeaderElectionRetry() {
  const timer = this._leaderElectionRetryTimer;
  if (!timer) {
    return;
  }
  clearTimeout(timer);
  this._leaderElectionRetryTimer = null;
}
9581045}
9591046
9601047export type ConcurrentFlushSchedulerConfig < T > = {
0 commit comments