Skip to content

Commit 0fe5989

Browse files
refactor(jobs): route expiry-stage + reaper status logic through common/resourcestatus
Eliminates the api↔worker expiry-stage predicate divergence the BugBash flagged. The worker's hand-written 12h/6h/1h schedule and the reaper's reapable-status set now come from instant.dev/common/resourcestatus. expiry_reminder.go: - reminderStage/reminderSchedule deleted; selectStage delegates to resourcestatus.DeriveExpiryStage (the P2-12 "most-imminent bucket wins" logic now lives in the shared package) - hoursLeft delegates to resourcestatus.HoursUntilExpiry - outermostReminderWindow derived from resourcestatus.ExpiryWindow12h expire.go: - reapable-status SQL filter (active/paused/suspended) derived from resourcestatus.ReapableStatuses() so the SQL IN(...) can never drift from Status.IsReapable; terminal write uses StatusDeleted resourcestatus_conversion_test.go pins the converted selectStage / hoursLeft against verbatim copies of the original hand-written algorithms over every time-bucket boundary — proving identical behaviour. Cross-repo contract change (CLAUDE.md rule 22); common + api pushed first. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4e0e837 commit 0fe5989

3 files changed

Lines changed: 219 additions & 66 deletions

File tree

internal/jobs/expire.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,37 @@ import (
55
"database/sql"
66
"fmt"
77
"log/slog"
8+
"strings"
89
"time"
910

1011
madmin "github.com/minio/madmin-go/v3"
1112
minio "github.com/minio/minio-go/v7"
1213
"github.com/riverqueue/river"
1314
"go.opentelemetry.io/otel"
15+
"instant.dev/common/resourcestatus"
1416
commonv1 "instant.dev/proto/common/v1"
1517
"instant.dev/worker/internal/metrics"
1618
"instant.dev/worker/internal/provisioner"
1719
)
1820

