Skip to content

Commit 7169493

Browse files
fix(email): worker email subsystem BugBash 2026-05-19 — P0/P1/P2 fixes
P0-1 — Brevo auth error silently dropped all email. brevo_provider.go classified a 401/403 (bad/expired/revoked API key) and 429 (rate limit) as Permanent, so the forwarder advanced the cursor past every audit row in every batch — total, silent, unrecoverable email loss. Reclassify 401/403/429 as Transient (cursor held, recoverable) and emit an alert-able email.brevo.auth_wall ERROR. 400/422 payload rejects stay Permanent. Test: TestBrevoProvider_AuthFailureIsTransient. P1-2 — cursor had no time floor. A Redis wipe reset the cursor to zero and fetchBatch (no age bound) re-sent the entire audit_log history. Add a 48h age floor to fetchBatch and seed a missing cursor to now()-grace instead of the zero value. Tests: TestEventForwarder_FetchBatch_HasAgeFloor, TestEventForwarder_MissingCursor_SeedsToNow. P1-3 — no send ledger. Idempotency rested on the Brevo X-Mailin-Custom header, which is not a delivery-dedup mechanism. Add a forwarder_sent ledger table (migration 055) claimed ON CONFLICT DO NOTHING before each send; a duplicate audit_id is skipped. A Transient send releases the claim so the retry can re-send. Tests: TestEventForwarder_Ledger_SendsOnce, TestEventForwarder_TransientReleasesLedger. Log noise (P1, reported symptom) — idle-tick INFO demoted to DEBUG: event_email_forwarder.no_new_rows, middleware.work_ok, and the zero-candidate *.completed lines in checkout_reconcile / expiry_reminder / payment_grace_reminder / payment_grace_terminator / deployment_reminder. INFO now only fires on non-zero work. Test: TestEventForwarder_IdleTick_NoInfoLog. P2-1 — builder_skipped_row demoted WARN -> INFO (no owner email is an expected state for deleted/orphan/test teams). Cursor-advance unchanged. Test: TestEventForwarder_BuilderSkippedRow_NotWarn. F5 — suppression bypass. The forwarder checked suppression against row.OwnerEmail but sent to resolveRecipient(row); for anon-tier metadata.email recipients the check ran against "" and was bypassed. Resolve the recipient once and check suppression against the same address that is emailed. Test: TestEventForwarder_SuppressionUsesSentRecipient. F4 — silent no-op. A kind in eventEmailBuilders with no renderer fell through the dead Brevo-template path -> SkippedNoTemplate -> cursor advanced, zero email, zero error. A missing renderer is now a loud ERROR and holds the cursor. Test: TestEventForwarder_MissingRenderer_LoudErrorNoAdvance. F3 — deploy-reminder spam. deployment_reminder fired 6 identical emails over 12h. Reduced to a 3-stage escalating cadence (maxDeployReminders=3) with a reminder_index-keyed subject prefix (Heads up / Reminder / Final reminder), matching anon.expiry_warning. Test: TestDeployExpiringSoon_EscalatingCadence. Migration 055_forwarder_sent.sql added to worker/sql (canonical) and mirrored into api/internal/db/migrations + api testhelpers schema mirror (separate commit in the api repo). go build ./... && go vet ./... && go test ./... -count=1 — all green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c3bf998 commit 7169493

14 files changed

Lines changed: 910 additions & 70 deletions

internal/email/brevo_provider.go

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@ package email
2020
// forwarder doesn't need to know the wire format:
2121
//
2222
// 2xx → nil (success — forwarder advances cursor)
23-
// 4xx (401 / 422 / 400 etc.) → *SendError{Class: SendClassPermanent} (forwarder advances + logs ERROR)
23+
// 401 / 403 (auth) / 429 (rate) → *SendError{Class: SendClassTransient} (forwarder HOLDS cursor — recoverable)
24+
// 4xx (400 / 422 payload reject) → *SendError{Class: SendClassPermanent} (forwarder advances + logs ERROR)
2425
// 5xx / network / timeout → *SendError{Class: SendClassTransient} (forwarder holds cursor)
2526
// no template configured for kind → *SendError{Class: SendClassSkippedNoTemplate} (forwarder advances silently)
2627
//
27-
// The "4xx-advances" behaviour is deliberate: a single audit row with
28-
// malformed content shouldn't block every event behind it forever. We
29-
// log loudly so the poisoned row is visible in the structured-log stream.
28+
// The "payload-4xx-advances" behaviour is deliberate: a single audit row
29+
// with malformed content (400/422) shouldn't block every event behind it
30+
// forever. We log loudly so the poisoned row is visible in the log stream.
31+
//
32+
// P0-1 (2026-05-19): 401/403/429 are NOT advanced. A bad/expired/revoked
33+
// API key (401/403) or a rate-limit (429) is an ACCOUNT-level condition,
34+
// recoverable without operator data loss — classifying it Permanent made
35+
// the forwarder silently drop every audit row in every batch. These now
36+
// map to Transient (cursor held) and log the alert-able auth_wall ERROR.
3037

