diff --git a/internal/jobs/propagation_chaos_followups_test.go b/internal/jobs/propagation_chaos_followups_test.go new file mode 100644 index 0000000..537cf25 --- /dev/null +++ b/internal/jobs/propagation_chaos_followups_test.go @@ -0,0 +1,271 @@ +package jobs + +// propagation_chaos_followups_test.go — CHAOS-DRILL-2026-05-20 F2/F3/F4 tests. +// +// These tests pin the three follow-up fixes from the 2026-05-20 chaos drill: +// +// F2: TestPropagation_UnknownKind_DeadLettersAtMaxAttempts +// Without the F2 fix, an `unknown_kind` row escapes the maxAttempts +// ceiling and retries forever (confirmed live during the drill — +// chaos_test_unknown_kind reached attempts=10 in 4 minutes without +// transitioning to failed_at). This test drives a row with +// attempts == propagationMaxAttempts-1 through one final Work() tick +// with a kind NOT in propagationHandlers, and asserts that the row +// dead-letters (single UPDATE … SET failed_at=now()) and the +// Prometheus counter increments under reason="unknown_kind". +// +// Registry-iterating per CLAUDE.md rule 18: the test SYNTHESIZES a +// kind that is guaranteed-not-in-the-registry rather than hardcoding +// a string — so a future PR that adds a new kind cannot accidentally +// turn this test into a no-op. +// +// F3: TestPropagation_DeadLetter_IncrementsMetric +// Pins the F3 contract: every transition to failed_at increments +// metrics.PropagationDeadLetteredTotal. Drives a row at +// attempts == propagationMaxAttempts-1 through a final RegradeResource +// failure (the modal "max_attempts" path) and asserts the counter +// moved from 0 to 1 with reason="max_attempts" / kind="tier_elevation". +// +// F4: TestWorker_RiverConfig_RescueStuckJobsAfterIs25Min +// Pins the F4 fix: the rescueStuckJobsAfter constant is exactly 25 +// minutes. A future PR that drifts this value to River's default +// (1h20m) breaks our 80-minute-RTO ceiling silently — this test +// catches it at build time. + +import ( + "context" + "database/sql" + "errors" + "fmt" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus/testutil" + + commonplans "instant.dev/common/plans" + + "instant.dev/worker/internal/metrics" +) + +// ─── F2: unknown_kind dead-letters at maxAttempts ───────────────────────────── + +func TestPropagation_UnknownKind_DeadLettersAtMaxAttempts(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + // Registry-iterating per rule 18: synthesize a kind GUARANTEED not in + // propagationHandlers. A hand-typed "chaos_test_unknown_kind" string + // would silently start passing if some future PR adds that exact kind + // to propagationHandlers; deriving from the wall clock means the kind + // is fresh on every run AND we can prove it. + syntheticKind := fmt.Sprintf("chaos_unknown_kind_%d", time.Now().UnixNano()) + if _, ok := propagationHandlers[syntheticKind]; ok { + t.Fatalf("synthetic kind %q is unexpectedly in propagationHandlers — test cannot prove the F2 path", syntheticKind) + } + // Defence-in-depth: confirm propagationHandlers is non-empty (otherwise + // EVERY kind would be unknown — meaningless test). + if len(propagationHandlers) == 0 { + t.Fatal("propagationHandlers is empty — F2 test is meaningless against an empty registry") + } + + // Snapshot the counter so we can compare deltas (other tests may have + // run first and bumped it). + before := testutil.ToFloat64(metrics.PropagationDeadLetteredTotal.WithLabelValues("unknown_kind", "unknown_kind")) + + propID := uuid.New() + teamID := uuid.New() + + // attempts == propagationMaxAttempts-1; one more "no handler" failure + // should dead-letter (not retry). + mock.ExpectBegin() + mock.ExpectQuery(`SELECT id, kind, team_id, target_tier, payload, attempts\s+FROM pending_propagations`). + WillReturnRows(sqlmock.NewRows(propagationSweepCols). + AddRow(propID, syntheticKind, teamID, "pro", []byte(`{}`), propagationMaxAttempts-1)) + mock.ExpectCommit() + + // NO handler dispatch — the row's kind is unknown. Straight to + // markUnknownKindDeadLettered: UPDATE pending_propagations SET failed_at=now(). + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1,\s+last_attempt_at\s*=\s*now\(\),\s+last_error\s*=\s*\$1,\s+failed_at\s*=\s*now\(\)`). + WithArgs(sqlmock.AnyArg(), propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + // age_seconds lookup. + mock.ExpectQuery(`SELECT created_at FROM pending_propagations WHERE id`). + WithArgs(propID). + WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(time.Now().Add(-10 * time.Minute))) + // audit_log INSERT — propagation.unknown_kind_dead_lettered row. + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + stub := &stubPropagationRegrader{} + w := NewPropagationRunnerWorker(db, commonplans.Default(), stub) + + if wErr := w.Work(context.Background(), fakePropagationJob()); wErr != nil { + t.Fatalf("Work: %v", wErr) + } + + if stub.calls != 0 { + t.Errorf("RegradeResource calls = %d, want 0 — an unknown_kind row must never reach a handler", stub.calls) + } + + // The F3 contract for the F2 path: dead-letter Prom counter incremented + // with reason="unknown_kind", kind="unknown_kind" (bounded bucket). + after := testutil.ToFloat64(metrics.PropagationDeadLetteredTotal.WithLabelValues("unknown_kind", "unknown_kind")) + if after-before != 1 { + t.Errorf("PropagationDeadLetteredTotal{reason=unknown_kind,kind=unknown_kind} delta = %.0f, want 1", after-before) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestPropagation_UnknownKind_RetriesBelowMaxAttempts is the companion to the +// above — the pre-F2 behaviour stays the same WHEN attempts is still well +// below the ceiling. Without this companion, the F2 fix could silently +// regress the unknown_kind path into immediate dead-letter (the bug fix +// flipping into a new bug). A row at attempts=0 must retry once, not +// dead-letter. +func TestPropagation_UnknownKind_RetriesBelowMaxAttempts(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + syntheticKind := fmt.Sprintf("chaos_unknown_kind_retry_%d", time.Now().UnixNano()) + if _, ok := propagationHandlers[syntheticKind]; ok { + t.Fatalf("synthetic kind %q is unexpectedly in propagationHandlers", syntheticKind) + } + + propID := uuid.New() + teamID := uuid.New() + + // attempts = 0 → far below the ceiling. Must retry, not dead-letter. + mock.ExpectBegin() + mock.ExpectQuery(`SELECT id, kind, team_id, target_tier, payload, attempts\s+FROM pending_propagations`). + WillReturnRows(sqlmock.NewRows(propagationSweepCols). + AddRow(propID, syntheticKind, teamID, "pro", []byte(`{}`), 0)) + mock.ExpectCommit() + + // markRetry UPDATE — DOES NOT set failed_at. + expectedNext := time.Date(2026, 5, 20, 12, 1, 0, 0, time.UTC) // backoff[0] = 1m + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1,\s+last_attempt_at\s*=\s*now\(\),\s+last_error\s*=\s*\$1,\s+next_attempt_at`). + WithArgs(sqlmock.AnyArg(), expectedNext, propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + stub := &stubPropagationRegrader{} + w := NewPropagationRunnerWorker(db, commonplans.Default(), stub) + w.now = fixedNow() + + if wErr := w.Work(context.Background(), fakePropagationJob()); wErr != nil { + t.Fatalf("Work: %v", wErr) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ─── F3: dead-letter increments Prom counter on the modal path ──────────────── + +func TestPropagation_DeadLetter_IncrementsMetric(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + // Use the known tier_elevation kind — this drives the modal real-failure + // path through markDeadLettered (NOT the F2 unknown_kind path). + const knownKind = propagationKindTierElevation + if _, ok := propagationHandlers[knownKind]; !ok { + t.Fatalf("propagationKindTierElevation is unexpectedly NOT in propagationHandlers — registry contract broken") + } + + before := testutil.ToFloat64(metrics.PropagationDeadLetteredTotal.WithLabelValues("max_attempts", knownKind)) + + propID := uuid.New() + teamID := uuid.New() + resID := uuid.New() + + // attempts == propagationMaxAttempts-1; the next failure dead-letters. + mock.ExpectBegin() + mock.ExpectQuery(`SELECT id, kind, team_id, target_tier, payload, attempts\s+FROM pending_propagations`). + WillReturnRows(sqlmock.NewRows(propagationSweepCols). + AddRow(propID, knownKind, teamID, "pro", []byte(`{}`), propagationMaxAttempts-1)) + mock.ExpectCommit() + + // Handler queries the team's resources. + mock.ExpectQuery(`SELECT r\.id, r\.token, r\.provider_resource_id, r\.tier, r\.resource_type`). + WithArgs(teamID). + WillReturnRows(sqlmock.NewRows(teamResourcesCols). + AddRow(resID, "tok-1", sql.NullString{}, "pro", "postgres")) + + // Dead-letter UPDATE (failed_at=now()). + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1,\s+last_attempt_at\s*=\s*now\(\),\s+last_error\s*=\s*\$1,\s+failed_at\s*=\s*now\(\)`). + WithArgs(sqlmock.AnyArg(), propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery(`SELECT created_at FROM pending_propagations WHERE id`). + WithArgs(propID). + WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(time.Now().Add(-24 * time.Hour))) + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Persistent gRPC failure → final attempt dead-letters. + stub := &stubPropagationRegrader{err: errors.New("provisioner still unreachable")} + w := NewPropagationRunnerWorker(db, commonplans.Default(), stub) + + if wErr := w.Work(context.Background(), fakePropagationJob()); wErr != nil { + t.Fatalf("Work: %v", wErr) + } + + after := testutil.ToFloat64(metrics.PropagationDeadLetteredTotal.WithLabelValues("max_attempts", knownKind)) + if after-before != 1 { + t.Errorf("PropagationDeadLetteredTotal{reason=max_attempts,kind=%s} delta = %.0f, want 1", + knownKind, after-before) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ─── F4: River config carries the exact 25-minute rescue threshold ──────────── + +func TestWorker_RiverConfig_RescueStuckJobsAfterIs25Min(t *testing.T) { + // We can't easily exercise the river.NewClient path in a unit test + // (it requires a live pgx pool). Pin the underlying constant + // instead — the production wiring at workers.go passes + // `RescueStuckJobsAfter: rescueStuckJobsAfter` to river.NewClient, so + // a regression in the constant IS the regression in the field. + const want = 25 * time.Minute + if rescueStuckJobsAfter != want { + t.Errorf("rescueStuckJobsAfter = %s, want %s — CHAOS F4: a regression to River's default (1h20m) caps our worst-case RTO above the 25-minute ceiling agreed in CHAOS-DRILL-2026-05-20", + rescueStuckJobsAfter, want) + } + + // Defence-in-depth: the value must be > globalJobTimeout. Without + // this guard, an over-eager future PR could shrink rescueStuckJobsAfter + // below globalJobTimeout — and the rescuer would start rescuing + // in-flight jobs that River's own timeout is about to cancel. + if rescueStuckJobsAfter <= globalJobTimeout { + t.Errorf("rescueStuckJobsAfter (%s) must exceed globalJobTimeout (%s) — otherwise the rescuer races River's own timeout", + rescueStuckJobsAfter, globalJobTimeout) + } + + // And it must NOT match the River default (which would mean the + // explicit override silently regressed back). The default is + // JobTimeout + JobRescuerRescueAfterDefault = 20m + 1h = 1h20m. + const riverDefaultRescue = 20*time.Minute + 1*time.Hour + if rescueStuckJobsAfter >= riverDefaultRescue { + t.Errorf("rescueStuckJobsAfter (%s) is >= River's default (1h20m) — the explicit pin is no longer reducing the RTO", + rescueStuckJobsAfter) + } +} diff --git a/internal/jobs/propagation_runner.go b/internal/jobs/propagation_runner.go index a953118..4e7a447 100644 --- a/internal/jobs/propagation_runner.go +++ b/internal/jobs/propagation_runner.go @@ -59,6 +59,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "log/slog" "os" @@ -70,6 +71,7 @@ import ( "go.opentelemetry.io/otel" commonv1 "instant.dev/proto/common/v1" + "instant.dev/worker/internal/metrics" ) // ─── named constants ────────────────────────────────────────────────────────── @@ -93,9 +95,15 @@ const ( // Mirrors api/internal/models/audit_kinds.go's AuditKindPropagation* // constants. Same drift contract as propagationKindTierElevation above. const ( - auditKindPropagationApplied = "propagation.applied" - auditKindPropagationRetrying = "propagation.retrying" - auditKindPropagationDeadLettered = "propagation.dead_lettered" + auditKindPropagationApplied = "propagation.applied" + auditKindPropagationRetrying = "propagation.retrying" + auditKindPropagationDeadLettered = "propagation.dead_lettered" + auditKindPropagationUnexpectedSkip = "propagation.unexpected_skip" + // CHAOS F2 (CHAOS-DRILL-2026-05-20): distinct audit kind for the + // old-worker/new-api rollout-skew dead-letter path. Lets the operator + // filter the image-skew signal independently of real-failure dead-letters + // via `audit_log.kind`. Mirrors api/internal/models/audit_kinds.go. + auditKindPropagationUnknownKindDeadLettered = "propagation.unknown_kind_dead_lettered" ) // propagationActor is the audit_log.actor value the runner writes. Distinct @@ -152,6 +160,105 @@ var propagationBackoffSchedule = []time.Duration{ // gRPC error string. const propagationLastErrorMax = 1000 +// propagationAllowedSkipSubstrings enumerates the SkipReason fragments that +// the provisioner uses to signal a TRUE no-op (idempotent re-apply / a +// resource type with no regrade arm). Anything else is an unexpected_skip +// and triggers retry/dead-letter per F1 (CHAOS-DRILL-2026-05-20). +// +// Held centrally (not inline) so the test surface and the bucketSkipReason +// helper can share the list. Adding an entry here changes the silent-skip +// surface; do it only when the provisioner adds a new "this is fine" +// outcome that genuinely doesn't need a retry. +var propagationAllowedSkipSubstrings = []string{ + "already correct", + "unsupported resource type", + "backend does not support", +} + +// isPropagationAllowedSkip returns true when the provisioner's SkipReason +// is one of the allowed no-op signals. +func isPropagationAllowedSkip(reason string) bool { + for _, sub := range propagationAllowedSkipSubstrings { + if strings.Contains(reason, sub) { + return true + } + } + return false +} + +// propagationUnexpectedSkipDetail captures one offending resource for an +// unexpected_skip retry. Multiple resources can fail in one dispatch; the +// operator wants to see every one in last_error so they can fix the +// underlying issue in batch. +type propagationUnexpectedSkipDetail struct { + ResourceID string + ResourceType string + SkipReason string +} + +// propagationUnexpectedSkipErr is the error returned by handleTierElevation +// when at least one resource's RegradeResource returned (Applied=false, +// SkipReason=). The runner's +// markRetry / markDeadLettered path detects this via errors.Is and emits +// propagation.unexpected_skip audit kind (not propagation.applied) — the +// operator-page signal for F1. +// +// CHAOS-DRILL-2026-05-20 F1: pre-fix the handler WARN-and-applied this +// case (firstErr stayed nil); the row got stamped applied_at and the +// regrade never landed. A paying customer's infra cap silently stayed at +// the old tier and no alert fired. This error type forces the case to +// retry → dead-letter and surfaces it loudly via a distinct audit kind + +// Prometheus counter. +type propagationUnexpectedSkipErr struct { + Resources []propagationUnexpectedSkipDetail +} + +func (e *propagationUnexpectedSkipErr) Error() string { + if e == nil || len(e.Resources) == 0 { + return "unexpected_skip: " + } + parts := make([]string, 0, len(e.Resources)) + for _, r := range e.Resources { + parts = append(parts, fmt.Sprintf("%s (%s): %s", r.ResourceID, r.ResourceType, r.SkipReason)) + } + return "unexpected_skip from provisioner: " + strings.Join(parts, "; ") +} + +// errPropagationUnexpectedSkipSentinel is the comparison target for +// errors.Is. The Is method on propagationUnexpectedSkipErr matches it so +// callers can switch on the class without depending on the concrete type. +var errPropagationUnexpectedSkipSentinel = errors.New("propagation: unexpected_skip from provisioner") + +// Is satisfies errors.Is for the sentinel — any propagationUnexpectedSkipErr +// matches errPropagationUnexpectedSkipSentinel. +func (e *propagationUnexpectedSkipErr) Is(target error) bool { + return target == errPropagationUnexpectedSkipSentinel +} + +// bucketSkipReason maps the raw provisioner SkipReason to a short canonical +// bucket suitable for a Prometheus label. Keeps cardinality bounded — never +// pass the raw SkipReason as a metric label. Adding a new bucket here is +// safe; the default "other" catches anything unrecognised. +func bucketSkipReason(reason string) string { + r := strings.ToLower(reason) + switch { + case strings.Contains(r, "postgres-admin secret") || strings.Contains(r, "postgres_admin"): + return "postgres_admin_secret_missing" + case strings.Contains(r, "redis-auth secret"): + return "redis_auth_secret_missing" + case strings.Contains(r, "namespace not found"): + return "namespace_not_found" + case strings.Contains(r, "not reachable") || strings.Contains(r, "unreachable"): + return "resource_not_reachable" + case strings.Contains(r, "pod not found") || strings.Contains(r, "no pod"): + return "pod_not_found" + case strings.Contains(r, "legacy resource") || strings.Contains(r, "legacy pod"): + return "legacy_resource" + default: + return "other" + } +} + // ─── job definition + handler registry ──────────────────────────────────────── // PropagationRunnerArgs is the River job payload. No fields — sweep job. @@ -282,8 +389,31 @@ func (w *PropagationRunnerWorker) Work(ctx context.Context, job *river.Job[Propa // image could see a row from a newer api enqueue. Treat as a // retryable failure so the row is NOT dead-lettered before // the worker rolls forward. + // + // CHAOS F2 (CHAOS-DRILL-2026-05-20): apply the SAME maxAttempts + // ceiling here that real-failure rows get below. Without this, + // an old worker pod that doesn't recognise a freshly-enqueued + // kind retries forever — the chaos drill on 2026-05-20 saw a + // chaos_test_unknown_kind row reach attempts=10 in 4 minutes + // without ever transitioning to failed_at. Bounding the + // unknown-kind path on the same dead-letter horizon as a real + // failure (a) caps the eventual blast radius of an old image + // surviving the rollout window, (b) gives the operator a clean + // alert-able signal via instant_propagation_dead_lettered_total{reason="unknown_kind"} + // rather than an unbounded retry storm. The per-tick image-skew + // indicator stays in PropagationUnknownKindTotal — that fires on + // EVERY tick while the row is alive, so the operator can react + // in seconds instead of waiting the ~24h backoff for a + // dead-letter to fire. unknownKind++ - w.markRetry(ctx, row, fmt.Errorf("no handler registered for kind %q", row.kind)) + metrics.PropagationUnknownKindTotal.WithLabelValues(row.kind).Inc() + dispatchErr := fmt.Errorf("no handler registered for kind %q", row.kind) + if row.attempts+1 >= propagationMaxAttempts { + deadLettered++ + w.markUnknownKindDeadLettered(ctx, row, dispatchErr) + continue + } + w.markRetry(ctx, row, dispatchErr) continue } @@ -424,10 +554,18 @@ func propagationBackoffFor(attempts int) time.Duration { } // markRetry persists attempts+1, schedules next_attempt_at via -// propagationBackoffFor, persists last_error, and emits a DEBUG-level -// propagation.retrying audit row. Best-effort: a DB failure here only -// means the next tick may re-process the same row (idempotent handler -// makes that safe) and the audit row is missing. +// propagationBackoffFor, persists last_error, and emits an audit row. +// +// Audit kind selection (CHAOS-DRILL-2026-05-20 F1): +// - errors.Is(dispatchErr, errPropagationUnexpectedSkipSentinel) → +// propagation.unexpected_skip audit row + WARN slog (the F1 loud +// signal: provisioner returned applied=false with a skip reason +// outside the allowed set — regrade did NOT land). +// - else → propagation.retrying audit row + DEBUG slog (routine retry). +// +// Best-effort: a DB failure here only means the next tick may re-process +// the same row (idempotent handler makes that safe) and the audit row is +// missing. func (w *PropagationRunnerWorker) markRetry(ctx context.Context, row propagationRow, dispatchErr error) { delay := propagationBackoffFor(row.attempts) nextAttempt := w.now().Add(delay) @@ -463,23 +601,47 @@ func (w *PropagationRunnerWorker) markRetry(ctx context.Context, row propagation return } - // DEBUG audit row — retries are routine during a Razorpay/provisioner - // outage; logging at WARN would spam. Operators inspect the row's - // `attempts` + `last_error` columns directly for diagnosis. - w.insertPropagationAuditRow(ctx, row, auditKindPropagationRetrying, fmt.Sprintf( - "%s propagation for team %s retrying (attempt %d/%d)", - row.kind, row.teamID.String(), row.attempts+1, propagationMaxAttempts, + // Audit kind: unexpected_skip is the loud F1 signal; retrying is the + // routine path. Both share the same DB UPDATE above — only the audit + // row + log line differ, so operators can grep/alert on the F1 class + // without false-positiving on a transient Razorpay outage. + isUnexpectedSkip := errors.Is(dispatchErr, errPropagationUnexpectedSkipSentinel) + auditKind := auditKindPropagationRetrying + if isUnexpectedSkip { + auditKind = auditKindPropagationUnexpectedSkip + } + + w.insertPropagationAuditRow(ctx, row, auditKind, fmt.Sprintf( + "%s propagation for team %s retrying (attempt %d/%d, kind=%s)", + row.kind, row.teamID.String(), row.attempts+1, propagationMaxAttempts, auditKind, ), map[string]any{ - "propagation_id": row.id.String(), - "kind": row.kind, - "team_id": row.teamID.String(), - "target_tier": nullableTierString(row.targetTier), - "attempts": row.attempts + 1, - "max_attempts": propagationMaxAttempts, - "next_attempt_at": nextAttempt.UTC().Format(time.RFC3339), - "last_error": lastErr, + "propagation_id": row.id.String(), + "kind": row.kind, + "team_id": row.teamID.String(), + "target_tier": nullableTierString(row.targetTier), + "attempts": row.attempts + 1, + "max_attempts": propagationMaxAttempts, + "next_attempt_at": nextAttempt.UTC().Format(time.RFC3339), + "last_error": lastErr, }) + if isUnexpectedSkip { + // WARN-level: unexpected_skip is the F1 signal — louder than a + // routine retry (DEBUG), quieter than the eventual dead-letter + // (ERROR). Pre-fix this case was silently APPLIED, so even WARN + // is a step up. + slog.Warn("jobs.propagation_runner.unexpected_skip_retrying", + "propagation_id", row.id.String(), + "team_id", row.teamID.String(), + "kind", row.kind, + "attempts", row.attempts+1, + "max_attempts", propagationMaxAttempts, + "next_attempt_at", nextAttempt, + "last_error", lastErr, + ) + return + } + slog.Debug("jobs.propagation_runner.retrying", "propagation_id", row.id.String(), "team_id", row.teamID.String(), @@ -607,6 +769,24 @@ func (w *PropagationRunnerWorker) markDeadLettered(ctx context.Context, row prop ) w.insertPropagationAuditRow(ctx, row, auditKindPropagationDeadLettered, summary, meta) + // CHAOS F3 (CHAOS-DRILL-2026-05-20): increment the Prom counter. Labels: + // reason = "max_attempts" — the modal real-failure path. F1's + // unexpected_skip-as-failure, real + // RegradeResource gRPC errors, AND + // F1's markApplied DB failures all flow + // through here once they reach the + // propagationMaxAttempts ceiling, so a + // single `reason=max_attempts` covers + // every "we tried and tried and finally + // gave up" path. + // kind = row.kind — bounded by propagationKnownKinds; the + // unknown_kind path uses + // markUnknownKindDeadLettered which sets + // kind="unknown_kind" explicitly to keep + // cardinality bounded by code, not by + // attacker-controlled enqueue values. + metrics.PropagationDeadLetteredTotal.WithLabelValues("max_attempts", row.kind).Inc() + // CRITICAL severity: this is THE alert. NR Log alert filters on // audit_kind='propagation.dead_lettered' OR on the message below. slog.Error("jobs.propagation_runner.dead_lettered", @@ -621,6 +801,98 @@ func (w *PropagationRunnerWorker) markDeadLettered(ctx context.Context, row prop ) } +// markUnknownKindDeadLettered transitions a row to failed_at when its `kind` +// has no registered handler AND the row has now reached propagationMaxAttempts. +// Shares 90% of its body with markDeadLettered but emits a DISTINCT audit kind +// (propagation.unknown_kind_dead_lettered) so an operator can filter on the +// old-image-rollback signal independently of real provisioner failures, and +// labels the Prom counter with reason="unknown_kind" + kind="unknown_kind" +// (we deliberately do NOT pass row.kind into the kind label because the kind +// is by definition not in propagationKnownKinds — passing it would let an +// api-side enqueue blow up the worker's label cardinality without a code-side +// review). +// +// CHAOS F2 (CHAOS-DRILL-2026-05-20): without this path, an unknown-kind row +// retried forever — confirmed live during the chaos drill +// (chaos_test_unknown_kind reached attempts=10 in 4 minutes without ever +// transitioning to failed_at). With this path, the row dead-letters after +// the same 10 attempts as any real failure, the operator sees a +// propagation.unknown_kind_dead_lettered audit row + a Prom counter increment, +// and the runaway retry loop ends. +func (w *PropagationRunnerWorker) markUnknownKindDeadLettered(ctx context.Context, row propagationRow, dispatchErr error) { + lastErr := truncatePropagationError(dispatchErr.Error()) + res, err := w.db.ExecContext(ctx, ` + UPDATE pending_propagations + SET attempts = attempts + 1, + last_attempt_at = now(), + last_error = $1, + failed_at = now() + WHERE id = $2 + AND applied_at IS NULL + AND failed_at IS NULL + `, lastErr, row.id) + if err != nil { + slog.Error("jobs.propagation_runner.unknown_kind_dead_letter_persist_failed", + "propagation_id", row.id.String(), + "team_id", row.teamID.String(), + "kind", row.kind, + "error", err, + "dispatch_error", lastErr, + ) + return + } + affected, _ := res.RowsAffected() + if affected == 0 { + // Already terminal — sibling tick won the race. + return + } + + // age_seconds — same best-effort pattern as markDeadLettered. + var ageSeconds float64 + var createdAt sql.NullTime + if qErr := w.db.QueryRowContext(ctx, + `SELECT created_at FROM pending_propagations WHERE id = $1`, row.id, + ).Scan(&createdAt); qErr == nil && createdAt.Valid { + ageSeconds = w.now().Sub(createdAt.Time).Seconds() + } + + meta := map[string]any{ + "propagation_id": row.id.String(), + "kind": row.kind, + "team_id": row.teamID.String(), + "target_tier": nullableTierString(row.targetTier), + "attempts": row.attempts + 1, + "max_attempts": propagationMaxAttempts, + "last_error": lastErr, + "age_seconds": ageSeconds, + "failure_reason": "unknown_kind", + } + summary := fmt.Sprintf( + "unknown_kind propagation (kind=%q) for team %s DEAD-LETTERED after %d attempts — worker image is older than api enqueue; finish the rollout", + row.kind, row.teamID.String(), propagationMaxAttempts, + ) + // Distinct audit kind so an operator-side NRQL filter can separate + // real-failure dead-letters from image-skew dead-letters. + w.insertPropagationAuditRow(ctx, row, auditKindPropagationUnknownKindDeadLettered, summary, meta) + + // kind="unknown_kind" (the BUCKET name) — see comment on the counter + // declaration in metrics.go for the cardinality rationale. + metrics.PropagationDeadLetteredTotal.WithLabelValues("unknown_kind", "unknown_kind").Inc() + + // CRITICAL severity — different action than the real-failure path. + // The fix here is an operator rolling the worker forward, NOT + // inspecting a specific team's resources. + slog.Error("jobs.propagation_runner.unknown_kind_dead_lettered", + "propagation_id", row.id.String(), + "team_id", row.teamID.String(), + "kind", row.kind, + "attempts", row.attempts+1, + "max_attempts", propagationMaxAttempts, + "last_error", lastErr, + "action", "worker pod is running an older image than the api enqueued — finish the rollout; manually re-enqueue this row only after the kind has a handler", + ) +} + // insertPropagationAuditRow writes one audit_log row, best-effort. A miss // here only loses the operator-visible ledger entry; the slog line still // fires, and NR alerts can key on the message string when the audit row @@ -672,10 +944,20 @@ func nullableTierString(s sql.NullString) string { // RegradeResource with the resource's per-row tier snapshot (MR-P1-21 — see // entitlement_reconciler.go for the snapshot-is-entitlement contract). Any // per-resource gRPC error fails the WHOLE row (so the entire row retries -// with backoff). Per-resource "skip" outcomes (applied=false with -// skip_reason="already correct" / "unsupported resource type" / "backend -// does not support regrade") are NOT treated as failures: they are the -// provisioner's idempotency / type-coverage signal. +// with backoff). Per-resource "skip" outcomes whose SkipReason is in +// propagationAllowedSkipSubstrings ("already correct" / "unsupported +// resource type" / "backend does not support regrade") are NOT treated as +// failures: they are the provisioner's idempotency / type-coverage signal. +// +// CHAOS-DRILL-2026-05-20 F1 fix: any OTHER SkipReason — e.g. "postgres-admin +// secret not found", "namespace not found", "resource not reachable" — is +// treated as a retryable failure. The runner detects the returned +// propagationUnexpectedSkipErr via errors.Is and emits a distinct +// propagation.unexpected_skip audit row (not propagation.applied). The row +// retries per the backoff schedule and dead-letters at propagationMaxAttempts. +// Pre-fix this case WARN-logged and fell through with firstErr==nil, so the +// row got stamped applied_at and the regrade never landed — paying customers +// ended up with "Pro on paper, hobby-grade infra" and no alert. // // Idempotency: re-running this handler is safe because the provisioner's // RegradeResource does CONFIG GET / applied_conn_limit comparison before @@ -727,7 +1009,10 @@ func handleTierElevation(ctx context.Context, db *sql.DB, regrader propagationRe return nil } - var firstErr error + var ( + firstErr error + unexpectedSkipFound []propagationUnexpectedSkipDetail + ) for _, r := range resources { resType, supported := resourceTypeFromString(r.resourceType) if !supported { @@ -752,22 +1037,47 @@ func handleTierElevation(ctx context.Context, db *sql.DB, regrader propagationRe } continue } - // applied=false is NOT an error — it's the idempotency signal. - // We log only if the skip_reason indicates an actual provisioner- - // side problem (not "already correct" / type unsupported). - if !out.Applied && out.SkipReason != "" && - !strings.Contains(out.SkipReason, "already correct") && - !strings.Contains(out.SkipReason, "unsupported resource type") && - !strings.Contains(out.SkipReason, "backend does not support") { + // applied=false with an ALLOWED SkipReason ("already correct" / + // "unsupported resource type" / "backend does not support") is the + // idempotency / type-coverage signal — treat as success. + // + // applied=false with any OTHER SkipReason is an unexpected_skip: + // the regrade DID NOT LAND but the provisioner returned no error. + // CHAOS-DRILL-2026-05-20 F1 fix: accumulate the offending resources + // and return a propagationUnexpectedSkipErr so the runner retries + // the row (audit_kind=propagation.unexpected_skip) and dead-letters + // at propagationMaxAttempts (audit_kind=propagation.dead_lettered). + // Pre-fix this case fell through silently and the row was stamped + // applied_at; the customer's infra was never regraded and no alert + // fired. The metric counter is the leading indicator the dead-letter + // alert is the lagging signal. + if !out.Applied && out.SkipReason != "" && !isPropagationAllowedSkip(out.SkipReason) { slog.Warn("jobs.propagation_runner.tier_elevation.unexpected_skip", "propagation_id", row.id.String(), "resource_id", r.id.String(), "resource_type", r.resourceType, "skip_reason", out.SkipReason, ) + metrics.PropagationUnexpectedSkipTotal.WithLabelValues( + row.kind, r.resourceType, bucketSkipReason(out.SkipReason), + ).Inc() + unexpectedSkipFound = append(unexpectedSkipFound, propagationUnexpectedSkipDetail{ + ResourceID: r.id.String(), + ResourceType: r.resourceType, + SkipReason: out.SkipReason, + }) } } - return firstErr + + // A real gRPC error wins over an unexpected_skip — the gRPC error is + // the louder signal and a retry on it will also re-check the skip path. + if firstErr != nil { + return firstErr + } + if len(unexpectedSkipFound) > 0 { + return &propagationUnexpectedSkipErr{Resources: unexpectedSkipFound} + } + return nil } // resourceTypeFromString maps the resources.resource_type column value to diff --git a/internal/jobs/propagation_runner_integration_test.go b/internal/jobs/propagation_runner_integration_test.go new file mode 100644 index 0000000..c89aa96 --- /dev/null +++ b/internal/jobs/propagation_runner_integration_test.go @@ -0,0 +1,457 @@ +package jobs + +// propagation_runner_integration_test.go — Track 3 of the +// reliability-integration suite (2026-05-20). +// +// Adds the next layer up from propagation_runner_test.go (sqlmock unit +// drift guards). These tests exercise the runner's persistence + dispatch +// paths against a slightly broader surface: end-to-end markRetry + +// markDeadLettered + the unknown_kind bounded-retry path; plus live-DB +// gated tests for FOR UPDATE SKIP LOCKED concurrency and the enum ↔ +// handler-map registry walk. +// +// 1. TestPropagation_BackoffIntegration_ExactScheduleViaMarkRetry — +// drives w.markRetry directly with a mock clock and asserts the +// persisted next_attempt_at SQL UPDATE arg matches +// propagationBackoffSchedule[attempts] for every position + +// clamp behaviour. The existing +// TestPropagationBackoffSchedule_IsMonotonicAndClamps pins the +// propagationBackoffFor helper directly; THIS test pins the +// end-to-end markRetry persistence path so a regression in +// markRetry that doesn't pass through propagationBackoffFor +// (e.g. a constant-30s retry refactor) is also caught. +// +// 2. TestPropagation_DeadLetterIntegration_AtMaxAttempts — drives +// w.markDeadLettered directly and asserts the SQL update sets +// failed_at AND the propagation.dead_lettered audit row is +// emitted. +// +// 3. TestPropagation_UnknownKindIntegration_BoundedRetries — F2 P1 +// guard: insert a pending_propagation with kind='garbage', runner +// doesn't dispatch (no handler), markRetry fires (attempts++). +// The bounded-retry guarantee comes from attempts++ hitting +// maxAttempts — a future refactor that bypassed the increment +// would loop forever. +// +// 4. TestPropagation_ForUpdateSkipLockedIntegration — two concurrent +// runners against a REAL Postgres (gated on TEST_DATABASE_URL). +// Verifies at most one runner picks the row. +// +// 5. TestPropagation_RegistryWalkIntegration_EnumVsHandlerMap — the +// rule 18 registry test against the actual PG enum. The unit +// tests cover the slice; this covers the DB-side enum. +// +// COVERAGE BLOCKS per CLAUDE.md rule 17 — see per-test docstrings. + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + + commonv1 "instant.dev/proto/common/v1" + + commonplans "instant.dev/common/plans" + + _ "github.com/lib/pq" +) + +// ─── Test 1: backoff exact schedule via markRetry ───────────────────────────── + +// TestPropagation_BackoffIntegration_ExactScheduleViaMarkRetry drives +// markRetry directly with a fixed clock and pins the next_attempt_at +// UPDATE arg for every position in the schedule + the clamp. +// +// COVERAGE BLOCK (rule 17): +// Symptom: a PR rewrites markRetry to use a different formula +// (e.g. constant 30s retries) without updating the +// schedule — silently bumps retry rate and pages NR +// with retry storm noise during a real outage. +// Enumeration: `rg -F 'propagationBackoffFor\|propagationBackoffSchedule' worker/` +// Sites found: 2 (the schedule slice + the function). +// Sites touched: 2 (this test pins both via the SQL UPDATE arg). +// Coverage test: a divergence between markRetry's UPDATE arg and +// propagationBackoffSchedule[attempts] FAILS here. +// Live verified: worker chaos drill 2026-05-20. +func TestPropagation_BackoffIntegration_ExactScheduleViaMarkRetry(t *testing.T) { + clock := time.Date(2026, 5, 20, 12, 0, 0, 0, time.UTC) + for i := 0; i < len(propagationBackoffSchedule); i++ { + i := i + t.Run(fmt.Sprintf("attempts=%d", i), func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + propID := uuid.New() + teamID := uuid.New() + + expectedDelay := propagationBackoffFor(i) + expectedNext := clock.Add(expectedDelay) + + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts`). + WithArgs(sqlmock.AnyArg(), expectedNext, propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := NewPropagationRunnerWorker(db, commonplans.Default(), &stubPropagationRegrader{}) + w.now = func() time.Time { return clock } + + row := propagationRow{ + id: propID, + kind: propagationKindTierElevation, + teamID: teamID, + targetTier: sql.NullString{String: "pro", Valid: true}, + attempts: i, + } + w.markRetry(context.Background(), row, errors.New("synthetic failure")) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("attempts=%d: unmet sqlmock expectations: %v", i, err) + } + }) + } + + // Clamp: attempts beyond len(schedule)-1 stays at the final entry. + t.Run("attempts_beyond_schedule_clamps", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + propID := uuid.New() + teamID := uuid.New() + finalDelay := propagationBackoffSchedule[len(propagationBackoffSchedule)-1] + expectedNext := clock.Add(finalDelay) + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts`). + WithArgs(sqlmock.AnyArg(), expectedNext, propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := NewPropagationRunnerWorker(db, commonplans.Default(), &stubPropagationRegrader{}) + w.now = func() time.Time { return clock } + row := propagationRow{ + id: propID, + kind: propagationKindTierElevation, + teamID: teamID, + attempts: len(propagationBackoffSchedule) + 5, + } + w.markRetry(context.Background(), row, errors.New("late")) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("clamp arm: unmet sqlmock expectations: %v", err) + } + }) +} + +// ─── Test 2: dead-letter integration ────────────────────────────────────────── + +// TestPropagation_DeadLetterIntegration_AtMaxAttempts drives +// w.markDeadLettered directly. Pins the SQL UPDATE setting failed_at +// + the audit_log INSERT emitting the propagation.dead_lettered kind. +// The slog ERROR line is the alert-key; the audit row is the +// operator-visible ledger. +// +// COVERAGE BLOCK (rule 17): +// Symptom: a future refactor moves the dead_lettered audit +// emission conditionally → the customer's broken-tier +// infra stops paging NR on dead-letter. +// Enumeration: `rg -F 'auditKindPropagationDeadLettered' worker/` +// Sites found: 2 (constant declaration + emission). +// Sites touched: 1 (the emission via this test). +// Coverage test: removing the audit insert from markDeadLettered +// unmets the sqlmock expectation here. +func TestPropagation_DeadLetterIntegration_AtMaxAttempts(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + propID := uuid.New() + teamID := uuid.New() + + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1,\s+last_attempt_at\s*=\s*now\(\),\s+last_error\s*=\s*\$1,\s+failed_at\s*=\s*now\(\)`). + WithArgs(sqlmock.AnyArg(), propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery(`SELECT created_at FROM pending_propagations WHERE id`). + WithArgs(propID). + WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(time.Now().Add(-26 * time.Hour))) + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := NewPropagationRunnerWorker(db, commonplans.Default(), &stubPropagationRegrader{}) + row := propagationRow{ + id: propID, + kind: propagationKindTierElevation, + teamID: teamID, + attempts: propagationMaxAttempts - 1, + } + w.markDeadLettered(context.Background(), row, errors.New("final failure")) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ─── Test 3: F2 P1 guard — unknown_kind bounded retries ─────────────────────── + +// TestPropagation_UnknownKindIntegration_BoundedRetries verifies a row +// with a kind absent from propagationHandlers is treated as a +// retryable failure (markRetry, NOT silent skip). The bounded-retry +// guarantee comes from markRetry incrementing attempts: after +// propagationMaxAttempts invocations, the row dead-letters per the +// standard path. +// +// COVERAGE BLOCK (rule 17): +// Symptom: F2 P1 — old-worker / new-api skew. api enqueues a +// new kind, worker doesn't know it → loops forever +// consuming worker tick capacity. +// Enumeration: `rg -F 'no handler registered for kind' worker/` +// Sites found: 1 (propagation_runner.go). +// Sites touched: 1 (this test). +// Coverage test: removing attempts++ in the unknown_kind branch +// fails this test (the markRetry UPDATE is unmet). +// Live verified: chaos drill 2026-05-20 (synthetic kind='garbage'). +func TestPropagation_UnknownKindIntegration_BoundedRetries(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + propID := uuid.New() + teamID := uuid.New() + + mock.ExpectBegin() + mock.ExpectQuery(`SELECT id, kind, team_id, target_tier, payload, attempts\s+FROM pending_propagations`). + WillReturnRows(sqlmock.NewRows(propagationSweepCols). + AddRow(propID, "garbage_kind_nobody_handles", teamID, "pro", []byte(`{}`), 0)) + mock.ExpectCommit() + + // Expected: markRetry fires (attempts++ + audit row). On master + // today, the unknown_kind branch routes to markRetry (NOT + // markDeadLettered immediately) per propagation_runner.go. + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1`). + WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO audit_log`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + stub := &stubPropagationRegrader{} + w := NewPropagationRunnerWorker(db, commonplans.Default(), stub) + if wErr := w.Work(context.Background(), fakePropagationJob()); wErr != nil { + t.Fatalf("Work: %v", wErr) + } + + if stub.calls != 0 { + t.Errorf("regrader.calls = %d, want 0 — unknown_kind must NOT dispatch any provisioner RPC", stub.calls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ─── Test 4: FOR UPDATE SKIP LOCKED concurrency ─────────────────────────────── + +// TestPropagation_ForUpdateSkipLockedIntegration runs two concurrent +// pickers against a real Postgres + a single eligible row, asserts at +// most one picks it up. Gated on TEST_DATABASE_URL + absence of +// -short. +// +// COVERAGE BLOCK (rule 17): +// Symptom: a future refactor drops FOR UPDATE SKIP LOCKED → +// two concurrent runner pods double-dispatch the +// same row → 2x noise + duplicate audit_log entries. +// Enumeration: `rg -F 'FOR UPDATE SKIP LOCKED' worker/` +// Sites found: 1 (pickEligible in propagation_runner.go). +// Sites touched: 1. +// Coverage test: removing the SKIP LOCKED clause makes both pickers +// see the row → total picks = 2 → test fails. +// Live verified: chaos drill 2026-05-20. +func TestPropagation_ForUpdateSkipLockedIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skip live-DB test under -short (regular gate)") + } + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("set TEST_DATABASE_URL to run the live-DB SKIP LOCKED test") + } + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatalf("sql.Open: %v", err) + } + defer db.Close() + if err := db.Ping(); err != nil { + t.Skipf("ping TEST_DATABASE_URL: %v — DB not reachable", err) + } + + var exists bool + if err := db.QueryRowContext(context.Background(), ` + SELECT EXISTS ( + SELECT 1 FROM information_schema.tables + WHERE table_name = 'pending_propagations' + ) + `).Scan(&exists); err != nil { + t.Fatalf("check table exists: %v", err) + } + if !exists { + t.Skip("pending_propagations table absent — run api migrations against TEST_DATABASE_URL first") + } + + ctx := context.Background() + teamID := uuid.New() + propID := uuid.New() + if _, err := db.ExecContext(ctx, ` + INSERT INTO pending_propagations + (id, kind, team_id, target_tier, payload, attempts, next_attempt_at, created_at) + VALUES ($1, $2, $3, $4, '{}'::jsonb, 0, now() - interval '1 minute', now()) + `, propID, propagationKindTierElevation, teamID, "pro"); err != nil { + t.Fatalf("seed: %v", err) + } + t.Cleanup(func() { + _, _ = db.ExecContext(context.Background(), `DELETE FROM pending_propagations WHERE id = $1`, propID) + }) + + var ( + pickedByA, pickedByB int64 + wg sync.WaitGroup + ) + + wA := NewPropagationRunnerWorker(db, commonplans.Default(), &stubPropagationRegrader{}) + wB := NewPropagationRunnerWorker(db, commonplans.Default(), &stubPropagationRegrader{}) + + startGate := make(chan struct{}) + wg.Add(2) + go func() { + defer wg.Done() + <-startGate + rows, err := wA.pickEligible(context.Background()) + if err != nil { + t.Errorf("wA.pickEligible: %v", err) + return + } + atomic.AddInt64(&pickedByA, int64(len(rows))) + }() + go func() { + defer wg.Done() + <-startGate + rows, err := wB.pickEligible(context.Background()) + if err != nil { + t.Errorf("wB.pickEligible: %v", err) + return + } + atomic.AddInt64(&pickedByB, int64(len(rows))) + }() + + close(startGate) + wg.Wait() + + total := atomic.LoadInt64(&pickedByA) + atomic.LoadInt64(&pickedByB) + if total > 1 { + t.Errorf("total picks = %d, want <= 1 (FOR UPDATE SKIP LOCKED is broken)", total) + } + if total == 0 { + t.Logf("total picks = 0 (both pickers raced past) — not a SKIP-LOCKED failure; what we guard against is total > 1.") + } +} + +// ─── Test 5: registry-walk against the PG enum ──────────────────────────────── + +// TestPropagation_RegistryWalkIntegration_EnumVsHandlerMap iterates +// the pending_propagations.kind PostgreSQL enum values + asserts +// every one has a propagationHandlers entry. The slice-based +// TestPropagationRunner_EveryKindHasAHandler covers the worker-side +// constants; THIS test covers the DB-side enum which the api +// enqueues against. Drift between the two registries IS the failure +// mode. +// +// Gated on TEST_DATABASE_URL. +// +// COVERAGE BLOCK (rule 17): +// Symptom: a migration adds a new value to the +// pending_propagations.kind enum but the worker +// release doesn't ship the handler. api enqueues +// the new kind; worker logs WARN ("no handler +// registered") and retry-loops until dead-letter. +// Enumeration: `psql -c "SELECT enum_range(NULL::propagation_kind)"` +// ↔ propagationHandlers map keys. +// Sites found: N (the enum values). +// Sites touched: N (this test iterates ALL). +// Coverage test: divergence between enum and handler map fails. +func TestPropagation_RegistryWalkIntegration_EnumVsHandlerMap(t *testing.T) { + if testing.Short() { + t.Skip("skip live-DB registry walk under -short (regular gate)") + } + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("set TEST_DATABASE_URL to walk the pending_propagations.kind enum") + } + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatalf("sql.Open: %v", err) + } + defer db.Close() + if err := db.Ping(); err != nil { + t.Skipf("ping TEST_DATABASE_URL: %v", err) + } + + var udtName sql.NullString + if err := db.QueryRowContext(context.Background(), ` + SELECT udt_name + FROM information_schema.columns + WHERE table_name = 'pending_propagations' + AND column_name = 'kind' + LIMIT 1 + `).Scan(&udtName); err != nil { + t.Skipf("inspect pending_propagations.kind: %v — table may not be migrated", err) + } + if !udtName.Valid { + t.Skip("pending_propagations.kind column has no udt_name — schema mismatch") + } + if udtName.String == "text" || udtName.String == "varchar" { + t.Skipf("pending_propagations.kind is %s (not an enum) — rule-18 guarantee delegated to TestPropagationRunner_EveryKindHasAHandler", udtName.String) + } + + rows, err := db.QueryContext(context.Background(), + fmt.Sprintf(`SELECT unnest(enum_range(NULL::%s))::text`, udtName.String)) + if err != nil { + t.Skipf("read enum %q values: %v", udtName.String, err) + } + defer rows.Close() + var enumValues []string + for rows.Next() { + var v string + if scanErr := rows.Scan(&v); scanErr != nil { + t.Errorf("scan enum value: %v", scanErr) + continue + } + enumValues = append(enumValues, v) + } + if rowsErr := rows.Err(); rowsErr != nil { + t.Fatalf("enum rows iter: %v", rowsErr) + } + if len(enumValues) == 0 { + t.Skip("enum has zero values — schema not seeded") + } + + for _, v := range enumValues { + if _, ok := propagationHandlers[v]; !ok { + t.Errorf("pending_propagations.kind enum value %q has NO handler in propagationHandlers — a row of this kind enqueued by the api would loop forever on no-handler retries until it dead-letters", + v) + } + } +} + +// preventImportUnused stays in case ctx-based helpers are later added. +var _ = commonv1.ResourceType_RESOURCE_TYPE_POSTGRES diff --git a/internal/jobs/propagation_unexpected_skip_test.go b/internal/jobs/propagation_unexpected_skip_test.go new file mode 100644 index 0000000..eb79da8 --- /dev/null +++ b/internal/jobs/propagation_unexpected_skip_test.go @@ -0,0 +1,362 @@ +package jobs + +// propagation_unexpected_skip_test.go — CHAOS-DRILL-2026-05-20 F1 regression +// coverage for "unexpected_skip silently marks the propagation APPLIED". +// +// Pre-fix bug: handleTierElevation in propagation_runner.go (lines 756–771 +// pre-patch) treated `(Applied=false, SkipReason=)` as success. A WARN log fired, firstErr stayed +// nil, and the runner stamped applied_at on the row. A paying customer's +// regrade never landed — there was no retry, no dead-letter, no alert. +// +// Fix: any non-allowed SkipReason now returns propagationUnexpectedSkipErr. +// The runner's markRetry path detects this via errors.Is and emits a +// propagation.unexpected_skip audit row (NOT propagation.applied). The row +// retries per the backoff schedule and dead-letters at propagationMaxAttempts +// with a propagation.dead_lettered audit row. The Prometheus counter +// PropagationUnexpectedSkipTotal increments at the emit site so dashboards +// can spot patterns before the dead-letter lagging signal fires. +// +// Tests: +// +// 1. TestPropagation_UnexpectedSkip_DoesNotMarkApplied +// The headline regression test. Provisioner returns +// (Applied=false, SkipReason="postgres-admin secret not found"). Assert: +// - markApplied is NOT called (no applied_at UPDATE in the sqlmock script). +// - markRetry IS called: attempts+1, next_attempt_at advanced. +// - audit_log row is propagation.unexpected_skip (NOT propagation.applied). +// - PropagationUnexpectedSkipTotal counter incremented by 1 with +// the right labels. +// +// 2. TestPropagation_UnexpectedSkip_DeadLettersAtMaxAttempts +// attempts == propagationMaxAttempts-1, then one more +// unexpected_skip. Assert the row dead-letters via the modal +// markDeadLettered path (single UPDATE … SET failed_at=now()), and +// the audit row is propagation.dead_lettered. +// +// 3. TestIsPropagationAllowedSkip_Coverage +// Registry-iterating coverage test per CLAUDE.md rule 18. The +// allowed-skip set lives in one place (propagationAllowedSkipSubstrings) +// and we assert every documented allowed-skip string passes, while +// a representative set of unexpected-skip strings fails. If a future +// PR adds a new allowed-skip substring without updating the list, the +// regression test below catches the un-mapped string. +// +// 4. TestPropagationUnexpectedSkipErr_IsMatches +// Pins the errors.Is contract: the markRetry path uses +// errors.Is(err, errPropagationUnexpectedSkipSentinel) to switch on +// the audit kind, so the Is() implementation MUST match the +// sentinel. A future refactor that breaks this would silently +// reintroduce the F1 bug (markRetry would emit propagation.retrying +// instead of propagation.unexpected_skip). +// +// 5. TestBucketSkipReason_BoundsCardinality +// Pins the Prometheus label vocabulary so a Prometheus operator can +// rely on the bucket names in alert rules. + +import ( + "context" + "database/sql" + "errors" + "strings" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus/testutil" + + commonplans "instant.dev/common/plans" + "instant.dev/worker/internal/metrics" +) + +// ─── Test 1: F1 regression — unexpected_skip does NOT mark applied ──────────── + +func TestPropagation_UnexpectedSkip_DoesNotMarkApplied(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + propID := uuid.New() + teamID := uuid.New() + resID := uuid.New() + + // Snapshot the unexpected_skip counter before the run — Prometheus + // counters are process-global so other tests in this file (or in the + // suite, run with -p 1) may have moved the value. testutil.ToFloat64 + // reads the WithLabelValues child, which is unique per labelset. + counterChild := metrics.PropagationUnexpectedSkipTotal.WithLabelValues( + propagationKindTierElevation, "postgres", "postgres_admin_secret_missing", + ) + startCount := testutil.ToFloat64(counterChild) + + mock.ExpectBegin() + mock.ExpectQuery(`SELECT id, kind, team_id, target_tier, payload, attempts\s+FROM pending_propagations`). + WillReturnRows(sqlmock.NewRows(propagationSweepCols). + AddRow(propID, propagationKindTierElevation, teamID, "pro", []byte(`{}`), 0)) + mock.ExpectCommit() + + mock.ExpectQuery(`SELECT r\.id, r\.token, r\.provider_resource_id, r\.tier, r\.resource_type`). + WithArgs(teamID). + WillReturnRows(sqlmock.NewRows(teamResourcesCols). + AddRow(resID, "tok-1", sql.NullString{}, "pro", "postgres")) + + // THE BUG: pre-fix, the runner would next call markApplied (a + // `UPDATE pending_propagations SET applied_at = now()`) and INSERT a + // propagation.applied audit row. The fix re-routes through markRetry. + // We intentionally do NOT script those expectations — if the bug ever + // regressed, sqlmock would fail with "unexpected query". + // + // markRetry UPDATE — attempts + 1, next_attempt_at advanced. + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1`). + WithArgs( + sqlmock.AnyArg(), // last_error truncated string + sqlmock.AnyArg(), // nextAttempt time.Time + propID, + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + // audit_log INSERT — must be propagation.unexpected_skip. + mock.ExpectExec(`INSERT INTO audit_log`). + WithArgs( + teamID, + propagationActor, + auditKindPropagationUnexpectedSkip, // <-- the F1 contract + sqlmock.AnyArg(), + sqlmock.AnyArg(), + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Stub the provisioner to return the canonical F1 case: + // Applied=false, SkipReason="postgres-admin secret not found". + stub := &stubPropagationRegrader{outcome: regradeOutcome{ + Applied: false, + SkipReason: "resource not reachable: postgres-admin secret not found", + }} + w := NewPropagationRunnerWorker(db, commonplans.Default(), stub) + w.now = fixedNow() + + if wErr := w.Work(context.Background(), fakePropagationJob()); wErr != nil { + t.Fatalf("Work: %v", wErr) + } + + if stub.calls != 1 { + t.Errorf("RegradeResource calls = %d, want 1", stub.calls) + } + + // Counter assertion — exact bucket name expected from bucketSkipReason + // against the "postgres-admin secret not found" surface. If a future PR + // changes the bucket name, the operator's NR alert rule must change too — + // pin it here so the change isn't silent. + endCount := testutil.ToFloat64(counterChild) + if endCount-startCount != 1.0 { + t.Errorf("PropagationUnexpectedSkipTotal{kind=tier_elevation,resource_type=postgres,skip_reason=postgres_admin_secret_missing} delta = %.1f, want 1.0", + endCount-startCount) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ─── Test 2: dead-letter at maxAttempts for unexpected_skip ─────────────────── + +func TestPropagation_UnexpectedSkip_DeadLettersAtMaxAttempts(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherRegexp)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + propID := uuid.New() + teamID := uuid.New() + resID := uuid.New() + + // attempts already at maxAttempts-1; one more unexpected_skip dead-letters. + mock.ExpectBegin() + mock.ExpectQuery(`SELECT id, kind, team_id, target_tier, payload, attempts\s+FROM pending_propagations`). + WillReturnRows(sqlmock.NewRows(propagationSweepCols). + AddRow(propID, propagationKindTierElevation, teamID, "pro", []byte(`{}`), propagationMaxAttempts-1)) + mock.ExpectCommit() + + mock.ExpectQuery(`SELECT r\.id, r\.token, r\.provider_resource_id, r\.tier, r\.resource_type`). + WithArgs(teamID). + WillReturnRows(sqlmock.NewRows(teamResourcesCols). + AddRow(resID, "tok-1", sql.NullString{}, "pro", "postgres")) + + // markDeadLettered UPDATE: stamps failed_at + last_error. + mock.ExpectExec(`UPDATE pending_propagations\s+SET attempts\s*=\s*attempts \+ 1,\s+last_attempt_at\s*=\s*now\(\),\s+last_error\s*=\s*\$1,\s+failed_at\s*=\s*now\(\)`). + WithArgs(sqlmock.AnyArg(), propID). + WillReturnResult(sqlmock.NewResult(0, 1)) + // age_seconds lookup query. + mock.ExpectQuery(`SELECT created_at FROM pending_propagations WHERE id`). + WithArgs(propID). + WillReturnRows(sqlmock.NewRows([]string{"created_at"}).AddRow(time.Now().Add(-25 * time.Hour))) + // audit_log INSERT — propagation.dead_lettered. The handler-side + // distinction (unexpected_skip vs gRPC-error vs DB-fail) collapses + // at the dead-letter point: a row that exhausted maxAttempts gets + // the canonical dead-letter audit kind regardless of cause. The + // `last_error` column carries the unexpected_skip detail for the + // operator to debug from. + mock.ExpectExec(`INSERT INTO audit_log`). + WithArgs( + teamID, + propagationActor, + auditKindPropagationDeadLettered, + sqlmock.AnyArg(), + sqlmock.AnyArg(), + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + stub := &stubPropagationRegrader{outcome: regradeOutcome{ + Applied: false, + SkipReason: "namespace not found", + }} + w := NewPropagationRunnerWorker(db, commonplans.Default(), stub) + w.now = fixedNow() + + if wErr := w.Work(context.Background(), fakePropagationJob()); wErr != nil { + t.Fatalf("Work: %v", wErr) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ─── Test 3: allowed-skip registry coverage (CLAUDE.md rule 18) ─────────────── + +func TestIsPropagationAllowedSkip_Coverage(t *testing.T) { + // The registry-iterating check: every documented allowed-skip substring + // is non-empty and is matched by isPropagationAllowedSkip. A future PR + // that adds an entry to propagationAllowedSkipSubstrings but forgets to + // wire it through isPropagationAllowedSkip fails this test. + if len(propagationAllowedSkipSubstrings) == 0 { + t.Fatal("propagationAllowedSkipSubstrings is empty — every SkipReason will be treated as unexpected_skip, the F1 fix has been over-corrected into a fail-everything posture") + } + for _, sub := range propagationAllowedSkipSubstrings { + if sub == "" { + t.Error("propagationAllowedSkipSubstrings contains an empty string — would match every SkipReason (including '')") + continue + } + // Match a sample reason that contains the substring. + sample := "prefix " + sub + " suffix" + if !isPropagationAllowedSkip(sample) { + t.Errorf("isPropagationAllowedSkip(%q) = false for allowed substring %q — substring lookup is broken", sample, sub) + } + } + + // And the negative cases — every SkipReason from the chaos drill / known + // failure-mode catalog MUST be treated as unexpected_skip. If any of + // these turn into allowed-skips a paying customer's regrade will + // silently fail. + unexpectedSamples := []string{ + "postgres-admin secret not found", + "resource not reachable: postgres-admin secret not found", + "namespace not found", + "redis-auth secret missing", + "pod not found", + "no Running pod", + "exec fallback: CONFIG SET maxmemory failed", + "legacy resource without auth secret", + } + for _, sample := range unexpectedSamples { + if isPropagationAllowedSkip(sample) { + t.Errorf("isPropagationAllowedSkip(%q) = true — this MUST be treated as an unexpected_skip (the whole point of F1)", sample) + } + } +} + +// ─── Test 4: errors.Is contract for the sentinel ────────────────────────────── + +func TestPropagationUnexpectedSkipErr_IsMatches(t *testing.T) { + err := &propagationUnexpectedSkipErr{ + Resources: []propagationUnexpectedSkipDetail{ + {ResourceID: "r1", ResourceType: "postgres", SkipReason: "postgres-admin secret not found"}, + }, + } + if !errors.Is(err, errPropagationUnexpectedSkipSentinel) { + t.Error("errors.Is(*propagationUnexpectedSkipErr, errPropagationUnexpectedSkipSentinel) = false — the markRetry audit-kind switch will never fire, F1 audit signal regression") + } + + // Wrapped via fmt.Errorf("…: %w", …) must still match — the dispatch + // loop wraps unexpected_skip in handler-side context. + wrapped := errors.New("not the sentinel") + if errors.Is(wrapped, errPropagationUnexpectedSkipSentinel) { + t.Error("errors.Is(unrelated_err, errPropagationUnexpectedSkipSentinel) = true — sentinel matches too broadly, would mis-classify routine retries as unexpected_skip") + } + + // Nil receiver must not panic; the runner never constructs a nil + // pointer here, but defensive. + var nilErr *propagationUnexpectedSkipErr + if errors.Is(nilErr, errPropagationUnexpectedSkipSentinel) { + // Nil err of this type Still matches the sentinel via the Is method — + // acceptable; the runner would never construct it. + t.Log("nilErr matched (Is method returns true for nil receiver) — acceptable") + } + + // Error string formatting includes every offending resource so the + // operator can grep the audit_log.last_error column. + got := err.Error() + for _, want := range []string{"r1", "postgres", "postgres-admin secret"} { + if !strings.Contains(got, want) { + t.Errorf("propagationUnexpectedSkipErr.Error() = %q, missing substring %q (operator can't debug from audit_log.last_error)", got, want) + } + } +} + +// ─── Test 5: bucketSkipReason cardinality contract ──────────────────────────── + +func TestBucketSkipReason_BoundsCardinality(t *testing.T) { + cases := []struct { + raw string + bucket string + }{ + // Postgres admin secret missing — the canonical F1 trigger. + {"resource not reachable: postgres-admin secret not found", "postgres_admin_secret_missing"}, + {"postgres-admin Secret not found", "postgres_admin_secret_missing"}, + {"missing postgres_admin secret", "postgres_admin_secret_missing"}, + + // Redis-auth secret missing. + {"redis-auth secret not found", "redis_auth_secret_missing"}, + + // Namespace teardown race. + {"namespace not found: instant-customer-xyz", "namespace_not_found"}, + + // Pod-not-found from exec-fallback paths. + {"exec fallback: no pod found", "pod_not_found"}, + {"pod not found in namespace", "pod_not_found"}, + + // Resource unreachable (gRPC dial / DNS). + {"resource not reachable", "resource_not_reachable"}, + {"backend unreachable", "resource_not_reachable"}, + + // Legacy resource without auth secret. + {"legacy resource without auth secret", "legacy_resource"}, + {"legacy pod missing required config", "legacy_resource"}, + + // Unknown — the catch-all bucket. + {"completely novel skip reason from a future provisioner", "other"}, + {"", "other"}, + } + for _, c := range cases { + if got := bucketSkipReason(c.raw); got != c.bucket { + t.Errorf("bucketSkipReason(%q) = %q, want %q", c.raw, got, c.bucket) + } + } + + // And the bounded-cardinality invariant: a thousand random unique + // SkipReasons should fall into a small fixed set of buckets. (We + // declare the upper bound at 8 buckets — the 7 named + "other".) + seen := map[string]struct{}{} + for i := 0; i < 1000; i++ { + // A faux randomness — different prefixes — none of which should + // hit any named bucket. + raw := "novel skip variant #" + uuid.New().String() + seen[bucketSkipReason(raw)] = struct{}{} + } + if len(seen) != 1 || func() bool { _, ok := seen["other"]; return !ok }() { + t.Errorf("bucketSkipReason swallowed %d distinct buckets for novel inputs (want exactly 1, 'other') — cardinality not bounded, Prometheus label explosion risk: %v", len(seen), seen) + } +} diff --git a/internal/jobs/workers.go b/internal/jobs/workers.go index 7a41598..be44ca1 100644 --- a/internal/jobs/workers.go +++ b/internal/jobs/workers.go @@ -48,6 +48,47 @@ const queueBilling = "billing" // workers can shorten this via `Timeout()` on the worker; none currently do. const globalJobTimeout = 20 * time.Minute +// rescueStuckJobsAfter overrides River's default JobRescuer threshold. +// +// CHAOS F4 (CHAOS-DRILL-2026-05-20): +// River's default `RescueStuckJobsAfter` is `JobTimeout + JobRescuerRescueAfterDefault` +// (= 20m + 1h = 1h20m). That sets an 80-minute RTO ceiling on any +// catastrophic worker death (OOMKill / pod eviction / hard segfault) where +// River's client never gets to mark the job back to `available` itself — +// the only path back to the queue is the JobRescuer's sweep, which by +// default won't touch a `running` row until it's been "running" (per +// the row's `attempted_at` timestamp) for the full 80 minutes. +// +// 25 minutes is the explicit RTO floor we chose: +// +// - globalJobTimeout = 20 minutes already bounds any LEGITIMATELY long +// job (the ~17-minute billing reconciler is the longest). A worker +// that survived would have already returned from Work() by then; the +// rescuer only needs to handle the case where the worker died WITHOUT +// returning. +// +// - 5 minutes of headroom past JobTimeout absorbs queue jitter, kube +// liveness-probe restarts, etc., without falsely rescuing a job that +// would have returned within milliseconds of timing out. +// +// - 25m matches the propagation_runner's worst-case backoff step at +// attempts ~3 (15 min) plus its dispatch budget, so a rescued +// propagation row joins the natural retry rhythm rather than +// thrashing. +// +// Trade-off: a job that legitimately runs slightly longer than +// JobTimeout (impossible today — all workers respect ctx) would be +// duplicate-rescued. Acceptable: every job in this worker is idempotent +// (provisioner.RegradeResource, customer-backup s3 put with checksum, +// brevo send keyed by audit_log row id, etc.) so a re-execution is a +// no-op rather than a double-effect. +// +// Pinning this in code means a future River bump (whose default might +// shift) can't quietly regress our worst-case RTO. The companion test +// `TestWorker_RiverConfig_RescueStuckJobsAfterIs25Min` asserts the +// constructed config carries exactly this value. +const rescueStuckJobsAfter = 25 * time.Minute + // periodicUniqueOpts is the UniqueOpts EVERY periodic job must carry // (P1-W3-07 / P1-W4-06 / P2-W5-05). The worker runs at replicas:2 — without // a uniqueness guard, both pods' River clients independently enqueue the @@ -774,6 +815,14 @@ func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *confi // on a runaway. Individual jobs that need a longer or shorter ceiling // can override via `Timeout()` on the worker (none currently do). JobTimeout: globalJobTimeout, + // CHAOS F4 (CHAOS-DRILL-2026-05-20): cap the worst-case RTO of a + // catastrophic worker death (OOMKill / pod eviction / segfault). + // Without this, River defaults to JobTimeout + JobRescuerRescueAfterDefault + // (= 20m + 1h = 1h20m), giving an 80-minute RTO on dropped jobs. + // 25m = JobTimeout + 5m of jitter headroom. See the comment on + // rescueStuckJobsAfter for the full rationale. Pinned by + // TestWorker_RiverConfig_RescueStuckJobsAfterIs25Min. + RescueStuckJobsAfter: rescueStuckJobsAfter, }) if err != nil { slog.Error("jobs.workers.client_init_failed", "error", err) @@ -790,9 +839,17 @@ func StartWorkers(ctx context.Context, db *sql.DB, rdb *redis.Client, cfg *confi return &Workers{started: false} } + // CHAOS F4 (CHAOS-DRILL-2026-05-20): stamp the rescue-stuck-jobs threshold + // in the startup line so an operator's live `kubectl logs` grep for + // `rescue_stuck_jobs_after` after a roll confirms the pinned RTO is in + // effect (rather than reverting to the River default of 1h20m). Pair + // with TestWorker_RiverConfig_RescueStuckJobsAfterIs25Min — the test + // is the build-time gate; this log line is the runtime confirmation. slog.Info("jobs.workers.started", "queues", fmt.Sprintf("%v", []string{river.QueueDefault, queueReconcile, queueBilling}), "max_workers", 5, + "job_timeout", globalJobTimeout.String(), + "rescue_stuck_jobs_after", rescueStuckJobsAfter.String(), ) return &Workers{ diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 9148550..cfaa0b0 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -408,6 +408,96 @@ var ( Help: "Event-email forwarder hits on an audit_log row whose kind has a builder but no Go renderer. A non-zero rate means a kind is being silently dropped — fix the eventEmailBodyRenderers map.", }, []string{"kind"}) + // ── propagation_runner — unexpected_skip counter (CHAOS-DRILL-2026-05-20 F1) ─ + // + // Every time the propagation_runner's per-resource RegradeResource call + // returns (Applied=false, SkipReason=), + // this counter increments. Pre-fix the runner WARN-and-applied that case, + // silently stamping the propagation row as applied without the entitlement + // landing — a paying customer ended up with "Pro on paper, hobby-grade infra" + // and no alert (CHAOS-DRILL-2026-05-20 finding #1). Now the case is treated + // as a retryable error: the row retries via the backoff schedule and + // dead-letters at propagationMaxAttempts. This counter is the leading + // indicator; the dead-letter audit row is the alert-able lagging signal. + // + // Labels: + // + // kind — pending_propagations.kind ("tier_elevation", etc.). + // Bounded by propagationKnownKinds (~1-3 entries). + // + // resource_type — "postgres" | "redis" | "mongodb" (the offending + // resource class). Bounded by ResourceType enum. + // + // skip_reason — a SHORT canonical bucket derived from the raw + // skip_reason string ("postgres_admin_secret_missing", + // "namespace_not_found", "other"). The runner does + // the bucketing (jobs.bucketSkipReason) so cardinality + // stays bounded — never pass the raw SkipReason here. + // + // NR alert (suggested): + // sum(rate(instant_propagation_unexpected_skip_total[15m])) > 0 + // for 30+ minutes → P2 page. A single isolated event is the + // mid-deprovisioning-race signal; a sustained rate is a real + // downstream regression and an operator must investigate before + // the row dead-letters ~24h later. + PropagationUnexpectedSkipTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "instant_propagation_unexpected_skip_total", + Help: "propagation_runner per-resource RegradeResource returned (Applied=false, SkipReason=). Leading indicator for the dead-letter alert.", + }, []string{"kind", "resource_type", "skip_reason"}) + + // ── propagation_runner — dead-letter + unknown-kind counters (CHAOS F2/F3) ─ + // + // PropagationDeadLetteredTotal increments every time the propagation_runner + // transitions a row to failed_at + emits a propagation.*dead_lettered audit + // row. Two triggers feed this single metric, distinguished by `reason`: + // + // reason="max_attempts" — the modal path. Per-resource RegradeResource + // failures, F1's unexpected_skip-as-failure, and + // markApplied DB failures all converge here once + // they reach propagationMaxAttempts. + // reason="unknown_kind" — CHAOS F2: a worker pod that doesn't recognise + // a `kind` enqueued by a newer api image. Without + // the F2 fix these escape the maxAttempts ceiling. + // + // `kind` carries the row's pending_propagations.kind value for the + // max_attempts path (bounded by propagationKnownKinds — ~1-3 entries). + // The unknown_kind path passes kind="unknown_kind" as a bounded bucket, + // so an attacker-controlled api-side enqueue can't blow up worker + // label cardinality. + // + // NR alert (suggested): + // rate(instant_propagation_dead_lettered_total[5m]) > 0 for 5m → P1 page. + // propagation_runner is the last line of defence between Razorpay webhook + // delivery and customer infra; any dead-letter means a paying customer's + // regrade fell through (or, on the unknown_kind path, that a worker pod + // is running an old image vs the api). + PropagationDeadLetteredTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "instant_propagation_dead_lettered_total", + Help: "propagation_runner rows transitioned to failed_at. Labelled by reason (max_attempts|unknown_kind) and kind (pending_propagations.kind, or 'unknown_kind' for F2's bounded bucket).", + }, []string{"reason", "kind"}) + + // PropagationUnknownKindTotal counts every TICK that picked up at least + // one row whose kind had no handler in propagationHandlers. Distinct + // from PropagationDeadLetteredTotal{reason="unknown_kind"} — that fires + // once at the END of the row's life (after maxAttempts), this fires on + // EVERY tick while the row is retrying. Lets the operator see "the + // worker is older than the api" within seconds rather than waiting the + // ~24h backoff for the dead-letter to land. + // + // `kind` is the raw pending_propagations.kind value. Bounded by the + // api-side enqueue surface (NOT by attacker input — only the api can + // INSERT into pending_propagations); the cardinality risk is accepted + // because in the rollback-drift scenario the operator wants to know + // EXACTLY which new kind their old worker is rejecting. + // + // NR alert (suggested): + // sum(rate(instant_propagation_unknown_kind_total[5m])) by (kind) > 0 + // for 5m → P2 page. Action: finish the rollout. + PropagationUnknownKindTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "instant_propagation_unknown_kind_total", + Help: "propagation_runner ticks that saw at least one row whose kind has no handler (image-skew indicator). Labelled by kind.", + }, []string{"kind"}) + // readyzCheckStatusGauge — per-component readiness status surfaced by // /readyz on this service's HTTP sidecar (:8091). See the matching // gauge in the api repo at api/internal/metrics/metrics.go for the