Skip to content

Commit a5c8ae8

Browse files
committed
Add leadership "domains" so multiple Rivers can operate in one schema
We've gotten a couple of requests so far (see #342 and #1105) to be able to start multiple River clients targeting different queues within the same database/schema, giving them the capacity to operate independently enough to be functional. This is currently not possible because a single leader is elected given a single schema and it handles all maintenance operations including non-queue ones like periodic job enqueuing. Here, add the idea of a `LeaderDomain`. This lets a user set the "domain" on which a client will elect its leader, allowing multiple leaders to be elected in a single schema. Each leader will run its own maintenance services. Setting `LeaderDomain` causes the additional effect of having maintenance services start to operate only on the queues that their client is configured for. The idea here is to give us backwards compatibility in that the default behavior (in case of an unset `LeaderDomain`) is the same, while providing a path for multiple leaders to be interoperable with each other. There are still a few edges: for example, reindexing is not queue-specific, so multiple leaders could be running a reindexer. I've provided guidance in the config documentation that ideally, all clients but one should have their reindexer disabled.
1 parent c6d65e5 commit a5c8ae8

48 files changed

Lines changed: 1118 additions & 269 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added `Config.LeaderDomain` to allow multiple River clients to be elected leader within a single schema/database and run maintenance services on only their configured queues. [PR #1113](https://github.com/riverqueue/river/pull/1113).
13+
1014
### Fixed
1115

1216
- Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236).

client.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"os"
1111
"regexp"
12+
"slices"
1213
"strings"
1314
"sync"
1415
"time"
@@ -220,6 +221,48 @@ type Config struct {
220221
// Jobs may have their own specific hooks by implementing JobArgsWithHooks.
221222
Hooks []rivertype.Hook
222223

224+
// LeaderDomain is an optional "domain" string to use for leader election.
225+
// Different clients sharing the same River schema can elect multiple
226+
// leaders as long as they're using different domains, with one leader
227+
// elected per domain. This is an advanced feature that will almost never be
228+
// needed. Don't set this unless you know what you're doing.
229+
//
230+
// Setting this value also triggers the related behavior that maintenance
231+
// services start to only operate on the queues they're configured on. So
232+
// for example, given client1 handling queue_a and queue_b and client2
233+
// handling queue_c and queue_d, whichever client is elected leader will end
234+
// up running all maintenance services for all queues (queue_a, queue_b,
235+
// queue_c, and queue_d). But if client1 is using domain "domain1" and
236+
// client2 is using domain "domain2", then client1 (elected in domain1) will
237+
// only run maintenance services on queue_a and queue_b, while client2
238+
// (elected in domain2) will run maintenance services on queue_c and
239+
// queue_d.
240+
//
241+
// A warning though that River *does not protect against configuration
242+
// mistakes*. If client1 on domain1 is configured for queue_a and queue_b,
243+
// and client2 on domain2 is *also* configured for queue_a and queue_b, then
244+
// both clients may end up running maintenance services on the same queues
245+
// at the same time. It's the caller's responsibility to ensure that doesn't
246+
// happen.
247+
//
248+
// An empty value or the special value "default" causes the client to
249+
// operate on all queues. When setting this value to non-empty
250+
// non-"default", no other clients should be left empty or use "default"
251+
// because the default client(s) will infringe on the domains of the
252+
// non-default one(s).
253+
//
254+
// Certain maintenance services that aren't queue-related like the reindexer
255+
// will continue to run on all leaders regardless of domain. If using this
256+
// feature, it's a good idea to configure ReindexerSchedule on all but a
257+
// single leader domain to river.NeverSchedule().
258+
//
259+
// In general, most River users should not need LeaderDomain, and when
260+
// running multiple Rivers may want to consider using multiple databases and
261+
// multiple schemas instead.
262+
//
263+
// Defaults to "default".
264+
LeaderDomain string
265+
223266
// Logger is the structured logger to use for logging purposes. If none is
224267
// specified, logs will be emitted to STDOUT with messages at warn level
225268
// or higher.
@@ -445,6 +488,7 @@ func (c *Config) WithDefaults() *Config {
445488
Hooks: c.Hooks,
446489
JobInsertMiddleware: c.JobInsertMiddleware,
447490
JobTimeout: cmp.Or(c.JobTimeout, JobTimeoutDefault),
491+
LeaderDomain: c.LeaderDomain,
448492
Logger: logger,
449493
MaxAttempts: cmp.Or(c.MaxAttempts, MaxAttemptsDefault),
450494
Middleware: c.Middleware,
@@ -873,6 +917,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
873917

874918
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
875919
ClientID: config.ID,
920+
Domain: config.LeaderDomain,
876921
Schema: config.Schema,
877922
})
878923
client.services = append(client.services, client.elector)
@@ -890,6 +935,12 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
890935
client.services = append(client.services, pluginPilot.PluginServices()...)
891936
}
892937

