@@ -57,7 +57,6 @@ export type RunsReplicationServiceOptions = {
5757 leaderLockExtendIntervalMs ?: number ;
5858 leaderLockAcquireAdditionalTimeMs ?: number ;
5959 leaderLockRetryIntervalMs ?: number ;
60- leaderElectionRetryIntervalMs ?: number ;
6160 ackIntervalSeconds ?: number ;
6261 acknowledgeTimeoutMs ?: number ;
6362 logger ?: Logger ;
@@ -86,7 +85,6 @@ export type RunsReplicationServiceEvents = {
8685 batchFlushed : [
8786 { flushId : string ; taskRunInserts : TaskRunInsertArray [ ] ; payloadInserts : PayloadInsertArray [ ] }
8887 ] ;
89- leaderElection : [ boolean ] ;
9088} ;
9189
9290export class RunsReplicationService {
@@ -235,27 +233,21 @@ export class RunsReplicationService {
235233 tracer : options . tracer ,
236234 } ) ;
237235
238- this . _replicationClient . events . on (
239- "data" ,
240- async ( { lsn, log, parseDuration } : { lsn : string ; log : PgoutputMessage ; parseDuration : bigint } ) => {
241- this . #handleData( lsn , log , parseDuration ) ;
242- }
243- ) ;
236+ this . _replicationClient . events . on ( "data" , async ( { lsn, log, parseDuration } ) => {
237+ this . #handleData( lsn , log , parseDuration ) ;
238+ } ) ;
244239
245- this . _replicationClient . events . on (
246- "heartbeat" ,
247- async ( { lsn, shouldRespond } : { lsn : string ; shouldRespond : boolean } ) => {
248- if ( this . _isShuttingDown ) return ;
249- if ( this . _isShutDownComplete ) return ;
240+ this . _replicationClient . events . on ( "heartbeat" , async ( { lsn, shouldRespond } ) => {
241+ if ( this . _isShuttingDown ) return ;
242+ if ( this . _isShutDownComplete ) return ;
250243
251- if ( shouldRespond ) {
252- this . _lastAcknowledgedLsn = lsn ;
253- await this . _replicationClient . acknowledge ( lsn ) ;
254- }
244+ if ( shouldRespond ) {
245+ this . _lastAcknowledgedLsn = lsn ;
246+ await this . _replicationClient . acknowledge ( lsn ) ;
255247 }
256- ) ;
248+ } ) ;
257249
258- this . _replicationClient . events . on ( "error" , ( error : Error ) => {
250+ this . _replicationClient . events . on ( "error" , ( error ) => {
259251 this . logger . error ( "Replication client error" , {
260252 error,
261253 } ) ;
@@ -271,17 +263,14 @@ export class RunsReplicationService {
271263 this . _isSubscribed = true ;
272264 } ) ;
273265
274- this . _replicationClient . events . on ( "acknowledge" , ( { lsn } : { lsn : string } ) => {
266+ this . _replicationClient . events . on ( "acknowledge" , ( { lsn } ) => {
275267 this . logger . debug ( "Acknowledged" , { lsn } ) ;
276268 } ) ;
277269
278- this . _replicationClient . events . on ( "leaderElection" , ( isLeader : boolean ) => {
270+ this . _replicationClient . events . on ( "leaderElection" , ( isLeader ) => {
279271 this . logger . info ( "Leader election" , { isLeader } ) ;
280272 this . _isSubscribed = isLeader ;
281273
282- // Forward the event to external listeners
283- this . events . emit ( "leaderElection" , isLeader ) ;
284-
285274 if ( ! isLeader && ! this . _isShuttingDown ) {
286275 this . #scheduleLeaderElectionRetry( ) ;
287276 }
0 commit comments