|
| 1 | +package jobs |
| 2 | + |
| 3 | +// chaos_lease_recovery.go — STUB JOB for the chaos drill of 2026-05-20. |
| 4 | +// |
| 5 | +// See CHAOS-DRILL-2026-05-20.md (repo root) for the full procedure. |
| 6 | +// |
| 7 | +// ─── WHAT THIS JOB EXISTS FOR ───────────────────────────────────────────────── |
| 8 | +// |
| 9 | +// CLAUDE.md rule 12 ("Shipped ≠ Verified"). When task #172 added |
| 10 | +// `JobTimeout: globalJobTimeout` to the River client config (workers.go), the |
| 11 | +// CHANGE was shipped but the FAILURE CASE — what happens when a pod dies |
| 12 | +// mid-job-execution — was never exercised. River's lease-takeover (rescuer) |
| 13 | +// path is what guarantees an orphaned job re-leases to a sibling worker |
| 14 | +// instead of being lost. The rescuer was deliberately set to its default in |
| 15 | +// our config: |
| 16 | +// |
| 17 | +// JobRescuerRescueAfterDefault = time.Hour (from river internals) |
| 18 | +// JobTimeout = 20 * time.Minute (workers.go const) |
| 19 | +// |
| 20 | +// The effective rescue window is therefore |
| 21 | +// max_takeover_RTO = JobTimeout + JobRescuerRescueAfterDefault ≈ 1h20m |
| 22 | +// |
| 23 | +// — a job orphaned by an OOMKill mid-execution can stay un-leased for up to |
| 24 | +// 80 minutes before another worker picks it up. That is a FINDING from the |
| 25 | +// drill, not something we knew before the drill ran. |
| 26 | +// |
| 27 | +// ─── HOW THIS JOB IS USED IN THE DRILL ──────────────────────────────────────── |
| 28 | +// |
| 29 | +// The drill in api/e2e/propagation_chaos_test.go enqueues ONE |
| 30 | +// ChaosLeaseRecoveryArgs row, then: |
| 31 | +// |
| 32 | +// 1. Waits for the START audit_log marker (chaos.lease_recovery.start) |
| 33 | +// to be persisted by some worker pod. The marker carries pod_id = |
| 34 | +// $HOSTNAME so the test knows which pod owns the in-flight job. |
| 35 | +// 2. `kubectl delete pod -n instant-infra <that-pod-id> --grace-period=0 |
| 36 | +// --force` — simulates OOMKill (no graceful drain, no defer |
| 37 | +// completion). |
| 38 | +// 3. The job's sleep wakes up in the (now-doomed) original pod or is |
| 39 | +// already terminated mid-sleep. River's rescuer (default 1h interval, |
| 40 | +// 1h RescueAfter) eventually re-leases the row to a different pod. |
| 41 | +// 4. The other pod runs the job from scratch (River retries from |
| 42 | +// `attempted_at` reset — semantics matches "at-least-once |
| 43 | +// delivery"). It writes its OWN start marker, sleeps, writes an END |
| 44 | +// marker chaos.lease_recovery.end. |
| 45 | +// 5. The test asserts: |
| 46 | +// - chaos.lease_recovery.start rows exist with TWO distinct |
| 47 | +// pod_id values (the killed pod + the rescuer pod), or one |
| 48 | +// start+one end if the original pod is killed AFTER the end |
| 49 | +// marker (the test handles both orderings). |
| 50 | +// - chaos.lease_recovery.end exists exactly once (the job |
| 51 | +// eventually completed successfully). |
| 52 | +// - Wall-clock from FIRST start marker to END marker is the |
| 53 | +// observed lease-recovery RTO. This is the real number the |
| 54 | +// drill produces. |
| 55 | +// |
| 56 | +// ─── PARAMETERS ─────────────────────────────────────────────────────────────── |
| 57 | +// |
| 58 | +// The job's payload carries: |
| 59 | +// |
| 60 | +// - SleepSeconds — how long the worker holds the slot. 30s is enough for |
| 61 | +// the operator (or the drill test) to `kubectl get pods -l app= |
| 62 | +// instant-worker -w` and pick the running pod to kill, while short |
| 63 | +// enough not to occupy a worker slot for 5 minutes if the kill never |
| 64 | +// happens. 0 = a normal completion (no kill) — useful for testing the |
| 65 | +// happy path of the stub. |
| 66 | +// |
| 67 | +// - RunID — a stable string the test generates per-drill so it can find |
| 68 | +// ITS audit rows among multiple concurrent drill runs. |
| 69 | +// |
| 70 | +// ─── IDEMPOTENCY ────────────────────────────────────────────────────────────── |
| 71 | +// |
| 72 | +// The audit_log writes are appended unconditionally. A re-run from the |
| 73 | +// rescuer's takeover yields a SECOND start marker (with a different |
| 74 | +// pod_id) — that is the SIGNAL the drill keys on. We do NOT dedupe in |
| 75 | +// this stub; the whole point is to expose the at-least-once execution |
| 76 | +// semantics River guarantees. |
| 77 | +// |
| 78 | +// ─── SAFETY ────────────────────────────────────────────────────────────────── |
| 79 | +// |
| 80 | +// The job does NO real work. It sleeps + writes audit_log markers under |
| 81 | +// the synthetic team_id supplied in the payload (the drill seeds a |
| 82 | +// synthetic team for this purpose). No customer resource is touched. |
| 83 | +// The job kind is documented as "chaos-drill-only" and is never enqueued |
| 84 | +// outside of the drill — there is no periodic schedule, no api enqueue |
| 85 | +// path. The worker only RUNS it if a row already exists in river_job |
| 86 | +// with kind=chaos_lease_recovery, which only happens when the drill |
| 87 | +// inserts one. |
| 88 | + |
| 89 | +import ( |
| 90 | + "context" |
| 91 | + "database/sql" |
| 92 | + "encoding/json" |
| 93 | + "fmt" |
| 94 | + "log/slog" |
| 95 | + "os" |
| 96 | + "time" |
| 97 | + |
| 98 | + "github.com/google/uuid" |
| 99 | + "github.com/riverqueue/river" |
| 100 | +) |
| 101 | + |
| 102 | +// ─── named constants ───────────────────────────────────────────────────────── |
| 103 | + |
| 104 | +const ( |
| 105 | + // chaosLeaseRecoveryKind is the River worker kind. Mirrored as a |
| 106 | + // constant in api/e2e/lease_recovery_chaos_test.go so the test can |
| 107 | + // query river_job for status. |
| 108 | + chaosLeaseRecoveryKind = "chaos_lease_recovery" |
| 109 | + |
| 110 | + // AuditKindChaosLeaseRecoveryStart / End are the markers the drill |
| 111 | + // test polls audit_log for. The summaries are deliberately distinctive |
| 112 | + // so a support query can find them long after a drill. |
| 113 | + AuditKindChaosLeaseRecoveryStart = "chaos.lease_recovery.start" |
| 114 | + AuditKindChaosLeaseRecoveryEnd = "chaos.lease_recovery.end" |
| 115 | + |
| 116 | + // chaosLeaseRecoveryActor is the audit_log.actor value. Mirrors the |
| 117 | + // propagation_runner / billing_reconciler conventions (one actor per |
| 118 | + // subsystem). |
| 119 | + chaosLeaseRecoveryActor = "chaos_lease_recovery" |
| 120 | + |
| 121 | + // chaosLeaseRecoveryMaxSleep clamps the SleepSeconds payload value to |
| 122 | + // guard against an oversized sleep accidentally seeded by a typo — |
| 123 | + // the job is otherwise indistinguishable from a real long-running |
| 124 | + // task to River's slot scheduler. |
| 125 | + chaosLeaseRecoveryMaxSleep = 5 * time.Minute |
| 126 | +) |
| 127 | + |
| 128 | +// ─── job definition ─────────────────────────────────────────────────────────── |
| 129 | + |
| 130 | +// ChaosLeaseRecoveryArgs is the River job payload for the lease-recovery |
| 131 | +// drill. Field names are JSON-tagged because River serialises args through |
| 132 | +// encoding/json. |
| 133 | +type ChaosLeaseRecoveryArgs struct { |
| 134 | + // SleepSeconds is how long the worker holds the slot before |
| 135 | + // completing. Clamped at chaosLeaseRecoveryMaxSleep. |
| 136 | + SleepSeconds int `json:"sleep_seconds"` |
| 137 | + // TeamID is the synthetic team the drill created; the worker writes |
| 138 | + // audit_log rows against this team_id. |
| 139 | + TeamID string `json:"team_id"` |
| 140 | + // RunID is a stable identifier the drill uses to find ITS audit rows |
| 141 | + // among multiple concurrent drill runs. |
| 142 | + RunID string `json:"run_id"` |
| 143 | +} |
| 144 | + |
| 145 | +// Kind is the River worker key. |
| 146 | +func (ChaosLeaseRecoveryArgs) Kind() string { return chaosLeaseRecoveryKind } |
| 147 | + |
| 148 | +// InsertOpts overrides the default queue + uniqueness rules for this kind. |
| 149 | +// We deliberately do NOT set river.UniqueOpts so a drill that takes over |
| 150 | +// after a kill genuinely sees a separate enqueue → River picks the row up |
| 151 | +// on a fresh attempt rather than treating it as a duplicate. |
| 152 | +func (ChaosLeaseRecoveryArgs) InsertOpts() river.InsertOpts { |
| 153 | + return river.InsertOpts{ |
| 154 | + // Drill jobs run on the default queue alongside the bulk-email + |
| 155 | + // heavyweight periodics. They occupy ONE slot (MaxWorkers=5) for |
| 156 | + // SleepSeconds — a 30s sleep is cheap. |
| 157 | + Queue: river.QueueDefault, |
| 158 | + Priority: 4, // lowest priority — never starves real work |
| 159 | + } |
| 160 | +} |
| 161 | + |
| 162 | +// ChaosLeaseRecoveryWorker is the in-process executor. |
| 163 | +type ChaosLeaseRecoveryWorker struct { |
| 164 | + river.WorkerDefaults[ChaosLeaseRecoveryArgs] |
| 165 | + db *sql.DB |
| 166 | +} |
| 167 | + |
| 168 | +// NewChaosLeaseRecoveryWorker constructs the worker. |
| 169 | +func NewChaosLeaseRecoveryWorker(db *sql.DB) *ChaosLeaseRecoveryWorker { |
| 170 | + return &ChaosLeaseRecoveryWorker{db: db} |
| 171 | +} |
| 172 | + |
| 173 | +// Work executes the drill job. Writes a START marker, sleeps, writes an END |
| 174 | +// marker. Both markers carry the pod's $HOSTNAME so the drill test can tell |
| 175 | +// which pod ran each side of the kill. |
| 176 | +func (w *ChaosLeaseRecoveryWorker) Work(ctx context.Context, job *river.Job[ChaosLeaseRecoveryArgs]) error { |
| 177 | + args := job.Args |
| 178 | + sleep := time.Duration(args.SleepSeconds) * time.Second |
| 179 | + if sleep < 0 { |
| 180 | + sleep = 0 |
| 181 | + } |
| 182 | + if sleep > chaosLeaseRecoveryMaxSleep { |
| 183 | + sleep = chaosLeaseRecoveryMaxSleep |
| 184 | + } |
| 185 | + |
| 186 | + pod := podHostname() |
| 187 | + |
| 188 | + // Parse the team_id — the audit_log FK requires a real uuid. |
| 189 | + teamID, err := uuid.Parse(args.TeamID) |
| 190 | + if err != nil { |
| 191 | + return fmt.Errorf("ChaosLeaseRecoveryWorker.Work: parse team_id %q: %w", args.TeamID, err) |
| 192 | + } |
| 193 | + |
| 194 | + startedAt := time.Now() |
| 195 | + if mErr := w.markChaos(ctx, teamID, args, pod, AuditKindChaosLeaseRecoveryStart, "drill start", startedAt, 0); mErr != nil { |
| 196 | + return fmt.Errorf("write start marker: %w", mErr) |
| 197 | + } |
| 198 | + slog.Info("jobs.chaos_lease_recovery.start", |
| 199 | + "run_id", args.RunID, |
| 200 | + "team_id", args.TeamID, |
| 201 | + "pod", pod, |
| 202 | + "sleep", sleep.String(), |
| 203 | + "river_attempt", job.Attempt, |
| 204 | + ) |
| 205 | + |
| 206 | + // The sleep is the part the kill is intended to interrupt. If ctx |
| 207 | + // expires (River cancels via JobTimeout or the pod terminates), exit |
| 208 | + // with an error so River retries via the rescuer. |
| 209 | + select { |
| 210 | + case <-time.After(sleep): |
| 211 | + // Normal completion — the kill never happened, or the job was |
| 212 | + // re-leased AFTER the original sleep would have completed. |
| 213 | + case <-ctx.Done(): |
| 214 | + // Pod is being torn down or River cancelled the job. Return ctx.Err |
| 215 | + // so River reschedules. |
| 216 | + slog.Warn("jobs.chaos_lease_recovery.interrupted", |
| 217 | + "run_id", args.RunID, |
| 218 | + "team_id", args.TeamID, |
| 219 | + "pod", pod, |
| 220 | + "river_attempt", job.Attempt, |
| 221 | + "reason", ctx.Err(), |
| 222 | + ) |
| 223 | + return fmt.Errorf("ctx done before completion: %w", ctx.Err()) |
| 224 | + } |
| 225 | + |
| 226 | + endedAt := time.Now() |
| 227 | + duration := endedAt.Sub(startedAt) |
| 228 | + if mErr := w.markChaos(ctx, teamID, args, pod, AuditKindChaosLeaseRecoveryEnd, "drill end", endedAt, duration); mErr != nil { |
| 229 | + return fmt.Errorf("write end marker: %w", mErr) |
| 230 | + } |
| 231 | + slog.Info("jobs.chaos_lease_recovery.end", |
| 232 | + "run_id", args.RunID, |
| 233 | + "team_id", args.TeamID, |
| 234 | + "pod", pod, |
| 235 | + "duration_ms", duration.Milliseconds(), |
| 236 | + "river_attempt", job.Attempt, |
| 237 | + ) |
| 238 | + return nil |
| 239 | +} |
| 240 | + |
| 241 | +// markChaos writes one audit_log row carrying the drill markers + pod id. |
| 242 | +// The metadata JSONB carries everything the drill test needs to assert the |
| 243 | +// recovery shape. |
| 244 | +func (w *ChaosLeaseRecoveryWorker) markChaos(ctx context.Context, teamID uuid.UUID, args ChaosLeaseRecoveryArgs, pod, kind, summary string, ts time.Time, duration time.Duration) error { |
| 245 | + meta, _ := json.Marshal(map[string]any{ |
| 246 | + "run_id": args.RunID, |
| 247 | + "pod": pod, |
| 248 | + "sleep_seconds": args.SleepSeconds, |
| 249 | + "duration_ms": duration.Milliseconds(), |
| 250 | + "ts": ts.UTC().Format(time.RFC3339Nano), |
| 251 | + }) |
| 252 | + _, err := w.db.ExecContext(ctx, ` |
| 253 | + INSERT INTO audit_log (team_id, actor, kind, summary, metadata) |
| 254 | + VALUES ($1::uuid, $2, $3, $4, $5::jsonb) |
| 255 | + `, teamID, chaosLeaseRecoveryActor, kind, summary, meta) |
| 256 | + return err |
| 257 | +} |
| 258 | + |
| 259 | +// podHostname returns $HOSTNAME (k8s injects the pod name into HOSTNAME for |
| 260 | +// every container) or "unknown" when unset. Used as the pod_id marker in |
| 261 | +// audit rows so the drill test can distinguish the killed pod from the |
| 262 | +// rescuer pod. |
| 263 | +func podHostname() string { |
| 264 | + if v := os.Getenv("HOSTNAME"); v != "" { |
| 265 | + return v |
| 266 | + } |
| 267 | + return "unknown" |
| 268 | +} |
0 commit comments