Skip to content

Commit 4d82efb

Browse files
fix(worker): R1 — drive customer-backup cadence off plans.yaml rpo_minutes (#103)
The customer-Postgres backup scheduler chose hourly-vs-daily cadence from a hardcoded `switch canonicalTier()` plus a hardcoded SQL tier allow-list. The hourly tiers (pro/growth/team) happened to match their advertised `rpo_minutes:60`, but the coupling was implicit: changing rpo_minutes in plans.yaml would NOT have moved the cadence, so the product could silently over-promise RPO (effective ~24h vs sold 60m).

Make the cadence RPO-driven from plans.yaml (the same field surfaced on /api/v1/capabilities), so the cadence that MAKES the RPO true is read from the field that PROMISES it — they can no longer drift:

    rpo_minutes in [1,60] → hourly (pro/growth/team)
    rpo_minutes > 60 → once-daily team slot (hobby/hobby_plus = 1440)
    rpo_minutes == 0 OR backup_retention_days == 0 → never (anonymous/free)

- BackupPlanRegistry grows RPOMinutes(tier); the common-plans adapter and the test fake implement it. Registry.RPOMinutes already existed in common/plans.
- Scheduler takes the registry (wired from the same `backupPlans` the runner uses); nil registry falls back to the legacy hardcoded mapping so a misconfigured boot never silently stops paid backups.
- Candidate SELECT now excludes only the never-backup tiers (anonymous/free) instead of an explicit paid-tier allow-list, so a NEW paid tier in plans.yaml is covered automatically — removing the single-site-list failure mode (root rule 18) that once dropped hobby_plus + every _yearly variant.
- Per-row retention==0 guard is a second, independent veto (defence-in-depth) so a stray rpo_minutes on a zero-retention tier still never enqueues.
- Idempotency unchanged: the atomic INSERT … WHERE NOT EXISTS (50-min DB lookback) still prevents a duplicate enqueue within the hour even across River RunOnStart double-ticks (dedupe lives in the DB, not River UniqueOpts).
Tests prove: a pro resource becomes due every hour (all 24 hours), a free resource is never enqueued (SQL-filtered AND registry-gated if it leaks), no duplicate enqueue within an hour, the [1,60]→hourly / >60→daily boundary, the zero-retention veto, the nil-registry legacy fallback, and the adapter's RPOMinutes delegation against the real embedded plans.yaml.

jobs package at 96.0% coverage; all new/changed functions at 100%.

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent a0f4014 commit 4d82efb

7 files changed

Lines changed: 574 additions & 109 deletions

internal/jobs/backup_extra_test.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,22 @@ func TestCommonPlanRegistryAdapter_Delegates(t *testing.T) {
121121
if d := adapter.BackupRetentionDays("pro"); d <= 0 {
122122
t.Errorf("BackupRetentionDays(pro) = %d; want > 0", d)
123123
}
124+
// RPOMinutes delegation — the scheduler cadence gate reads this. Pin the
125+
// contract against the real embedded plans.yaml: pro/growth/team promise
126+
// a 60-minute RPO (→ hourly cadence) while anonymous promises 0 (→ never
127+
// backed up). A plans.yaml edit that breaks either trips here.
128+
for _, tier := range []string{"pro", "growth", "team"} {
129+
if m := adapter.RPOMinutes(tier); m != 60 {
130+
t.Errorf("RPOMinutes(%q) = %d; want 60 (hourly-cadence promise)", tier, m)
131+
}
132+
}
133+
if m := adapter.RPOMinutes("anonymous"); m != 0 {
134+
t.Errorf("RPOMinutes(anonymous) = %d; want 0 (never backed up)", m)
135+
}
136+
// hobby promises a coarser daily RPO (1440) → daily cadence.
137+
if m := adapter.RPOMinutes("hobby"); m <= 60 {
138+
t.Errorf("RPOMinutes(hobby) = %d; want > 60 (daily cadence)", m)
139+
}
124140
names := adapter.TierNames()
125141
if len(names) == 0 {
126142
t.Fatal("TierNames returned empty slice")
@@ -719,8 +735,8 @@ func TestRunner_ProcessBackup_BadAESKey(t *testing.T) {
719735
w := &CustomerBackupRunnerWorker{
720736
db: db, store: newFakeBackupStore(), pgDump: &fakePgDump{},
721737
bucket: "b", prefix: "p",
722-
aesKey: "not-hex-not-valid-please-fail",
723-
now: time.Now, timeout: time.Minute, batchN: backupBatchSize,
738+
aesKey: "not-hex-not-valid-please-fail",
739+
now: time.Now, timeout: time.Minute, batchN: backupBatchSize,
724740
}
725741
if err := w.Work(context.Background(), fakeRunnerJob()); err != nil {
726742
t.Fatalf("Work: %v", err)
@@ -922,12 +938,11 @@ func TestCustomerBackupSchedulerArgs_Kind(t *testing.T) {
922938
}
923939
}
924940

925-
// TestScheduler_AnonymousTier_DoesNotInsert — defensive: an anonymous row
926-
// in resource.tier slips the SQL filter (it shouldn't, but the cadence
927-
// switch also gates it). canonicalTier returns "anonymous"; the switch
928-
// has no case for it, so the row proceeds to the dedupe INSERT — which
929-
// is fine because the SQL filter excludes anonymous-tier rows in the
930-
// first place. This test pins that contract via the SELECT shape.
941+
// TestScheduler_AnonymousTier_DoesNotInsert — defensive: anonymous rows are
942+
// excluded by the SQL WHERE clause (`tier NOT IN ('anonymous','free')`), so
943+
// the candidate set is empty and no INSERT fires. Even if one leaked through,
944+
// the registry cadence gate (rpo_minutes:0 → cadenceNever) skips it. This
945+
// test pins the SQL-exclusion contract via the empty SELECT.
931946
func TestScheduler_AnonymousTier_DoesNotInsert(t *testing.T) {
932947
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
933948
if err != nil {
@@ -939,7 +954,7 @@ func TestScheduler_AnonymousTier_DoesNotInsert(t *testing.T) {
939954
mock.ExpectQuery(`SELECT r\.id::text`).
940955
WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}))
941956

942-
w := NewCustomerBackupSchedulerWorker(db)
957+
w := NewCustomerBackupSchedulerWorker(db, schedulerPlans())
943958
w.now = func() time.Time { return time.Date(2026, 5, 13, 0, 0, 0, 0, time.UTC) }
944959
if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil {
945960
t.Fatalf("Work: %v", err)
@@ -961,7 +976,7 @@ func TestScheduler_HobbyMissingTeamID_Skips(t *testing.T) {
961976
AddRow(resID, "hobby", nil))
962977
// No INSERT expected.
963978

964-
w := NewCustomerBackupSchedulerWorker(db)
979+
w := NewCustomerBackupSchedulerWorker(db, schedulerPlans())
965980
w.now = func() time.Time { return time.Date(2026, 5, 13, 0, 0, 0, 0, time.UTC) }
966981
if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil {
967982
t.Fatalf("Work: %v", err)
@@ -987,7 +1002,7 @@ func TestScheduler_InsertError_LoggedNonFatal(t *testing.T) {
9871002
WithArgs(uuid.MustParse(resID), "pro").
9881003
WillReturnError(errors.New("db hiccup"))
9891004

990-
w := NewCustomerBackupSchedulerWorker(db)
1005+
w := NewCustomerBackupSchedulerWorker(db, schedulerPlans())
9911006
w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) }
9921007
if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil {
9931008
t.Errorf("Work: per-row insert error must be non-fatal: %v", err)
@@ -1010,7 +1025,7 @@ func TestScheduler_BadUUIDInRow_Skipped(t *testing.T) {
10101025
AddRow("not-a-uuid", "pro", teamID))
10111026
// No INSERT expected — bad UUID short-circuits the per-row body.
10121027

1013-
w := NewCustomerBackupSchedulerWorker(db)
1028+
w := NewCustomerBackupSchedulerWorker(db, schedulerPlans())
10141029
w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) }
10151030
if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil {
10161031
t.Fatalf("Work: %v", err)

internal/jobs/backup_s3.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
// We keep this as a tiny interface (Upload / Download / Delete / List) instead
55
// of passing *minio.Client around directly so:
66
//
7-
// 1. The runner / restore-runner / retention sweep tests can use a fake that
8-
// exercises the exact streaming path without dialing a real S3.
9-
// 2. A future cutover from MinIO to a real DO-Spaces SDK (or AWS SDK v2) is a
10-
// one-file change — every consumer of this interface stays the same.
7+
// 1. The runner / restore-runner / retention sweep tests can use a fake that
8+
// exercises the exact streaming path without dialing a real S3.
9+
// 2. A future cutover from MinIO to a real DO-Spaces SDK (or AWS SDK v2) is a
10+
// one-file change — every consumer of this interface stays the same.
1111
//
1212
// All four methods take the bucket as an explicit argument so the same client
1313
// can serve a separate retention sweep on a different bucket later (e.g.
@@ -44,6 +44,16 @@ type BackupPlanRegistry interface {
4444
// tier — e.g. a Pro→Free downgrade — cannot stick around past
4545
// policy).
4646
BackupRetentionDays(tier string) int
47+
// RPOMinutes returns the per-tier Recovery Point Objective in minutes
48+
// from plans.yaml. This is the SOURCE OF TRUTH the backup SCHEDULER
49+
// uses to pick a cadence: a tier promising rpo_minutes in [1,60] must be
50+
// backed up hourly (else the effective RPO is ~24h and the product
51+
// over-promises), a tier with rpo_minutes>60 gets the once-daily slot,
52+
// and rpo_minutes==0 ("not promised" — anonymous/free) is never
53+
// enqueued. Keeping the cadence derived from this value (rather than a
54+
// hardcoded tier list) means changing rpo_minutes in plans.yaml
55+
// automatically moves the cadence, with no scheduler code change.
56+
RPOMinutes(tier string) int
4757
// TierNames lists the tier names the sweep should iterate. We sweep
4858
// per-tier because the SQL hits a partial index on tier_at_backup;
4959
// iterating an explicit list (rather than scanning DISTINCT) keeps
@@ -80,6 +90,13 @@ func (a *commonPlanRegistryAdapter) BackupRetentionDays(tier string) int {
8090
return a.reg.BackupRetentionDays(tier)
8191
}
8292

93+
// RPOMinutes delegates to the common Registry. Returns 0 for unknown tiers
94+
// (common's Get falls back to "anonymous", whose RPO is 0 / no scheduled
95+
// backups). The scheduler reads this to choose hourly vs daily cadence.
96+
func (a *commonPlanRegistryAdapter) RPOMinutes(tier string) int {
97+
return a.reg.RPOMinutes(tier)
98+
}
99+
83100
// TierNames returns every tier name registered in plans.yaml. Sorted
84101
// is unnecessary — the sweep is order-independent — but stable across
85102
// process lifetime so log lines per tier read predictably.

internal/jobs/coverage_gaps_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ func TestScheduler_Work_ScanError_SkipsRow(t *testing.T) {
728728
WillReturnRows(sqlmock.NewRows([]string{"id", "tier", "team_id"}).
729729
AddRow("fffffff0-1111-2222-3333-444444444444", "pro", "not-a-uuid"))
730730

731-
w := NewCustomerBackupSchedulerWorker(db)
731+
w := NewCustomerBackupSchedulerWorker(db, schedulerPlans())
732732
w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) }
733733
if err := w.Work(context.Background(), fakeSchedulerJob()); err != nil {
734734
t.Fatalf("Work should skip unscannable row, got %v", err)
@@ -746,7 +746,7 @@ func TestScheduler_Work_RowsError_ReturnsError(t *testing.T) {
746746
RowError(0, errors.New("rows boom"))
747747
mock.ExpectQuery(`SELECT r.id::text, r.tier, r.team_id`).WillReturnRows(rows)
748748

749-
w := NewCustomerBackupSchedulerWorker(db)
749+
w := NewCustomerBackupSchedulerWorker(db, schedulerPlans())
750750
w.now = func() time.Time { return time.Date(2026, 5, 13, 14, 0, 0, 0, time.UTC) }
751751
if err := w.Work(context.Background(), fakeSchedulerJob()); err == nil ||
752752
!strings.Contains(err.Error(), "rows error") {

internal/jobs/customer_backup_runner_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ func TestBackupObjectKey(t *testing.T) {
450450
// flips a tier, the test fails on the assertion, not on a moved goalpost.
451451
type fakeBackupPlanRegistry struct {
452452
days map[string]int
453+
rpo map[string]int // tier→rpo_minutes for the scheduler cadence gate
453454
tiers []string
454455
}
455456

@@ -460,6 +461,24 @@ func (f *fakeBackupPlanRegistry) BackupRetentionDays(tier string) int {
460461
return 0
461462
}
462463

464+
// RPOMinutes returns the per-tier RPO the scheduler uses to pick cadence.
465+
// When the test didn't declare an rpo map (runner tests don't care), fall
466+
// back to deriving a sane value from retention days so those fakes keep
467+
// satisfying the interface: any tier that takes backups (days>0) reports a
468+
// 60-minute RPO (hourly), tiers with 0 retention report 0 (never).
469+
func (f *fakeBackupPlanRegistry) RPOMinutes(tier string) int {
470+
if f.rpo != nil {
471+
if m, ok := f.rpo[tier]; ok {
472+
return m
473+
}
474+
return 0
475+
}
476+
if f.BackupRetentionDays(tier) > 0 {
477+
return 60
478+
}
479+
return 0
480+
}
481+
463482
func (f *fakeBackupPlanRegistry) TierNames() []string { return f.tiers }
464483

465484
// TestRetentionDaysFromRegistry — per-tier retention read straight from

0 commit comments

Comments
 (0)