938+
var queuesIncluded []string
939+
if config.LeaderDomain != "" && config.LeaderDomain != leadership.DomainDefault && len(config.Queues) > 0 {
940+
queuesIncluded = maputil.Keys(config.Queues)
941+
slices.Sort(queuesIncluded)
942+
}
943+
893944
//
894945
// Maintenance services
895946
//
@@ -902,6 +953,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
902953
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
903954
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
904955
QueuesExcluded: client.pilot.JobCleanerQueuesExcluded(),
956+
QueuesIncluded: queuesIncluded,
905957
Schema: config.Schema,
906958
Timeout: config.JobCleanerTimeout,
907959
}, driver.GetExecutor())
@@ -912,6 +964,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
912964
{
913965
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
914966
ClientRetryPolicy: config.RetryPolicy,
967+
QueuesIncluded: queuesIncluded,
915968
RescueAfter: config.RescueStuckJobsAfter,
916969
Schema: config.Schema,
917970
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
@@ -927,9 +980,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
927980

928981
{
929982
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
930-
Interval: config.schedulerInterval,
931-
NotifyInsert: client.maybeNotifyInsertForQueues,
932-
Schema: config.Schema,
983+
Interval: config.schedulerInterval,
984+
NotifyInsert: client.maybeNotifyInsertForQueues,
985+
QueuesIncluded: queuesIncluded,
986+
Schema: config.Schema,
933987
}, driver.GetExecutor())
934988
maintenanceServices = append(maintenanceServices, jobScheduler)
935989
client.testSignals.jobScheduler = &jobScheduler.TestSignals
@@ -955,6 +1009,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
9551009

9561010
{
9571011
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
1012+
QueuesIncluded: queuesIncluded,
9581013
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
9591014
Schema: config.Schema,
9601015
}, driver.GetExecutor())