3138
import (
3239
"bytes"
@@ -376,11 +383,53 @@ func (p *BrevoProvider) doRequest(ctx context.Context, evt EventEmail, body []by
376383
)
377384
return nil
378385

386+
case resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden:
387+
// P0-1 FIX (2026-05-19): 401/403 is an ACCOUNT-LEVEL auth failure
388+
// (bad / expired / revoked BREVO_API_KEY) — NOT a per-row payload
389+
// problem. It is transient from the queue's point of view: it
390+
// resolves the instant an operator rotates the secret. Classifying
391+
// it Permanent made the forwarder advance the cursor past every
392+
// row in every batch, silently and unrecoverably dropping all
393+
// email. Transient holds the cursor so nothing is lost and the
394+
// backlog drains once the key is fixed.
395+
//
396+
// This is logged at ERROR with a distinct, alert-able key
397+
// (auth_wall) so an operator is paged before a batch is burned —
398+
// the generic email.brevo.permanent_4xx key is for per-row
399+
// payload rejects, not account auth failure.
400+
slog.Error("email.brevo.auth_wall",
401+
"kind", evt.Kind,
402+
"recipient", evt.Recipient,
403+
"status", resp.StatusCode,
404+
"path", string(path),
405+
"body", string(respBody),
406+
"note", "account-level Brevo auth failure (bad/expired/revoked BREVO_API_KEY) — classified Transient, cursor held, email NOT lost; rotate the secret",
407+
)
408+
return &SendError{
409+
Class: SendClassTransient,
410+
Message: fmt.Sprintf("brevo: auth failure %d %s", resp.StatusCode, string(respBody)),
411+
}
412+
413+
case resp.StatusCode == http.StatusTooManyRequests:
414+
// P0-1 FIX (2026-05-19): 429 rate-limited is also recoverable —
415+
// the row is fine, Brevo just wants us to back off. Transient so
416+
// the cursor holds and the row retries next tick.
417+
slog.Warn("email.brevo.rate_limited",
418+
"kind", evt.Kind,
419+
"recipient", evt.Recipient,
420+
"status", resp.StatusCode,
421+
"path", string(path),
422+
"body", string(respBody),
423+
)
424+
return &SendError{
425+
Class: SendClassTransient,
426+
Message: fmt.Sprintf("brevo: rate limited %d %s", resp.StatusCode, string(respBody)),
427+
}
428+
379429
case resp.StatusCode >= 400 && resp.StatusCode < 500:
380-
// 401/403 = bad API key (every row will fail the same way until
381-
// someone rotates the secret), 400/422 = bad payload for this row.
382-
// In both cases advancing the cursor is correct: holding it pins
383-
// the whole queue on one bad row.
430+
// 400/422 and other 4xx = genuine per-row payload rejection. The
431+
// row will never produce a valid send, so advancing the cursor is
432+
// correct: holding it pins the whole queue on one bad row.
384433
slog.Error("email.brevo.permanent_4xx",
385434
"kind", evt.Kind,
386435
"recipient", evt.Recipient,

internal/email/brevo_provider_test.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,12 @@ func TestBrevoProvider_2xxReturnsNil(t *testing.T) {
131131
}
132132

133133
// TestBrevoProvider_4xxReturnsPermanent verifies the "advance past poisoned
134-
// row" contract: a 401 (or any 4xx) → *SendError{Class: SendClassPermanent}.
134+
// row" contract: a genuine per-row payload reject (400/422) →
135+
// *SendError{Class: SendClassPermanent}. NOTE 401/403/429 are deliberately
136+
// EXCLUDED here — they are account-level/recoverable and now classify
137+
// Transient; see TestBrevoProvider_AuthFailureIsTransient (P0-1 regression).
135138
func TestBrevoProvider_4xxReturnsPermanent(t *testing.T) {
136-
for _, status := range []int{http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusUnprocessableEntity} {
139+
for _, status := range []int{http.StatusBadRequest, http.StatusUnprocessableEntity, http.StatusNotFound, http.StatusConflict} {
137140
srv, _ := fakeBrevo(t, status)
138141
p := newTestProvider(t, srv, nil)
139142

@@ -151,7 +154,40 @@ func TestBrevoProvider_4xxReturnsPermanent(t *testing.T) {
151154
continue
152155
}
153156
if se.Class != SendClassPermanent {
154-
t.Errorf("SendEvent on %d → Class=%v; want SendClassPermanent — holding cursor on auth errors pins the queue forever", status, se.Class)
157+
t.Errorf("SendEvent on %d → Class=%v; want SendClassPermanent — a payload reject must advance past the poisoned row", status, se.Class)
158+
}
159+
}
160+
}
161+
162+
// TestBrevoProvider_AuthFailureIsTransient is the P0-1 regression test.
163+
//
164+
// BUG: a bad/expired/revoked BREVO_API_KEY returns 401/403. The provider
165+
// classified that Permanent → the forwarder advanced the cursor → every
166+
// audit row in every batch was silently, unrecoverably dropped. A 429
167+
// rate-limit had the same fate. This test FAILS against the pre-fix code
168+
// (which returned SendClassPermanent for 401/403/429) and PASSES only
169+
// when those statuses are classified Transient so the cursor is held and
170+
// the email is recoverable once the operator rotates the key / backs off.
171+
func TestBrevoProvider_AuthFailureIsTransient(t *testing.T) {
172+
for _, status := range []int{http.StatusUnauthorized, http.StatusForbidden, http.StatusTooManyRequests} {
173+
srv, _ := fakeBrevo(t, status)
174+
p := newTestProvider(t, srv, nil)
175+
176+
err := p.SendEvent(context.Background(), EventEmail{
177+
Kind: "subscription.upgraded",
178+
Recipient: "x@example.com",
179+
})
180+
if err == nil {
181+
t.Errorf("SendEvent on %d = nil; want SendError(Transient)", status)
182+
continue
183+
}
184+
var se *SendError
185+
if !errors.As(err, &se) {
186+
t.Errorf("SendEvent on %d returned %T; want *SendError", status, err)
187+
continue
188+
}
189+
if se.Class != SendClassTransient {
190+
t.Errorf("SendEvent on %d → Class=%v; want SendClassTransient — auth/rate failure must HOLD the cursor, not advance and drop email silently", status, se.Class)
155191
}
156192
}
157193
}

internal/jobs/checkout_reconcile.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,11 @@ func (w *CheckoutReconcileWorker) Work(ctx context.Context, job *river.Job[Check
175175
rows.Close()
176176

177177
if len(candidates) == 0 {
178-
slog.Info("jobs.checkout_reconcile.completed",
178+
// P1-1 (BugBash 2026-05-19): idle tick — zero candidates carries
179+
// no operational signal. Demoted INFO → DEBUG; liveness is
180+
// covered by jobs.middleware.work_ok. INFO is reserved for a
181+
// tick that actually did work.
182+
slog.Debug("jobs.checkout_reconcile.completed",
179183
"emailed", 0,
180184
"resolved_late", 0,
181185
"skipped", 0,

internal/jobs/deployment_reminder.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@ package jobs
77
// - ttl_policy != 'permanent'
88
// - status NOT IN ('deleted', 'expired')
99
// - expires_at falls within the next 12h
10-
// - reminders_sent < 6
10+
// - reminders_sent < maxDeployReminders (3)
1111
// - last_reminder_at IS NULL OR last_reminder_at < now() - 2h (cooldown)
1212
//
13+
// F3 (BugBash 2026-05-19): the cadence used to be SIX identical emails
14+
// over the final 12h (T+12/14/16/18/20/22h) — read as spam. It is now a
15+
// 3-stage escalating cadence ("Heads up" → "Reminder" → "Final reminder")
16+
// matching anon.expiry_warning's shape. See maxDeployReminders and the
17+
// reminder_index-keyed subject in renderDeployExpiringSoon.
18+
//
1319
// For each candidate, CAS-advance reminders_sent (so two ticks don't fire
1420
// the same reminder twice), then write a deploy.expiring_soon audit_log
1521
// row carrying every param the email template needs. The BrevoForwarder
@@ -30,8 +36,9 @@ package jobs
3036
// below.
3137
//
3238
// Cadence in practice: a deploy that lands at T0 with auto_24h TTL fires
33-
// reminders at T+12h, T+14h, T+16h, T+18h, T+20h, T+22h — six emails over
34-
// the final 12h.
39+
// 3 reminders in the final 12h window — roughly T+12h ("Heads up"),
40+
// T+14h ("Reminder"), T+16h ("Final reminder") — with the 2h cooldown
41+
// gating the gap. Three escalating emails, not six identical ones.
3542
//
3643
// Audit kind: deploy.expiring_soon. Metadata: {deploy_id, team_id,
3744
// reminder_index, hours_remaining, expires_at, app_id, deploy_url,
@@ -53,6 +60,12 @@ import (
5360
"instant.dev/worker/internal/metrics"
5461
)
5562

63+
// maxDeployReminders caps how many deploy-expiry reminders fire per
64+
// deployment (F3, BugBash 2026-05-19). 3 stages — "Heads up" / "Reminder"
65+
// / "Final reminder" — matching the anon.expiry_warning escalating
66+
// cadence. Was 6, which produced six identical emails over 12h.
67+
const maxDeployReminders = 3
68+
5669
// DeploymentReminderArgs is the River job payload (no fields — runs as a sweep).
5770
type DeploymentReminderArgs struct{}
5871

@@ -128,11 +141,11 @@ func (w *DeploymentReminderWorker) Work(ctx context.Context, job *river.Job[Depl
128141
AND d.status NOT IN ('deleted', 'expired')
129142
AND d.expires_at > $1
130143
AND d.expires_at <= $2
131-
AND d.reminders_sent < 6
144+
AND d.reminders_sent < $4
132145
AND (d.last_reminder_at IS NULL OR d.last_reminder_at <= $3)
133146
ORDER BY d.expires_at ASC
134147
LIMIT 500
135-
`, now, windowEnd, cooldownBefore)
148+
`, now, windowEnd, cooldownBefore, maxDeployReminders)
136149
if err != nil {
137150
return fmt.Errorf("DeploymentReminderWorker: query failed: %w", err)
138151
}
@@ -160,7 +173,10 @@ func (w *DeploymentReminderWorker) Work(ctx context.Context, job *river.Job[Depl
160173
w.sampleTTLGauge(ctx)
161174

162175
if len(candidates) == 0 {
163-
slog.Info("jobs.deployment_reminder.completed",
176+
// P1-1 (BugBash 2026-05-19): idle tick — demoted INFO → DEBUG.
177+
// deployment_reminder runs every 60s; an idle INFO every minute
178+
// is heartbeat noise. Liveness via jobs.middleware.work_ok.
179+
slog.Debug("jobs.deployment_reminder.completed",
164180
"sent", 0,
165181
"candidates", 0,
166182
"duration_ms", time.Since(start).Milliseconds(),
@@ -284,9 +300,9 @@ func advanceReminderCAS(ctx context.Context, db *sql.DB, deployIDStr string, exp
284300
last_reminder_at = now()
285301
WHERE id = $1
286302
AND reminders_sent = $2
287-
AND reminders_sent < 6
303+
AND reminders_sent < $4
288304
AND (last_reminder_at IS NULL OR last_reminder_at <= $3)
289-
`, deployIDStr, expectedRemindersSent, cooldownBefore)
305+
`, deployIDStr, expectedRemindersSent, cooldownBefore, maxDeployReminders)
290306
if err != nil {
291307
return false, err
292308
}

internal/jobs/deployment_reminder_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func TestDeploymentReminderWorker_WritesAuditWithFullMetadata(t *testing.T) {
9393

9494
// CAS UPDATE.
9595
mock.ExpectExec(`UPDATE deployments\s+SET reminders_sent`).
96-
WithArgs("deploy-1", 2, sqlmock.AnyArg()).
96+
WithArgs("deploy-1", 2, sqlmock.AnyArg(), 3 /* maxDeployReminders */).
9797
WillReturnResult(sqlmock.NewResult(0, 1))
9898

9999
// Audit INSERT — the BrevoForwarder picks this up on its next tick.
@@ -153,7 +153,7 @@ func TestDeploymentReminderWorker_CASRaceLostNoAudit(t *testing.T) {
153153

154154
// CAS returns rowsAffected=0 — another tick won.
155155
mock.ExpectExec(`UPDATE deployments\s+SET reminders_sent`).
156-
WithArgs("deploy-race", 1, sqlmock.AnyArg()).
156+
WithArgs("deploy-race", 1, sqlmock.AnyArg(), 3 /* maxDeployReminders */).
157157
WillReturnResult(sqlmock.NewResult(0, 0))
158158

159159
// NO INSERT INTO audit_log expected here.

0 commit comments

Comments
 (0)