21+
// reapableStatusSQLList is the canonical SQL `IN (...)` body listing the
22+
// statuses a TTL-expiry sweep may act on (active / paused / suspended),
23+
// derived from resourcestatus.ReapableStatuses() so the reaper's SQL
24+
// filter can never drift from the shared Go predicate Status.IsReapable.
25+
// TTL must win over lifecycle state: a paused/suspended resource past its
26+
// 24h TTL is still expired and must be deprovisioned.
27+
//
28+
// Built once at init from a fixed enum of literals (no caller input), so
29+
// there is no SQL-injection surface — the values are 'active', 'paused',
30+
// 'suspended'.
31+
var reapableStatusSQLList = func() string {
32+
quoted := make([]string, 0, 3)
33+
for _, s := range resourcestatus.ReapableStatuses() {
34+
quoted = append(quoted, "'"+s+"'")
35+
}
36+
return strings.Join(quoted, ", ")
37+
}()
38+
1939
// ExpireAnonymousArgs holds the arguments for the ExpireAnonymousJob.
2040
// No fields are needed — it's a periodic maintenance job.
2141
type ExpireAnonymousArgs struct{}
@@ -106,7 +126,7 @@ func (w *ExpireAnonymousWorker) Work(ctx context.Context, job *river.Job[ExpireA
106126
SELECT id::text, token::text, resource_type, COALESCE(provider_resource_id, '')
107127
FROM resources
108128
WHERE ((team_id IS NULL AND tier = 'anonymous') OR tier = 'free')
109-
AND status IN ('active', 'paused', 'suspended')
129+
AND status IN (`+reapableStatusSQLList+`)
110130
AND expires_at IS NOT NULL
111131
AND expires_at < now()
112132
`)
@@ -195,8 +215,8 @@ func (w *ExpireAnonymousWorker) Work(ctx context.Context, job *river.Job[ExpireA
195215
// the SELECT above. Gating on status='active' alone would leave the
196216
// paused/suspended rows we just deprovisioned stuck in their old status.
197217
if _, err := w.db.ExecContext(ctx,
198-
`UPDATE resources SET status = 'deleted'
199-
WHERE id = $1 AND status IN ('active', 'paused', 'suspended')`,
218+
`UPDATE resources SET status = '`+resourcestatus.StatusDeleted.String()+`'
219+
WHERE id = $1 AND status IN (`+reapableStatusSQLList+`)`,
200220
r.id,
201221
); err != nil {
202222
slog.Error("jobs.expire_anonymous.mark_deleted_failed",
@@ -214,7 +234,7 @@ func (w *ExpireAnonymousWorker) Work(ctx context.Context, job *river.Job[ExpireA
214234
var activeAnon int
215235
if scanErr := w.db.QueryRowContext(ctx, `
216236
SELECT COUNT(*) FROM resources
217-
WHERE team_id IS NULL AND status = 'active' AND expires_at IS NOT NULL
237+
WHERE team_id IS NULL AND status = '`+resourcestatus.StatusActive.String()+`' AND expires_at IS NOT NULL
218238
`).Scan(&activeAnon); scanErr == nil {
219239
metrics.ActiveAnonymousResources.Set(float64(activeAnon))
220240
}

internal/jobs/expiry_reminder.go

Lines changed: 50 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
"github.com/google/uuid"
6161
"github.com/riverqueue/river"
6262
"go.opentelemetry.io/otel"
63+
"instant.dev/common/resourcestatus"
6364
)
6465

6566
// ExpiryReminderArgs is the River job payload — no fields, runs as a sweep.
@@ -69,20 +70,25 @@ type ExpiryReminderArgs struct{}
6970
func (ExpiryReminderArgs) Kind() string { return "expiry_reminder" }
7071

7172
// reminderStage describes one bucket in the 12h/6h/1h schedule.
73+
//
74+
// As of the resourcestatus unification it is a thin wrapper over
75+
// instant.dev/common/resourcestatus.ExpiryStage — the 12h/6h/1h
76+
// thresholds, the index→reminder_index mapping, and the label→stage_label
77+
// mapping all live in the shared package, so api and worker can never
78+
// disagree on which window a resource falls in. The struct is retained
79+
// only so the existing `stage.index` / `stage.label` / `stage.stage`
80+
// call sites in Work() and emitAnonExpiryWarningAudit() stay unchanged.
7281
type reminderStage struct {
73-
index int // 1-based; matches reminder_index in the email
74-
expiresWithin time.Duration // resource fires this stage when expires_at <= now + expiresWithin
75-
label string // logging label, also flows into the email as stage_label
82+
stage resourcestatus.ExpiryStage
83+
index int // 1-based; matches reminder_index in the email
84+
label string // logging label, also flows into the email as stage_label
7685
}
7786

78-
// reminderSchedule is the canonical stage table. Ordered most-distant
79-
// to most-imminent. selectStage picks the MOST IMMINENT window the
80-
// resource currently falls in (time-bucket-first) — see P2-12 below.
81-
var reminderSchedule = []reminderStage{
82-
{index: 1, expiresWithin: 12 * time.Hour, label: "stage_12h"},
83-
{index: 2, expiresWithin: 6 * time.Hour, label: "stage_6h"},
84-
{index: 3, expiresWithin: 1 * time.Hour, label: "stage_1h"},
85-
}
87+
// outermostReminderWindow is the widest reminder window — anything past
88+
// it is too far from expiry to be a candidate for any stage. Derived
89+
// from the shared package's canonical 12h threshold (no second copy of
90+
// the duration here).
91+
const outermostReminderWindow = resourcestatus.ExpiryWindow12h
8692

8793
// reminderCooldown is the minimum wall-clock gap between two
8894
// dispatches on the same resource. Belt-and-braces — the CAS on
@@ -155,7 +161,7 @@ func (w *ExpiryReminderWorker) Work(ctx context.Context, job *river.Job[ExpiryRe
155161
// The outermost window is the largest stage threshold — anything
156162
// outside this window is too far from expiry to be a candidate
157163
// for any stage. Inner windows are checked per-stage below.
158-
windowEnd := now.Add(reminderSchedule[0].expiresWithin)
164+
windowEnd := now.Add(outermostReminderWindow)
159165
cooldownBefore := now.Add(-reminderCooldown)
160166

161167
// LEFT JOIN users u ... AND u.is_primary = true — the is_primary
@@ -314,72 +320,54 @@ func (w *ExpiryReminderWorker) Work(ctx context.Context, job *river.Job[ExpiryRe
314320

315321
// selectStage picks the stage a row is currently eligible for.
316322
//
317-
// P2-12 (BugBash 2026-05-18): the stage is chosen by TIME BUCKET first,
318-
// then gated on reminders_sent — NOT the other way round. The prior
319-
// implementation required strict monotonic 0→1→2 progression
320-
// (`reminders_sent == mustHaveSent`), so a resource created less than
321-
// 6h before its TTL — already inside the (6h, 1h] or (1h, 0h] window
322-
// from the very first sweep — still fired stage 1 because that was the
323-
// only stage whose mustHaveSent matched reminders_sent=0. The email
324-
// then claimed "12h to go" while only ~50 min actually remained:
325-
// hours_remaining and stage_label disagreed.
323+
// The time-bucket classification is delegated to
324+
// resourcestatus.DeriveExpiryStage — the canonical, shared "most
325+
// imminent window wins" logic (the P2-12 BugBash fix now lives in the
326+
// common package, so api and worker can never disagree on the bucket).
327+
// selectStage layers the worker-specific reminders_sent CAS gate on top:
328+
// a stage already sent (reminders_sent >= stage index) is skipped.
326329
//
327-
// The fix: bucket the resource into the MOST IMMINENT stage window its
328-
// time-to-expiry falls in (schedule is most-distant → most-imminent, so
329-
// the last matching entry wins), then fire it only if reminders_sent is
330-
// still below that stage's index. The CAS in Work() fast-forwards
331-
// reminders_sent straight to stage.index, consuming any skipped earlier
332-
// stages. Result: a short-TTL resource gets exactly one correctly
333-
// labelled reminder ("1h to go"), not a mislabelled "12h" one.
330+
// P2-12 (BugBash 2026-05-18) recap: the stage is chosen by TIME BUCKET
331+
// first, then gated on reminders_sent — NOT the other way round. A
332+
// resource created less than 6h before its TTL is bucketed straight into
333+
// stage_6h / stage_1h; the CAS in Work() fast-forwards reminders_sent to
334+
// that stage's index, consuming the skipped earlier stages so exactly one
335+
// correctly labelled reminder fires.
334336
//
335337
// Examples:
336-
// remaining 8h, reminders_sent 0 → stage 1 ("12h") — bucket (12h,6h]
337-
// remaining 4h, reminders_sent 0 → stage 2 ("6h") — skips stage 1
338-
// remaining 40m, reminders_sent 0 → stage 3 ("1h") — skips 1 and 2
339-
// remaining 4h, reminders_sent 2 → no stage (already past stage 2)
338+
//
339+
// remaining 8h, reminders_sent 0 → stage 1 ("12h") — bucket (12h,6h]
340+
// remaining 4h, reminders_sent 0 → stage 2 ("6h") — skips stage 1
341+
// remaining 40m, reminders_sent 0 → stage 3 ("1h") — skips 1 and 2
342+
// remaining 4h, reminders_sent 2 → no stage (already past stage 2)
340343
func selectStage(r expiryReminderRow, now time.Time) (reminderStage, bool) {
341-
remaining := r.expiresAt.Sub(now)
342-
if remaining <= 0 {
343-
return reminderStage{}, false
344-
}
345-
var bucket reminderStage
346-
found := false
347-
for _, s := range reminderSchedule {
348-
if remaining <= s.expiresWithin {
349-
// schedule is ordered most-distant → most-imminent, so
350-
// later matches overwrite earlier ones — the final match
351-
// is the tightest window the resource currently sits in.
352-
bucket = s
353-
found = true
354-
}
355-
}
356-
if !found {
357-
// Outside even the widest (12h) window — too far from expiry.
344+
es := resourcestatus.DeriveExpiryStage(r.expiresAt, now)
345+
if !es.IsWarning() {
346+
// ExpiryStageNone (too far out) or ExpiryStagePastTTL — no
347+
// reminder stage applies.
358348
return reminderStage{}, false
359349
}
350+
bucket := reminderStage{stage: es, index: es.Index(), label: es.Label()}
360351
if r.remindersSent >= bucket.index {
361352
// This stage (or a later one) was already sent — nothing to do.
362353
return reminderStage{}, false
363354
}
364355
return bucket, true
365356
}
366357

367-
// hoursLeft rounds the gap up to whole hours, with a floor of 1 so
368-
// the email never says "0 hours". The legacy worker had the same
369-
// floor (it just always rendered the floor due to a Brevo template bug).
358+
// hoursLeft rounds the gap up to whole hours, with a floor of 1 so the
359+
// email never says "0 hours". Delegates to the shared
360+
// resourcestatus.HoursUntilExpiry — identical floor-of-1 / round-up
361+
// semantics, now shared so api and worker render the same number.
370362
func hoursLeft(expires, now time.Time) int {
371-
delta := expires.Sub(now)
372-
if delta <= time.Hour {
363+
h := resourcestatus.HoursUntilExpiry(expires, now)
364+
if h < 1 {
365+
// HoursUntilExpiry returns 0 for a past-TTL / zero expiry; the
366+
// reminder path only ever calls this for a future expiry, but
367+
// keep the floor of 1 so the email copy never regresses.
373368
return 1
374369
}
375-
hours := int(delta.Hours())
376-
if delta-time.Duration(hours)*time.Hour > 0 {
377-
hours++
378-
}
379-
if hours < 1 {
380-
hours = 1
381-
}
382-
return hours
370+
return h
383371
}
384372

385373
// emitAnonExpiryWarningAudit writes one anon.expiry_warning audit_log
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package jobs
2+
3+
// resourcestatus_conversion_test.go — proves the worker expiry-stage
4+
// helpers converted to instant.dev/common/resourcestatus behave
5+
// IDENTICALLY to the pre-conversion hand-written logic.
6+
//
7+
// This test lives in `package jobs` (not jobs_test) so it can reach the
8+
// unexported selectStage / hoursLeft helpers directly.
9+
//
10+
// Each test re-implements the ORIGINAL algorithm inline and asserts the
11+
// converted helper agrees for every input. If the shared package ever
12+
// drifts from the worker's original semantics, these tests fail.
13+
14+
import (
15+
"testing"
16+
"time"
17+
18+
"instant.dev/common/resourcestatus"
19+
)
20+
21+
// legacyReminderStage / legacySchedule reproduce the pre-conversion
22+
// hand-written 12h/6h/1h table that lived in expiry_reminder.go.
23+
type legacyReminderStage struct {
24+
index int
25+
expiresWithin time.Duration
26+
}
27+
28+
var legacySchedule = []legacyReminderStage{
29+
{index: 1, expiresWithin: 12 * time.Hour},
30+
{index: 2, expiresWithin: 6 * time.Hour},
31+
{index: 3, expiresWithin: 1 * time.Hour},
32+
}
33+
34+
// legacySelectStage is the ORIGINAL selectStage body, verbatim, used as
35+
// the oracle the converted selectStage must match.
36+
func legacySelectStage(r expiryReminderRow, now time.Time) (int, bool) {
37+
remaining := r.expiresAt.Sub(now)
38+
if remaining <= 0 {
39+
return 0, false
40+
}
41+
var bucketIndex int
42+
found := false
43+
for _, s := range legacySchedule {
44+
if remaining <= s.expiresWithin {
45+
bucketIndex = s.index
46+
found = true
47+
}
48+
}
49+
if !found {
50+
return 0, false
51+
}
52+
if r.remindersSent >= bucketIndex {
53+
return 0, false
54+
}
55+
return bucketIndex, true
56+
}
57+
58+
// legacyHoursLeft is the ORIGINAL hoursLeft body, verbatim.
59+
func legacyHoursLeft(expires, now time.Time) int {
60+
delta := expires.Sub(now)
61+
if delta <= time.Hour {
62+
return 1
63+
}
64+
hours := int(delta.Hours())
65+
if delta-time.Duration(hours)*time.Hour > 0 {
66+
hours++
67+
}
68+
if hours < 1 {
69+
hours = 1
70+
}
71+
return hours
72+
}
73+
74+
// TestSelectStage_EquivalentToLegacy proves the converted selectStage
75+
// (now delegating to resourcestatus.DeriveExpiryStage) returns the same
76+
// (stage index, eligible) result as the original hand-written version,
77+
// across every time-bucket boundary and every reminders_sent value.
78+
func TestSelectStage_EquivalentToLegacy(t *testing.T) {
79+
now := time.Date(2026, 5, 19, 12, 0, 0, 0, time.UTC)
80+
offsets := []time.Duration{
81+
-time.Hour, // past TTL
82+
0, // exactly now
83+
time.Nanosecond, // 1ns out
84+
40 * time.Minute, // inside 1h
85+
time.Hour, // exactly 1h
86+
time.Hour + time.Minute, // just over 1h
87+
4 * time.Hour, // inside 6h
88+
6 * time.Hour, // exactly 6h
89+
8 * time.Hour, // inside 12h
90+
12 * time.Hour, // exactly 12h
91+
13 * time.Hour, // beyond 12h
92+
24 * time.Hour, // far out
93+
}
94+
for _, off := range offsets {
95+
for remindersSent := 0; remindersSent <= 4; remindersSent++ {
96+
r := expiryReminderRow{
97+
expiresAt: now.Add(off),
98+
remindersSent: remindersSent,
99+
}
100+
wantIdx, wantOK := legacySelectStage(r, now)
101+
gotStage, gotOK := selectStage(r, now)
102+
if gotOK != wantOK {
103+
t.Errorf("offset=%v remindersSent=%d: ok=%v, legacy ok=%v",
104+
off, remindersSent, gotOK, wantOK)
105+
continue
106+
}
107+
if gotOK && gotStage.index != wantIdx {
108+
t.Errorf("offset=%v remindersSent=%d: index=%d, legacy index=%d",
109+
off, remindersSent, gotStage.index, wantIdx)
110+
}
111+
// Label and shared-enum agreement.
112+
if gotOK {
113+
es := resourcestatus.DeriveExpiryStage(r.expiresAt, now)
114+
if gotStage.label != es.Label() || gotStage.stage != es {
115+
t.Errorf("offset=%v: stage wrapper out of sync with shared enum", off)
116+
}
117+
}
118+
}
119+
}
120+
}
121+
122+
// TestHoursLeft_EquivalentToLegacy proves the converted hoursLeft (now
123+
// delegating to resourcestatus.HoursUntilExpiry) matches the original
124+
// hand-written rounding for every future-expiry input.
125+
func TestHoursLeft_EquivalentToLegacy(t *testing.T) {
126+
now := time.Date(2026, 5, 19, 12, 0, 0, 0, time.UTC)
127+
offsets := []time.Duration{
128+
time.Nanosecond,
129+
30 * time.Minute,
130+
time.Hour,
131+
time.Hour + time.Minute,
132+
2 * time.Hour,
133+
10 * time.Hour,
134+
10*time.Hour + 30*time.Minute,
135+
23 * time.Hour,
136+
}
137+
for _, off := range offsets {
138+
expires := now.Add(off)
139+
want := legacyHoursLeft(expires, now)
140+
got := hoursLeft(expires, now)
141+
if got != want {
142+
t.Errorf("offset=%v: hoursLeft=%d, legacy=%d", off, got, want)
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)