client_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,6 +1491,154 @@ func Test_Client_Common(t *testing.T) {
14911491

14921492
startstoptest.Stress(ctx, t, clientWithStop)
14931493
})
1494+
1495+
t.Run("LeaderDomain_Alternate", func(t *testing.T) {
1496+
t.Parallel()
1497+
1498+
var client1 *Client[pgx.Tx]
1499+
{
1500+
config, bundle := setupConfig(t)
1501+
config.LeaderDomain = "domain1"
1502+
config.ReindexerSchedule = &neverSchedule{}
1503+
config.Queues = map[string]QueueConfig{
1504+
"queue_a": {MaxWorkers: 50},
1505+
"queue_b": {MaxWorkers: 50},
1506+
}
1507+
1508+
var err error
1509+
client1, err = NewClient(bundle.driver, config)
1510+
require.NoError(t, err)
1511+
client1.testSignals.Init(t)
1512+
}
1513+
1514+
var client2 *Client[pgx.Tx]
1515+
{
1516+
config, bundle := setupConfig(t)
1517+
config.LeaderDomain = "domain2"
1518+
config.Queues = map[string]QueueConfig{
1519+
"queue_c": {MaxWorkers: 50},
1520+
"queue_d": {MaxWorkers: 50},
1521+
}
1522+
config.Schema = client1.config.Schema
1523+
config.ReindexerSchedule = &neverSchedule{}
1524+
1525+
var err error
1526+
client2, err = NewClient(bundle.driver, config)
1527+
require.NoError(t, err)
1528+
client2.testSignals.Init(t)
1529+
}
1530+
1531+
startClient(ctx, t, client1)
1532+
startClient(ctx, t, client2)
1533+
1534+
// Both elected
1535+
client1.testSignals.queueMaintainerLeader.ElectedLeader.WaitOrTimeout()
1536+
client2.testSignals.queueMaintainerLeader.ElectedLeader.WaitOrTimeout()
1537+
})
1538+
1539+
t.Run("LeaderDomain_MaintenanceServiceConfigEmpty", func(t *testing.T) {
1540+
t.Parallel()
1541+
1542+
config, bundle := setupConfig(t)
1543+
config.Queues = map[string]QueueConfig{
1544+
"queue_a": {MaxWorkers: 50},
1545+
"queue_b": {MaxWorkers: 50},
1546+
}
1547+
1548+
client, err := NewClient(bundle.driver, config)
1549+
require.NoError(t, err)
1550+
client.testSignals.Init(t)
1551+
1552+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
1553+
require.Nil(t, jobCleaner.Config.QueuesIncluded)
1554+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
1555+
require.Nil(t, jobRescuer.Config.QueuesIncluded)
1556+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
1557+
require.Nil(t, jobScheduler.Config.QueuesIncluded)
1558+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
1559+
require.Nil(t, queueCleaner.Config.QueuesIncluded)
1560+
})
1561+
1562+
// The domain "default" is special in that it behaves as if LeaderDomain
1563+
// was not set.
1564+
t.Run("LeaderDomain_MaintenanceServiceConfigDefault", func(t *testing.T) {
1565+
t.Parallel()
1566+
1567+
config, bundle := setupConfig(t)
1568+
config.LeaderDomain = "default"
1569+
config.Queues = map[string]QueueConfig{
1570+
"queue_a": {MaxWorkers: 50},
1571+
"queue_b": {MaxWorkers: 50},
1572+
}
1573+
1574+
client, err := NewClient(bundle.driver, config)
1575+
require.NoError(t, err)
1576+
client.testSignals.Init(t)
1577+
1578+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client.queueMaintainer)
1579+
require.Nil(t, jobCleaner.Config.QueuesIncluded)
1580+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client.queueMaintainer)
1581+
require.Nil(t, jobRescuer.Config.QueuesIncluded)
1582+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client.queueMaintainer)
1583+
require.Nil(t, jobScheduler.Config.QueuesIncluded)
1584+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client.queueMaintainer)
1585+
require.Nil(t, queueCleaner.Config.QueuesIncluded)
1586+
})
1587+
1588+
// When non-default leader domains are configured, each client's maintenance
1589+
// services are limited to only their client's queues.
1590+
t.Run("LeaderDomain_MaintenanceServiceConfigAlternate", func(t *testing.T) {
1591+
t.Parallel()
1592+
1593+
var client1 *Client[pgx.Tx]
1594+
{
1595+
config, bundle := setupConfig(t)
1596+
config.LeaderDomain = "domain1"
1597+
config.ReindexerSchedule = &neverSchedule{}
1598+
config.Queues = map[string]QueueConfig{
1599+
"queue_a": {MaxWorkers: 50},
1600+
"queue_b": {MaxWorkers: 50},
1601+
}
1602+
1603+
var err error
1604+
client1, err = NewClient(bundle.driver, config)
1605+
require.NoError(t, err)
1606+
client1.testSignals.Init(t)
1607+
1608+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client1.queueMaintainer)
1609+
require.Equal(t, []string{"queue_a", "queue_b"}, jobCleaner.Config.QueuesIncluded)
1610+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client1.queueMaintainer)
1611+
require.Equal(t, []string{"queue_a", "queue_b"}, jobRescuer.Config.QueuesIncluded)
1612+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client1.queueMaintainer)
1613+
require.Equal(t, []string{"queue_a", "queue_b"}, jobScheduler.Config.QueuesIncluded)
1614+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client1.queueMaintainer)
1615+
require.Equal(t, []string{"queue_a", "queue_b"}, queueCleaner.Config.QueuesIncluded)
1616+
}
1617+
1618+
{
1619+
config, bundle := setupConfig(t)
1620+
config.LeaderDomain = "domain2"
1621+
config.Queues = map[string]QueueConfig{
1622+
"queue_c": {MaxWorkers: 50},
1623+
"queue_d": {MaxWorkers: 50},
1624+
}
1625+
config.Schema = client1.config.Schema
1626+
config.ReindexerSchedule = &neverSchedule{}
1627+
1628+
client2, err := NewClient(bundle.driver, config)
1629+
require.NoError(t, err)
1630+
client2.testSignals.Init(t)
1631+
1632+
jobCleaner := maintenance.GetService[*maintenance.JobCleaner](client2.queueMaintainer)
1633+
require.Equal(t, []string{"queue_c", "queue_d"}, jobCleaner.Config.QueuesIncluded)
1634+
jobRescuer := maintenance.GetService[*maintenance.JobRescuer](client2.queueMaintainer)
1635+
require.Equal(t, []string{"queue_c", "queue_d"}, jobRescuer.Config.QueuesIncluded)
1636+
jobScheduler := maintenance.GetService[*maintenance.JobScheduler](client2.queueMaintainer)
1637+
require.Equal(t, []string{"queue_c", "queue_d"}, jobScheduler.Config.QueuesIncluded)
1638+
queueCleaner := maintenance.GetService[*maintenance.QueueCleaner](client2.queueMaintainer)
1639+
require.Equal(t, []string{"queue_c", "queue_d"}, queueCleaner.Config.QueuesIncluded)
1640+
}
1641+
})
14941642
}
14951643

14961644
type workerWithMiddleware[T JobArgs] struct {

0 commit comments

Comments
 (0)