Skip to content

Commit c4a23ce

Browse files
authored
add Config.ReindexerIndexNames (#1194)
Some Pro features add high-churn tables that would benefit from the reindexer. As of now, however, there's no way to extend it to run on anything but the default index list.

Add `Config.ReindexerIndexNames` so callers can provide the exact list to reindex, and export `ReindexerIndexNamesDefault()` so integrations can start from the built-in targets without reaching into internal maintenance code.

Thread the configured names into `maintenance.Reindexer` and filter them through `IndexesExist` before starting work. That lets mixed-version or partially migrated installs skip absent indexes instead of trying to rebuild objects that are not there.

Preserve the `nil` versus non-`nil` contract in `WithDefaults` by copying the slice without collapsing an explicit empty override back to `nil`, so `[]string{}` still means "reindex nothing".

When `IndexesExist` fails during reindex discovery, advance the next scheduled run before resetting the timer. The old code reset against the already-fired `nextRunAt`, which made `time.Until(nextRunAt)` zero or negative and caused immediate retries in a tight error loop. Scheduling from the prior run time preserves the configured cadence after transient discovery failures.

The added tests cover the exported default list, exact override propagation, explicit empty overrides, missing-index filtering, and the discovery-error retry path.
1 parent 11d80e0 commit c4a23ce

File tree

6 files changed

+311
-47
lines changed

6 files changed

+311
-47
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Added `Config.ReindexerIndexNames` and `ReindexerIndexNamesDefault()` so the reindexer's target indexes can be customized from the public API. [PR #1194](https://github.com/riverqueue/river/pull/1194).
13+
1014
### Fixed
1115

1216
- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).

client.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,19 @@ const (
5252
QueueNumWorkersMax = 10_000
5353
)
5454

55-
var postgresSchemaNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
55+
var (
56+
postgresSchemaNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
57+
58+
reindexerIndexNamesDefault = []string{ //nolint:gochecknoglobals
59+
"river_job_args_index",
60+
"river_job_kind",
61+
"river_job_metadata_index",
62+
"river_job_pkey",
63+
"river_job_prioritized_fetching_index",
64+
"river_job_state_and_finalized_at_index",
65+
"river_job_unique_idx",
66+
}
67+
)
5668

5769
// TestConfig contains configuration specific to test environments.
5870
type TestConfig struct {
@@ -272,6 +284,11 @@ type Config struct {
272284
// reindexer will run at midnight UTC every day.
273285
ReindexerSchedule PeriodicSchedule
274286

287+
// ReindexerIndexNames customizes which indexes River periodically reindexes.
288+
// If nil, River uses [ReindexerIndexNamesDefault]. If non-nil, the provided
289+
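// slice is used as the exact list, so an empty slice disables reindexing.
// Configured indexes that don't exist are skipped with a logged warning.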
290+
ReindexerIndexNames []string
291+
275292
// ReindexerTimeout is the amount of time to wait for the reindexer to run a
276293
// single reindex operation before cancelling it via context. Set to -1 to
277294
// disable the timeout.
@@ -374,12 +391,26 @@ type Config struct {
374391
schedulerInterval time.Duration
375392
}
376393

394+
// ReindexerIndexNamesDefault returns the default set of indexes reindexed by River.
395+
func ReindexerIndexNamesDefault() []string {
396+
indexNames := make([]string, len(reindexerIndexNamesDefault))
397+
copy(indexNames, reindexerIndexNamesDefault)
398+
399+
return indexNames
400+
}
401+
377402
// WithDefaults returns a copy of the Config with all default values applied.
378403
func (c *Config) WithDefaults() *Config {
379404
if c == nil {
380405
c = &Config{}
381406
}
382407

408+
reindexerIndexNames := ReindexerIndexNamesDefault()
409+
if c.ReindexerIndexNames != nil {
410+
reindexerIndexNames = make([]string, len(c.ReindexerIndexNames))
411+
copy(reindexerIndexNames, c.ReindexerIndexNames)
412+
}
413+
383414
// Use the existing logger if set, otherwise create a default one.
384415
logger := c.Logger
385416
if logger == nil {
@@ -420,6 +451,7 @@ func (c *Config) WithDefaults() *Config {
420451
PeriodicJobs: c.PeriodicJobs,
421452
PollOnly: c.PollOnly,
422453
Queues: c.Queues,
454+
ReindexerIndexNames: reindexerIndexNames,
423455
ReindexerSchedule: c.ReindexerSchedule,
424456
ReindexerTimeout: cmp.Or(c.ReindexerTimeout, maintenance.ReindexerTimeoutDefault),
425457
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
@@ -603,14 +635,14 @@ type Client[TTx any] struct {
603635
notifier *notifier.Notifier // may be nil in poll-only mode
604636
periodicJobs *PeriodicJobBundle
605637
pilot riverpilot.Pilot
606-
producersByQueueName map[string]*producer
607-
queueMaintainer *maintenance.QueueMaintainer
608-
queueMaintainerLeader *maintenance.QueueMaintainerLeader
609-
queues *QueueBundle
610-
services []startstop.Service
611-
stopped <-chan struct{}
612-
subscriptionManager *subscriptionManager
613-
testSignals clientTestSignals
638+
producersByQueueName map[string]*producer
639+
queueMaintainer *maintenance.QueueMaintainer
640+
queueMaintainerLeader *maintenance.QueueMaintainerLeader
641+
queues *QueueBundle
642+
services []startstop.Service
643+
stopped <-chan struct{}
644+
subscriptionManager *subscriptionManager
645+
testSignals clientTestSignals
614646

615647
// workCancel cancels the context used for all work goroutines. Normal Stop
616648
// does not cancel that context.
@@ -936,6 +968,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
936968
}
937969

938970
reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
971+
IndexNames: config.ReindexerIndexNames,
939972
ScheduleFunc: scheduleFunc,
940973
Schema: config.Schema,
941974
Timeout: config.ReindexerTimeout,

client_test.go

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7185,6 +7185,39 @@ func Test_Client_Start_Error(t *testing.T) {
71857185
})
71867186
}
71877187

7188+
func Test_Config_WithDefaults(t *testing.T) {
7189+
t.Parallel()
7190+
7191+
t.Run("ReindexerIndexNamesEmptyStaysNonNil", func(t *testing.T) {
7192+
t.Parallel()
7193+
7194+
config := (&Config{ReindexerIndexNames: []string{}}).WithDefaults()
7195+
7196+
require.NotNil(t, config.ReindexerIndexNames)
7197+
require.Empty(t, config.ReindexerIndexNames)
7198+
})
7199+
7200+
t.Run("ReindexerIndexNamesNilGetsDefaults", func(t *testing.T) {
7201+
t.Parallel()
7202+
7203+
config := (&Config{}).WithDefaults()
7204+
7205+
require.Equal(t, ReindexerIndexNamesDefault(), config.ReindexerIndexNames)
7206+
})
7207+
7208+
t.Run("ReindexerIndexNamesSliceIsCopied", func(t *testing.T) {
7209+
t.Parallel()
7210+
7211+
input := []string{"custom_index", "other_index"}
7212+
config := (&Config{ReindexerIndexNames: input}).WithDefaults()
7213+
7214+
require.Equal(t, input, config.ReindexerIndexNames)
7215+
7216+
input[0] = "mutated"
7217+
require.Equal(t, []string{"custom_index", "other_index"}, config.ReindexerIndexNames)
7218+
})
7219+
}
7220+
71887221
func Test_NewClient_BaseServiceName(t *testing.T) {
71897222
t.Parallel()
71907223

@@ -7279,13 +7312,8 @@ func Test_NewClient_Defaults(t *testing.T) {
72797312
require.False(t, enqueuer.StaggerStartupIsDisabled())
72807313

72817314
reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
7282-
require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index")
7283-
require.Contains(t, reindexer.Config.IndexNames, "river_job_kind")
7284-
require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index")
7285-
require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey")
7286-
require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index")
7287-
require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index")
7288-
require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx")
7315+
require.Equal(t, ReindexerIndexNamesDefault(), client.config.ReindexerIndexNames)
7316+
require.Equal(t, ReindexerIndexNamesDefault(), reindexer.Config.IndexNames)
72897317
now := time.Now().UTC()
72907318
nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
72917319
require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now))
@@ -7349,6 +7377,7 @@ func Test_NewClient_Overrides(t *testing.T) {
73497377
Logger: logger,
73507378
MaxAttempts: 5,
73517379
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
7380+
ReindexerIndexNames: []string{"custom_index", "other_index"},
73527381
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
73537382
ReindexerTimeout: 125 * time.Millisecond,
73547383
RetryPolicy: retryPolicy,
@@ -7373,6 +7402,8 @@ func Test_NewClient_Overrides(t *testing.T) {
73737402
require.True(t, enqueuer.StaggerStartupIsDisabled())
73747403

73757404
reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
7405+
// Assert the exact list so the override is verified to reach the reindexer verbatim.
7406+
require.Equal(t, []string{"custom_index", "other_index"}, reindexer.Config.IndexNames)
73767407
now := time.Now().UTC()
73777408
require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now))
73787409

@@ -7391,6 +7422,37 @@ func Test_NewClient_Overrides(t *testing.T) {
73917422
require.Len(t, client.config.WorkerMiddleware, 1)
73927423
}
73937424

7425+
func Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride(t *testing.T) {
7426+
t.Parallel()
7427+
7428+
ctx := context.Background()
7429+
7430+
var (
7431+
dbPool = riversharedtest.DBPool(ctx, t)
7432+
driver = riverpgxv5.New(dbPool)
7433+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
7434+
)
7435+
7436+
workers := NewWorkers()
7437+
AddWorker(workers, &noOpWorker{})
7438+
7439+
client, err := NewClient(driver, &Config{
7440+
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
7441+
ReindexerIndexNames: []string{},
7442+
Schema: schema,
7443+
TestOnly: true,
7444+
Workers: workers,
7445+
})
7446+
require.NoError(t, err)
7447+
7448+
require.NotNil(t, client.config.ReindexerIndexNames)
7449+
require.Empty(t, client.config.ReindexerIndexNames)
7450+
7451+
reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
7452+
require.NotNil(t, reindexer.Config.IndexNames)
7453+
require.Empty(t, reindexer.Config.IndexNames)
7454+
}
7455+
73947456
func Test_NewClient_MissingParameters(t *testing.T) {
73957457
t.Parallel()
73967458

@@ -7817,6 +7879,26 @@ func Test_NewClient_Validations(t *testing.T) {
78177879
}
78187880
}
78197881

7882+
func TestReindexerIndexNamesDefault(t *testing.T) {
7883+
t.Parallel()
7884+
7885+
indexNames := ReindexerIndexNamesDefault()
7886+
7887+
// Assert the exact list so index list changes require explicit test updates.
7888+
require.Equal(t, []string{
7889+
"river_job_args_index",
7890+
"river_job_kind",
7891+
"river_job_metadata_index",
7892+
"river_job_pkey",
7893+
"river_job_prioritized_fetching_index",
7894+
"river_job_state_and_finalized_at_index",
7895+
"river_job_unique_idx",
7896+
}, indexNames)
7897+
7898+
indexNames[0] = "mutated"
7899+
require.Equal(t, "river_job_args_index", ReindexerIndexNamesDefault()[0])
7900+
}
7901+
78207902
type timeoutTestArgs struct {
78217903
TimeoutValue time.Duration `json:"timeout_value"`
78227904
}

insert_opts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
240240
// what to do about the job that can't be scheduled. We can't send feedback to
241241
// the caller at this point, so probably the best we could do is leave it in
242242
// this untransitionable state until the `running` job finished, which isn't
243-
// particularly satsifactory.
243+
// particularly satisfactory.
244244
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
245245
rivertype.JobStateAvailable,
246246
rivertype.JobStatePending,

internal/maintenance/reindexer.go

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,6 @@ const (
2929
ReindexerTimeoutDefault = 1 * time.Minute
3030
)
3131

32-
var defaultIndexNames = []string{ //nolint:gochecknoglobals
33-
"river_job_args_index",
34-
"river_job_kind",
35-
"river_job_metadata_index",
36-
"river_job_pkey",
37-
"river_job_prioritized_fetching_index",
38-
"river_job_state_and_finalized_at_index",
39-
"river_job_unique_idx",
40-
}
41-
4232
// ReindexerTestSignals are internal signals used exclusively in tests.
4333
type ReindexerTestSignals struct {
4434
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
@@ -49,7 +39,8 @@ func (ts *ReindexerTestSignals) Init(tb testutil.TestingTB) {
4939
}
5040

5141
type ReindexerConfig struct {
52-
// IndexNames is a list of indexes to reindex on each run.
42+
// IndexNames is the exact list of indexes to reindex on each run. It must
43+
// be non-nil. An empty slice disables reindex work. Names that don't exist
// in the schema are filtered out (and logged as a warning) on each run.
5344
IndexNames []string
5445

5546
// ScheduleFunc returns the next scheduled run time for the reindexer given the
@@ -66,6 +57,9 @@ type ReindexerConfig struct {
6657
}
6758

6859
func (c *ReindexerConfig) mustValidate() *ReindexerConfig {
60+
if c.IndexNames == nil {
61+
panic("ReindexerConfig.IndexNames must be set")
62+
}
6963
if c.ScheduleFunc == nil {
7064
panic("ReindexerConfig.ScheduleFunc must be set")
7165
}
@@ -91,11 +85,13 @@ type Reindexer struct {
9185
}
9286

9387
func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exec riverdriver.Executor) *Reindexer {
94-
indexNames := defaultIndexNames
95-
if config.IndexNames != nil {
96-
indexNames = config.IndexNames
88+
if config.IndexNames == nil {
89+
panic("ReindexerConfig.IndexNames must be set")
9790
}
9891

92+
indexNames := make([]string, len(config.IndexNames))
93+
copy(indexNames, config.IndexNames)
94+
9995
scheduleFunc := config.ScheduleFunc
10096
if scheduleFunc == nil {
10197
scheduleFunc = (&DefaultReindexerSchedule{}).Next
@@ -133,11 +129,27 @@ func (s *Reindexer) Start(ctx context.Context) error {
133129
s.Logger.DebugContext(ctx, s.Name+": Scheduling first run", slog.Time("next_run_at", nextRunAt))
134130

135131
timerUntilNextRun := time.NewTimer(time.Until(nextRunAt))
132+
scheduleNextRun := func() {
133+
// Advance from the previous scheduled time, not "now", so retries
134+
// stay aligned with the configured cadence and don't immediately
135+
// refire after a timer that has already elapsed.
136+
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
137+
timerUntilNextRun.Reset(time.Until(nextRunAt))
138+
}
136139

137140
for {
138141
select {
139142
case <-timerUntilNextRun.C:
140-
for _, indexName := range s.Config.IndexNames {
143+
reindexableIndexNames, err := s.reindexableIndexNames(ctx)
144+
if err != nil {
145+
if !errors.Is(err, context.Canceled) {
146+
s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error()))
147+
}
148+
scheduleNextRun()
149+
continue
150+
}
151+
152+
for _, indexName := range reindexableIndexNames {
141153
if _, err := s.reindexOne(ctx, indexName); err != nil {
142154
if !errors.Is(err, context.Canceled) {
143155
s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName))
@@ -151,15 +163,11 @@ func (s *Reindexer) Start(ctx context.Context) error {
151163
// On each run, we calculate the new schedule based on the
152164
// previous run's start time. This ensures that we don't
153165
// accidentally skip a run as time elapses during the run.
154-
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
166+
scheduleNextRun()
155167

156168
// TODO: maybe we should log differently if some of these fail?
157169
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
158-
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames)))
159-
160-
// Reset the timer after the insert loop has finished so it's
161-
// paused during work. Makes its firing more deterministic.
162-
timerUntilNextRun.Reset(time.Until(nextRunAt))
170+
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames)))
163171

164172
case <-ctx.Done():
165173
// Clean up timer resources. We know it has _not_ received from
@@ -176,6 +184,34 @@ func (s *Reindexer) Start(ctx context.Context) error {
176184
return nil
177185
}
178186

187+
func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) {
188+
indexesExist, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{
189+
IndexNames: s.Config.IndexNames,
190+
Schema: s.Config.Schema,
191+
})
192+
if err != nil {
193+
return nil, err
194+
}
195+
196+
indexNames := make([]string, 0, len(s.Config.IndexNames))
197+
missingIndexNames := make([]string, 0)
198+
for _, indexName := range s.Config.IndexNames {
199+
if indexesExist[indexName] {
200+
indexNames = append(indexNames, indexName)
201+
continue
202+
}
203+
204+
missingIndexNames = append(missingIndexNames, indexName)
205+
}
206+
207+
if len(missingIndexNames) > 0 {
208+
s.Logger.WarnContext(ctx, s.Name+": Configured reindex indexes do not exist; run migrations or update ReindexerIndexNames",
209+
slog.Any("index_names", missingIndexNames))
210+
}
211+
212+
return indexNames, nil
213+
}
214+
179215
func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) {
180216
var cancel func()
181217
if s.Config.Timeout > -1 {

0 commit comments

Comments
 (0)