Skip to content

Commit b10df7e

Browse files
fix(reaper+reconciler): MR-P0-1 + MR-P0-2 — stop the customer-namespace leak; wire real prober
BugBash 2026-05-20 P0s (cross-confirmed across 4 tracks — T1/T5/T20/T24). The MR-P0-1 leak put 188 instant-customer-* namespaces, each running a live Postgres/Redis/Mongo pod with no DB record, into the prod cluster. MR-P0-1a (the headline finding): the reaper used to mark a resources row status='deleted' EVEN WHEN provisioner.DeprovisionResource had returned an error. A 'deleted' row is terminal and invisible to every reconciler, so the backend (customer namespace + its live pod) was orphaned forever. Now the reaper only advances the row to 'deleted' when the backend teardown genuinely succeeded; on failure the row stays in its reapable status and the next tick retries. A new instant_expire_deprovision_failed_total counter surfaces a sustained leak rate to NR. Added a deprovisioner-seam interface (ResourceDeprovisioner) so the regression test can inject a failing fake. MR-P0-1b: the orphan_sweep_reconciler's PASS 3 only swept instant-deploy-* namespaces — there was no sweep for instant-customer-*. New PASS 4 lists every instant-customer-<token> namespace and deletes any whose token has no active/paused/suspended resources row. Fail-open posture mirrors PASS 3 (RBAC forbidden / DB blip → one WARN, zero orphans this sweep). This is the durable backstop that stops the leak from recurring; 121 of the 188 leaked namespaces had to be cleaned up by hand. MR-P0-2: workers.go constructed the provisioner_reconciler with nil → the fallback NoopProber whose every Probe returns ProbeReachable. The reconciler would blindly promote stuck 'pending' rows to status='active' WITHOUT checking the backend. Now wired with the same NewRealProber(cfg) that resource_heartbeat already uses two lines below. 
Regression tests (each fails without the fix, verified): P0-1a → TestExpireAnonymousWorker_P0_1a_DeprovisionFailure_DoesNotMarkDeleted (+ companion: ..._DeprovisionSuccess_StillMarksDeleted); P0-1b → TestOrphanSweep_Pass4_ReclaimsOrphanedCustomerNamespace (+ companion: TestOrphanSweep_Pass4_NoCustomerNamespaces_NoQuery); P0-2 → TestProvisionerReconciler_P0_2_RealProberUnreachable_DoesNotPromote + TestProvisionerReconciler_StartWorkersCallSite_PassesRealProber (call-site text-grep binding so a future nil revert fails CI) + TestProvisionerReconciler_WorkerKeepsRealProber. go build / go vet / go test ./... all green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7169493 commit b10df7e

9 files changed

Lines changed: 691 additions & 34 deletions

internal/jobs/expire.go

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,28 @@ type ExpireAnonymousArgs struct{}
4242

4343
func (ExpireAnonymousArgs) Kind() string { return "expire_anonymous" }
4444

