@@ -30,10 +30,8 @@ import (
3030 "github.com/riverqueue/river/rivershared/riverpilot"
3131 "github.com/riverqueue/river/rivershared/riversharedmaintenance"
3232 "github.com/riverqueue/river/rivershared/startstop"
33- "github.com/riverqueue/river/rivershared/testsignal"
3433 "github.com/riverqueue/river/rivershared/util/dbutil"
3534 "github.com/riverqueue/river/rivershared/util/maputil"
36- "github.com/riverqueue/river/rivershared/util/serviceutil"
3735 "github.com/riverqueue/river/rivershared/util/sliceutil"
3836 "github.com/riverqueue/river/rivershared/util/testutil"
3937 "github.com/riverqueue/river/rivershared/util/valutil"
@@ -605,18 +603,10 @@ type Client[TTx any] struct {
605603 notifier * notifier.Notifier // may be nil in poll-only mode
606604 periodicJobs * PeriodicJobBundle
607605 pilot riverpilot.Pilot
608- producersByQueueName map [string ]* producer
609- queueMaintainer * maintenance.QueueMaintainer
610-
611- // queueMaintainerEpoch is incremented each time leadership is gained,
612- // giving each tryStartQueueMaintainer goroutine a term number.
613- // queueMaintainerMu serializes epoch checks with Stop calls so that a
614- // stale goroutine from an older term cannot tear down a maintainer
615- // started by a newer term.
616- queueMaintainerEpoch int64
617- queueMaintainerMu sync.Mutex
618-
619- queues * QueueBundle
606+ producersByQueueName map [string ]* producer
607+ queueMaintainer * maintenance.QueueMaintainer
608+ queueMaintainerLeader * maintenance.QueueMaintainerLeader
609+ queues * QueueBundle
620610 services []startstop.Service
621611 stopped <- chan struct {}
622612 subscriptionManager * subscriptionManager
@@ -629,23 +619,16 @@ type Client[TTx any] struct {
629619
630620// Test-only signals.
631621type clientTestSignals struct {
632- electedLeader testsignal.TestSignal [struct {}] // notifies when elected leader
633- queueMaintainerStartError testsignal.TestSignal [error ] // notifies on each failed queue maintainer start attempt
634- queueMaintainerStartRetriesExhausted testsignal.TestSignal [struct {}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted
635-
636- jobCleaner * maintenance.JobCleanerTestSignals
637- jobRescuer * maintenance.JobRescuerTestSignals
638- jobScheduler * maintenance.JobSchedulerTestSignals
639- periodicJobEnqueuer * maintenance.PeriodicJobEnqueuerTestSignals
640- queueCleaner * maintenance.QueueCleanerTestSignals
641- reindexer * maintenance.ReindexerTestSignals
622+ jobCleaner * maintenance.JobCleanerTestSignals
623+ jobRescuer * maintenance.JobRescuerTestSignals
624+ jobScheduler * maintenance.JobSchedulerTestSignals
625+ periodicJobEnqueuer * maintenance.PeriodicJobEnqueuerTestSignals
626+ queueCleaner * maintenance.QueueCleanerTestSignals
627+ queueMaintainerLeader * maintenance.QueueMaintainerLeaderTestSignals
628+ reindexer * maintenance.ReindexerTestSignals
642629}
643630
644631func (ts * clientTestSignals ) Init (tb testutil.TestingTB ) {
645- ts .electedLeader .Init (tb )
646- ts .queueMaintainerStartError .Init (tb )
647- ts .queueMaintainerStartRetriesExhausted .Init (tb )
648-
649632 if ts .jobCleaner != nil {
650633 ts .jobCleaner .Init (tb )
651634 }
@@ -661,6 +644,9 @@ func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
661644 if ts .queueCleaner != nil {
662645 ts .queueCleaner .Init (tb )
663646 }
647+ if ts .queueMaintainerLeader != nil {
648+ ts .queueMaintainerLeader .Init (tb )
649+ }
664650 if ts .reindexer != nil {
665651 ts .reindexer .Init (tb )
666652 }
@@ -867,9 +853,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
867853 client .services = append (client .services ,
868854 startstop .StartStopFunc (client .logStatsLoop ))
869855
870- client .services = append (client .services ,
871- startstop .StartStopFunc (client .handleLeadershipChangeLoop ))
872-
873856 if pluginPilot != nil {
874857 client .services = append (client .services , pluginPilot .PluginServices ()... )
875858 }
@@ -972,6 +955,15 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
972955 if config .TestOnly {
973956 client .queueMaintainer .StaggerStartupDisable (true )
974957 }
958+
959+ client .queueMaintainerLeader = maintenance .NewQueueMaintainerLeader (archetype , & maintenance.QueueMaintainerLeaderConfig {
960+ ClientID : config .ID ,
961+ Elector : client .elector ,
962+ QueueMaintainer : client .queueMaintainer ,
963+ RequestResignFunc : client .clientNotifyBundle .RequestResign ,
964+ })
965+ client .services = append (client .services , client .queueMaintainerLeader )
966+ client .testSignals .queueMaintainerLeader = & client .queueMaintainerLeader .TestSignals
975967 }
976968
977969 return client , nil
@@ -1292,147 +1284,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte
12921284 return nil
12931285}
12941286
1295- func (c * Client [TTx ]) handleLeadershipChangeLoop (ctx context.Context , shouldStart bool , started , stopped func ()) error {
1296- if ! shouldStart {
1297- return nil
1298- }
1299-
1300- go func () {
1301- started ()
1302- defer stopped () // this defer should come first so it's last out
1303-
1304- sub := c .elector .Listen ()
1305- defer sub .Unlisten ()
1306-
1307- // Cancel function for an in-progress tryStartQueueMaintainer. If
1308- // leadership is lost while the start process is still retrying, used to
1309- // abort it promptly instead of waiting for retries to finish.
1310- var cancelQueueMaintainerStart context.CancelCauseFunc = func (_ error ) {}
1311-
1312- for {
1313- select {
1314- case <- ctx .Done ():
1315- cancelQueueMaintainerStart (context .Cause (ctx ))
1316- return
1317-
1318- case notification := <- sub .C ():
1319- c .baseService .Logger .DebugContext (ctx , c .baseService .Name + ": Election change received" ,
1320- slog .String ("client_id" , c .config .ID ), slog .Bool ("is_leader" , notification .IsLeader ))
1321-
1322- switch {
1323- case notification .IsLeader :
1324- // Starting the queue maintainer takes time, so send the
1325- // test signal first. Tests waiting on it can receive it,
1326- // cancel the queue maintainer start, and finish faster.
1327- c .testSignals .electedLeader .Signal (struct {}{})
1328-
1329- // Start the queue maintainer with retries and exponential
1330- // backoff in a separate goroutine so the leadership change
1331- // loop remains responsive to new notifications. startCtx is
1332- // used for cancellation in case leadership is lost while
1333- // retries are in progress.
1334- //
1335- // Epoch is incremented so stale tryStartQueueMaintainer
1336- // goroutines from a previous term cannot call Stop after a
1337- // new term has begun.
1338- var startCtx context.Context
1339- startCtx , cancelQueueMaintainerStart = context .WithCancelCause (ctx )
1340-
1341- c .queueMaintainerMu .Lock ()
1342- c .queueMaintainerEpoch ++
1343- epoch := c .queueMaintainerEpoch
1344- c .queueMaintainerMu .Unlock ()
1345-
1346- go c .tryStartQueueMaintainer (startCtx , epoch )
1347-
1348- default :
1349- // Cancel any in-progress start attempts before stopping.
1350- // Send a startstop.ErrStop to make sure services like
1351- // Reindexer run any specific cleanup code for stops.
1352- cancelQueueMaintainerStart (startstop .ErrStop )
1353- cancelQueueMaintainerStart = func (_ error ) {}
1354-
1355- c .queueMaintainer .Stop ()
1356- }
1357- }
1358- }
1359- }()
1360-
1361- return nil
1362- }
1363-
1364- // Tries to start the queue maintainer after gaining leadership. We allow some
1365- // retries with exponential backoff in case of failure, and in case the queue
1366- // maintainer can't be started, we request resignation to allow another client
1367- // to try and take over.
1368- func (c * Client [TTx ]) tryStartQueueMaintainer (ctx context.Context , epoch int64 ) {
1369- const maxStartAttempts = 3
1370-
1371- ctxCancelled := func () bool {
1372- if ctx .Err () != nil {
1373- c .baseService .Logger .InfoContext (ctx , c .baseService .Name + ": Queue maintainer start cancelled" )
1374- return true
1375- }
1376- return false
1377- }
1378-
1379- // stopIfCurrentEpoch atomically checks whether this goroutine's epoch is
1380- // still the active one and calls Stop only if it is. Combined with the
1381- // epoch increment in handleLeadershipChangeLoop, prevents stale goroutine
1382- // from stopping a maintainer started by a newer leadership term.
1383- stopIfCurrentEpoch := func () bool {
1384- c .queueMaintainerMu .Lock ()
1385- defer c .queueMaintainerMu .Unlock ()
1386-
1387- if c .queueMaintainerEpoch != epoch {
1388- return false
1389- }
1390-
1391- c .queueMaintainer .Stop ()
1392- return true
1393- }
1394-
1395- var lastErr error
1396- for attempt := 1 ; attempt <= maxStartAttempts ; attempt ++ {
1397- if ctxCancelled () {
1398- return
1399- }
1400-
1401- if lastErr = c .queueMaintainer .Start (ctx ); lastErr == nil {
1402- return
1403- }
1404-
1405- c .baseService .Logger .ErrorContext (ctx , c .baseService .Name + ": Error starting queue maintainer" ,
1406- slog .String ("err" , lastErr .Error ()), slog .Int ("attempt" , attempt ))
1407-
1408- c .testSignals .queueMaintainerStartError .Signal (lastErr )
1409-
1410- // Stop the queue maintainer to fully reset its state (and any
1411- // sub-services) before retrying. The epoch check ensures a stale
1412- // goroutine cannot stop a maintainer from a newer leadership term.
1413- if ! stopIfCurrentEpoch () {
1414- return
1415- }
1416-
1417- if attempt < maxStartAttempts {
1418- serviceutil .CancellableSleep (ctx , serviceutil .ExponentialBackoff (attempt , serviceutil .MaxAttemptsBeforeResetDefault ))
1419- }
1420- }
1421-
1422- if ctxCancelled () {
1423- return
1424- }
1425-
1426- c .baseService .Logger .ErrorContext (ctx , c .baseService .Name + ": Queue maintainer failed to start after all attempts, requesting leader resignation" ,
1427- slog .String ("err" , lastErr .Error ()))
1428-
1429- c .testSignals .queueMaintainerStartRetriesExhausted .Signal (struct {}{})
1430-
1431- if err := c .clientNotifyBundle .RequestResign (ctx ); err != nil {
1432- c .baseService .Logger .ErrorContext (ctx , c .baseService .Name + ": Error requesting leader resignation" , slog .String ("err" , err .Error ()))
1433- }
1434- }
1435-
14361287// Driver exposes the underlying driver used by the client.
14371288//
14381289// API is not stable. DO NOT USE.
0 commit comments