45+
// ResourceDeprovisioner is the narrow seam the reaper uses to tear down a
46+
// resource's physical backend (DROP DATABASE / DROP USER / delete NATS pod).
47+
// It is an interface so that a test can inject a fake that fails the
48+
// deprovision and assert the reaper does NOT then mark the row deleted
49+
// (MR-P0-1a, BugBash 2026-05-20). The concrete *provisioner.Client satisfies
50+
// it. A nil ResourceDeprovisioner on the worker means the deprovision step is
// skipped (fail open) — same posture as before.
51+
type ResourceDeprovisioner interface {
52+
// DeprovisionResource tears down the backend identified by token and
// providerResourceID for the given resource type. A non-nil error means
// the teardown did not succeed.
DeprovisionResource(ctx context.Context, token, providerResourceID string, resType commonv1.ResourceType) error
53+
}
54+
55+
// Compile-time assertion: the real provisioner client satisfies the seam.
56+
var _ ResourceDeprovisioner = (*provisioner.Client)(nil)
57+
4558
// ExpireAnonymousWorker expires anonymous resources that have passed their expires_at time.
4659
// It calls the provisioner to DROP the physical resource (DB/ACL user/Mongo user) before
4760
// marking the row as deleted, so credentials stop working immediately rather than lingering
4861
// until the next provisioner cycle.
4962
type ExpireAnonymousWorker struct {
5063
river.WorkerDefaults[ExpireAnonymousArgs]
5164
db *sql.DB
52-
provisioner *provisioner.Client // nil = deprovision skipped (fail open)
53-
minioClient *madmin.AdminClient // nil = MinIO IAM-user cleanup skipped (legacy self-hosted MinIO backend)
65+
provisioner ResourceDeprovisioner // nil = deprovision skipped (fail open)
66+
minioClient *madmin.AdminClient // nil = MinIO IAM-user cleanup skipped (legacy self-hosted MinIO backend)
5467
// objectDeleter deletes a storage resource's objects under its tenant
5568
// prefix on the S3-compatible OBJECT_STORE_* backend (DO Spaces in prod).
5669
// This is the only path that actually removes a tenant's objects on
@@ -65,15 +78,34 @@ type ExpireAnonymousWorker struct {
6578
}
6679

6780
// NewExpireAnonymousWorker constructs an ExpireAnonymousWorker.
68-
// Pass nil for provClient to skip physical deprovisioning (e.g. in tests or when the
69-
// provisioner is unavailable — the DB row is still marked deleted).
81+
// Pass nil for provClient to skip physical deprovisioning (e.g. in tests or
82+
// when the provisioner is unavailable). NOTE(review): with a nil client no
83+
// teardown call is made, so the MR-P0-1a failure flag is never set and the
84+
// row still appears to be marked deleted (fail open) — confirm against Work().
7085
// Pass nil for minioClient to skip MinIO IAM user cleanup.
7186
//
7287
// The storage-object deleter and bucket are wired separately via
7388
// WithObjectDeleter — callers that don't set it leave storage expiry as a
7489
// logged WARN (no silent no-op) rather than dropping the tenant's objects.
7590
func NewExpireAnonymousWorker(db *sql.DB, provClient *provisioner.Client, minioClient *madmin.AdminClient) *ExpireAnonymousWorker {
76-
return &ExpireAnonymousWorker{db: db, provisioner: provClient, minioClient: minioClient}
91+
w := &ExpireAnonymousWorker{db: db, minioClient: minioClient}
92+
// A typed-nil *provisioner.Client stored straight into the interface
93+
// field would make `w.provisioner != nil` true and panic on call. Only
94+
// assign when the pointer is genuinely non-nil so the nil-skip guard in
95+
// Work() behaves.
96+
if provClient != nil {
97+
w.provisioner = provClient
98+
}
99+
return w
100+
}
101+
102+
// WithDeprovisioner overrides the deprovisioner seam — used by tests to
103+
// inject a fake that fails the deprovision call so the MR-P0-1a regression
104+
// (a failed deprovision must NOT mark the row deleted) can be exercised
105+
// without a live provisioner. Returns the worker for chaining.
//
// d is stored as-is, with no typed-nil guard: a nil pointer of a concrete
// type passed here becomes a non-nil interface value, so pass a literal nil
// to disable deprovisioning.
106+
func (w *ExpireAnonymousWorker) WithDeprovisioner(d ResourceDeprovisioner) *ExpireAnonymousWorker {
107+
w.provisioner = d
108+
return w
77109
}
78110

79111
// WithObjectDeleter wires the S3-compatible object deleter used to remove a
@@ -160,11 +192,26 @@ func (w *ExpireAnonymousWorker) Work(ctx context.Context, job *river.Job[ExpireA
160192
return nil
161193
}
162194

163-
// Step 2: For each candidate, deprovision the physical resource then mark the row deleted.
164-
// Errors on either step are logged but never propagate — fail open so one bad resource
165-
// does not block the expiry of the remaining batch.
195+
// Step 2: For each candidate, deprovision the physical resource then mark
196+
// the row deleted. One bad resource never blocks the expiry of the rest of
197+
// the batch (fail open).
198+
//
199+
// MR-P0-1a (BugBash 2026-05-20, cross-confirmed by T1/T5/T20/T24): the row
200+
// is marked status='deleted' ONLY when the backend teardown genuinely
201+
// succeeded (or there was nothing to tear down). A 'deleted' row is
202+
// terminal and invisible to every reconciler — so marking it deleted while
203+
// the physical Postgres DB / Redis ACL / Mongo user / NATS pod is still
204+
// live orphans that backend forever (it bills real money and consumes
205+
// shared-cluster capacity). On a deprovision failure we instead leave the
206+
// row in its current reapable status: the next reaper tick (or the team-
207+
// deletion executor) retries the teardown.
166208
var expired int
167209
for _, r := range candidates {
210+
// deprovisionFailed gates the mark-deleted UPDATE below. true = a
211+
// backend teardown call returned an error this tick; the row stays
212+
// reapable so a later tick retries it instead of stranding the infra.
213+
deprovisionFailed := false
214+
168215
// Deprovision — credentials become invalid immediately.
169216
switch r.resourceType {
170217
case "storage":
@@ -199,18 +246,35 @@ func (w *ExpireAnonymousWorker) Work(ctx context.Context, job *river.Job[ExpireA
199246
if deprovErr := w.provisioner.DeprovisionResource(
200247
ctx, r.token, r.providerResourceID, resType,
201248
); deprovErr != nil {
249+
// MR-P0-1a: a failed teardown must NOT advance the
250+
// row to 'deleted'. Flag it so the mark-deleted
251+
// UPDATE below is skipped; the row stays reapable
252+
// and the next tick retries the DROP.
253+
deprovisionFailed = true
202254
slog.Warn("jobs.expire_anonymous.deprovision_failed",
203255
"error", deprovErr,
204256
"resource_id", r.id,
205257
"resource_type", r.resourceType,
206258
"token", r.token,
207259
"job_id", job.ID,
260+
"effect", "row left reapable for retry; NOT marked deleted (MR-P0-1a)",
208261
)
209262
}
210263
}
211264
}
212265
}
213266

267+
// MR-P0-1a: skip the mark-deleted UPDATE when the backend teardown
268+
// failed this tick. Marking a row 'deleted' (a terminal, non-reapable
269+
// status) with live backend infra behind it permanently orphans that
270+
// infra — no reconciler ever revisits a 'deleted' row. Leaving the row
271+
// reapable means the next reaper tick re-selects it and retries the
272+
// DROP until it genuinely succeeds.
273+
if deprovisionFailed {
274+
metrics.ExpireDeprovisionFailedTotal.Inc()
275+
continue
276+
}
277+
214278
// Guarded UPDATE: mark deleted only from a non-terminal status, matching
215279
// the SELECT above. Gating on status='active' alone would leave the
216280
// paused/suspended rows we just deprovisioned stuck in their old status.

internal/jobs/expire_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package jobs_test
22

33
import (
44
"context"
5+
"database/sql/driver"
6+
"errors"
57
"sync"
68
"testing"
79

@@ -10,9 +12,35 @@ import (
1012
"github.com/riverqueue/river"
1113
"github.com/riverqueue/river/rivertype"
1214

15+
commonv1 "instant.dev/proto/common/v1"
1316
"instant.dev/worker/internal/jobs"
1417
)
1518

19+
// recordingArg is a sqlmock.Argument matcher that records whether it was ever
20+
// evaluated. It matches ANY value — its only job is to observe that the
21+
// statement it gates was actually executed. Used by the MR-P0-1a regression
22+
// test to detect that the reaper attempted the mark-deleted UPDATE.
23+
type recordingArg struct{ hit bool }
24+
25+
// Match sets hit and accepts the value unconditionally, so the gated
// expectation never fails on this argument.
func (r *recordingArg) Match(_ driver.Value) bool {
26+
r.hit = true
27+
return true
28+
}
29+
30+
// fakeDeprovisioner is a jobs.ResourceDeprovisioner test double. It records
31+
// every DeprovisionResource call and can be told to fail — used by the
32+
// MR-P0-1a regression test to assert the reaper does NOT mark a row deleted
33+
// when the backend teardown errors.
34+
type fakeDeprovisioner struct {
35+
calls int // number of DeprovisionResource invocations seen so far
36+
failErr error // non-nil → every DeprovisionResource call fails
37+
}
38+
39+
// DeprovisionResource ignores its arguments, counts the call, and returns
// failErr (nil means the fake teardown "succeeds").
func (f *fakeDeprovisioner) DeprovisionResource(_ context.Context, _, _ string, _ commonv1.ResourceType) error {
40+
f.calls++
41+
return f.failErr
42+
}
43+
1644
// fakeObjectDeleter is a fake S3BackupDeleter used to assert the storage-
1745
// expiry path actually drives an object delete against the object store.
1846
// It records the ListObjects prefixes it was asked for and emits the
@@ -293,3 +321,107 @@ func TestExpireAnonymousWorker_StorageExpiry_NoDeleterWarns(t *testing.T) {
293321
t.Errorf("unmet expectations: %v", err)
294322
}
295323
}
324+
325+
// TestExpireAnonymousWorker_P0_1a_DeprovisionFailure_DoesNotMarkDeleted is the
326+
// MR-P0-1a regression guard (BugBash 2026-05-20, cross-confirmed by
327+
// T1/T5/T20/T24 — the headline namespace/resource leak).
328+
//
329+
// THE BUG: the reaper called provisioner.DeprovisionResource, and on an error
330+
// logged a WARN but FELL THROUGH to the guarded `UPDATE resources SET
331+
// status='deleted'`. A 'deleted' row is terminal and invisible to every
332+
// reconciler — so the backend (the customer's instant-customer-<token> k8s
333+
// namespace and its live Postgres/Redis/Mongo pod) was orphaned forever,
334+
// billing real money. 188 such namespaces leaked in prod.
335+
//
336+
// THE FIX: on a deprovision error the row is LEFT in its reapable status; the
337+
// next reaper tick retries the teardown. The row is marked 'deleted' only
338+
// after a genuinely successful backend teardown.
339+
//
340+
// THE ASSERTION: with a deprovisioner that always fails, the worker must
341+
// issue NO `UPDATE resources SET status='deleted'`. sqlmock has ordered
342+
// expectations — only the SELECT and the trailing active-anon COUNT are
343+
// queued; an unexpected UPDATE makes ExpectationsWereMet fail. If a future
344+
// edit reintroduces the unconditional mark-deleted, this test fails.
345+
func TestExpireAnonymousWorker_P0_1a_DeprovisionFailure_DoesNotMarkDeleted(t *testing.T) {
346+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
347+
if err != nil {
348+
t.Fatalf("sqlmock.New: %v", err)
349+
}
350+
defer db.Close()
351+
352+
// One expired postgres resource — has a real provider_resource_id so the
353+
// reaper attempts a deprovision RPC.
354+
mock.ExpectQuery(`SELECT id::text`).
355+
WillReturnRows(sqlmock.NewRows([]string{"id", "token", "resource_type", "provider_resource_id"}).
356+
AddRow("id-leak", "tok-leak", "postgres", "db_tok_leak"))
357+
// The mark-deleted UPDATE IS queued — but gated by markDeletedSpy, a
358+
// recording argument matcher. With the fix the reaper SKIPS this UPDATE
359+
// (deprovision failed), so the matcher never fires and markDeletedSpy.hit
360+
// stays false. With the bug the reaper FALLS THROUGH to this UPDATE, the
361+
// matcher fires, and hit flips true — failing the test. Queuing it (rather
362+
// than omitting it) is what makes the buggy path observable: an omitted
363+
// expectation just produces a swallowed sqlmock error inside the reaper's
364+
// fail-open `continue`, which ExpectationsWereMet does NOT surface.
365+
markDeletedSpy := &recordingArg{}
366+
mock.ExpectExec(`UPDATE resources SET status = 'deleted'`).
367+
WithArgs(markDeletedSpy).
368+
WillReturnResult(sqlmock.NewResult(1, 1))
369+
// The trailing active-anon count always runs.
370+
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM resources`).
371+
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
372+
373+
deprov := &fakeDeprovisioner{failErr: errors.New("provisioner unreachable: context deadline exceeded")}
374+
w := jobs.NewExpireAnonymousWorker(db, nil, nil).WithDeprovisioner(deprov)
375+
376+
if err := w.Work(context.Background(), fakeJob[jobs.ExpireAnonymousArgs]()); err != nil {
377+
t.Fatalf("unexpected error: %v", err)
378+
}
379+
// The reaper must have actually attempted the teardown.
380+
if deprov.calls != 1 {
381+
t.Errorf("DeprovisionResource calls = %d, want 1 (the reaper must attempt teardown)", deprov.calls)
382+
}
383+
// THE LOAD-BEARING ASSERTION: the mark-deleted UPDATE must NOT have run.
384+
// markDeletedSpy.hit is true iff the reaper executed `UPDATE ... SET
385+
// status='deleted'` for the row whose deprovision failed — the exact bug
386+
// that orphans the customer's instant-customer-<token> namespace forever.
387+
if markDeletedSpy.hit {
388+
t.Error("MR-P0-1a regression: the reaper marked the row status='deleted' " +
389+
"even though DeprovisionResource FAILED. A 'deleted' row is terminal and " +
390+
"invisible to every reconciler — the backend (customer k8s namespace + its " +
391+
"live DB/Redis pod) is now orphaned forever. On a failed deprovision the row " +
392+
"must stay reapable so the next tick retries.")
393+
}
394+
}
395+
396+
// TestExpireAnonymousWorker_P0_1a_DeprovisionSuccess_StillMarksDeleted is the
397+
// companion to the above: it pins that the fix did NOT break the happy path —
398+
// when the backend teardown genuinely succeeds, the row IS marked 'deleted'.
399+
func TestExpireAnonymousWorker_P0_1a_DeprovisionSuccess_StillMarksDeleted(t *testing.T) {
400+
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp))
401+
if err != nil {
402+
t.Fatalf("sqlmock.New: %v", err)
403+
}
404+
defer db.Close()
405+
406+
mock.ExpectQuery(`SELECT id::text`).
407+
WillReturnRows(sqlmock.NewRows([]string{"id", "token", "resource_type", "provider_resource_id"}).
408+
AddRow("id-ok", "tok-ok", "postgres", "db_tok_ok"))
409+
// Deprovision succeeds → the mark-deleted UPDATE MUST run.
410+
mock.ExpectExec(`UPDATE resources SET status = 'deleted'`).
411+
WillReturnResult(sqlmock.NewResult(1, 1))
412+
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM resources`).
413+
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
414+
415+
deprov := &fakeDeprovisioner{} // failErr nil → succeeds
416+
w := jobs.NewExpireAnonymousWorker(db, nil, nil).WithDeprovisioner(deprov)
417+
418+
if err := w.Work(context.Background(), fakeJob[jobs.ExpireAnonymousArgs]()); err != nil {
419+
t.Fatalf("unexpected error: %v", err)
420+
}
421+
if deprov.calls != 1 {
422+
t.Errorf("DeprovisionResource calls = %d, want 1", deprov.calls)
423+
}
424+
if err := mock.ExpectationsWereMet(); err != nil {
425+
t.Errorf("happy path regressed: a successful deprovision must still mark the row deleted: %v", err)
426+
}
427+
}

internal/jobs/k8s_namespace_client.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,31 @@ func (c *k8sNamespaceClient) NamespaceExists(ctx context.Context, namespace stri
9595
// on any API failure so the reconciler skips the k8s-orphan phase rather
9696
// than acting on a truncated list.
9797
func (c *k8sNamespaceClient) ListDeployNamespaces(ctx context.Context) ([]string, error) {
98+
// Delegate to the shared prefix-filtered namespace list; any error from
// that helper is returned unchanged.
return c.listNamespacesWithPrefix(ctx, deployNamespacePrefixTDE)
99+
}
100+
101+
// ListCustomerNamespaces returns the names of every namespace currently in
102+
// the cluster whose name starts with customerNamespacePrefix (the
103+
// instant-customer-* namespaces). The orphan-sweep reconciler's PASS 4 uses
104+
// this to find customer namespaces whose backing resources row is gone (the
105+
// MR-P0-1b leak: a reaper that marked the row 'deleted' while the namespace's
106+
// backend stayed live). Returns an error on any API failure so the reconciler
107+
// skips the customer-orphan phase rather than acting on a truncated list —
// never delete a namespace off an incomplete picture.
108+
func (c *k8sNamespaceClient) ListCustomerNamespaces(ctx context.Context) ([]string, error) {
109+
return c.listNamespacesWithPrefix(ctx, customerNamespacePrefix)
110+
}
111+
112+
// listNamespacesWithPrefix is the shared cluster-scoped namespace List used
113+
// by both ListDeployNamespaces and ListCustomerNamespaces.
114+
func (c *k8sNamespaceClient) listNamespacesWithPrefix(ctx context.Context, prefix string) ([]string, error) {
98115
list, err := c.cs.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
99116
if err != nil {
100-
return nil, fmt.Errorf("k8sNamespaceClient.ListDeployNamespaces: %w", err)
117+
return nil, fmt.Errorf("k8sNamespaceClient.listNamespacesWithPrefix %q: %w", prefix, err)
101118
}
102119
var out []string
103120
for i := range list.Items {
104121
name := list.Items[i].Name
105-
if len(name) >= len(deployNamespacePrefixTDE) &&
106-
name[:len(deployNamespacePrefixTDE)] == deployNamespacePrefixTDE {
122+
if len(name) >= len(prefix) && name[:len(prefix)] == prefix {
107123
out = append(out, name)
108124
}
109125
}

0 commit comments

Comments
 (0)