Commit baaf595

fix(forwarder): P1-3/F4/F5 email-pipeline correctness bundle
P1-3 (forwarder_sent ledger enrichment, migration 059):
  Enrich the worker-side send ledger with provider / provider_id /
  recipient (masked) / template_kind / classification columns so support
  staff can grep "what happened to email X" without log-spelunking. New
  ledgerClaim struct + maskRecipientForLedger helper thread these through
  every markSent call. Classification today is 'success' for confirmed 2xx
  and 'permanent_drop' for terminal-but-no-2xx (F4 missing_renderer +
  provider Permanent / SkippedNoTemplate).

F4 (missing renderer reshape):
  A kind registered in eventEmailBuilders but missing from
  eventEmailBodyRenderers used to hold the cursor (queue pinned until
  registry repaired). Now: log loud ERROR "missing_email_renderer",
  increment metrics.EmailMissingRendererTotal{kind}, INSERT a
  permanent_drop forwarder_sent row, and advance the cursor — the row will
  never produce an email regardless of retries, so pinning the queue is
  worse than dropping. The CI registry test
  TestEventEmail_EverySupportedKindFullyWired (and the new
  TestEventEmail_EveryBuilderHasARenderer inverse-coverage walk) catches
  the half-registration at gate time; this is the runtime backstop.

F5 (suppression vs send recipient):
  Already extracted recipient once at the top of the loop in earlier fix;
  this PR also threads that same recipient (masked) into the
  forwarder_sent ledger row, so the suppression check, the provider send,
  and the audit row all reference the SAME address.

Tests:
  - Updated TestEventForwarder_MissingRenderer to assert the new advance +
    permanent_drop ledger behavior (was: cursor held).
  - Added TestEventEmail_EveryBuilderHasARenderer inverse-coverage test
    (walks live eventEmailBuilders map, asserts each kind has a renderer).
    Per CLAUDE.md rule 18 — iterate the live registry.

Coverage block — F4:
  Symptom:        missing_email_renderer (registered builder, missing renderer)
  Enumeration:    grep -n "eventEmailBuilders\[" worker/internal/jobs/event_email_mapping.go
                  + iterate registry in TestEventEmail_EveryBuilderHasARenderer
  Sites found:    1 fall-through path in Work()
  Sites touched:  1 (event_email_forwarder.go Work() missing-renderer branch)
  Coverage test:  TestEventEmail_EveryBuilderHasARenderer +
                  TestEventForwarder_MissingRenderer_LoudErrorDropAndAdvance
  Live verified:  awaiting deploy (sql migration 059 must land first via api)

Coverage block — F5:
  Symptom:        suppression check using row.OwnerEmail while send uses metadata.email
  Enumeration:    grep -n "row.OwnerEmail\|resolveRecipient" worker/internal/jobs/event_email_forwarder.go
  Sites found:    1 (was: hasSuppression(row.OwnerEmail))
  Sites touched:  1 (already fixed in earlier commit; this PR threads same
                  masked recipient into ledger.markSent for consistency)
  Coverage test:  TestEventForwarder_SuppressionUsesSentRecipient
  Live verified:  awaiting deploy

Coverage block — P1-3 ledger enrichment:
  Symptom:        forwarder_sent has audit_id + sent_at only — support can't trace
  Enumeration:    grep -rn "forwarder_sent\b" worker/ api/
  Sites found:    sqlSentLedger.markSent (1), sql/055_*.sql (1)
  Sites touched:  4 (sqlSentLedger.markSent, ledgerClaim type, 3 call sites
                  in Work() — success / Permanent / F4 paths; migration 059 added)
  Coverage test:  TestEventForwarder_MissingRenderer_LoudErrorDropAndAdvance
                  (asserts ledger.lastClaim.Classification == permanent_drop +
                  Provider == "none" + ProviderID == "missing_renderer")
  Live verified:  awaiting deploy

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
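The inverse-coverage walk described under Tests can be pictured with a small sketch. The two maps below are hypothetical stand-ins for eventEmailBuilders and eventEmailBodyRenderers (their real value types are not shown in this commit), and kindsMissingRenderer is an illustrative helper name, not a function from the repository. The point is only that the check iterates the live builder map instead of a hand-maintained list of kinds.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical function types; the real registry signatures are not shown in the diff.
type builderFunc func(payload map[string]string) (recipient string, ok bool)
type rendererFunc func(payload map[string]string) (subject, body string)

// builders stands in for eventEmailBuilders (illustrative kinds only).
var builders = map[string]builderFunc{
	"deploy.succeeded": func(p map[string]string) (string, bool) { return p["email"], p["email"] != "" },
	"deploy.failed":    func(p map[string]string) (string, bool) { return p["email"], p["email"] != "" },
}

// renderers stands in for eventEmailBodyRenderers; "deploy.failed" is
// deliberately left out to simulate a half-registered kind.
var renderers = map[string]rendererFunc{
	"deploy.succeeded": func(map[string]string) (string, string) { return "Deploy succeeded", "..." },
}

// kindsMissingRenderer walks the builder map and returns, sorted, every kind
// that has a builder but no renderer.
func kindsMissingRenderer(b map[string]builderFunc, r map[string]rendererFunc) []string {
	var missing []string
	for kind := range b {
		if _, ok := r[kind]; !ok {
			missing = append(missing, kind)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// A CI test would fail when this slice is non-empty.
	fmt.Println(kindsMissingRenderer(builders, renderers))
}
```

Because the walk starts from the builder map, a newly added kind is covered without anyone remembering to extend the test.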
1 parent 4178278 commit baaf595

4 files changed

Lines changed: 311 additions & 49 deletions

internal/jobs/event_email_forwarder.go

Lines changed: 187 additions & 31 deletions
@@ -39,6 +39,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"strings"
 	"time"
 
 	"github.com/lib/pq"
@@ -47,8 +48,80 @@ import (
 	"go.opentelemetry.io/otel"
 
 	"instant.dev/worker/internal/email"
+	"instant.dev/worker/internal/metrics"
 )
 
+// ── forwarder_sent classification constants (migration 059) ───────────────
+//
+// Named constants per CLAUDE.md feedback_no_hardcoded_strings. The
+// `classification` column on forwarder_sent records the terminal outcome
+// of one forwarding attempt. Today only two values land: ledgerClassSuccess
+// (a confirmed 2xx) and ledgerClassPermanentDrop (a terminal-but-no-2xx
+// outcome — F4 missing_renderer + provider Permanent / SkippedNoTemplate).
+// The transient_retry value is reserved for a future per-attempt audit;
+// the current ordering keeps no ledger row for a Transient send (so the
+// next tick retries cleanly).
+const (
+	ledgerClassSuccess       = "success"
+	ledgerClassPermanentDrop = "permanent_drop"
+	ledgerClassTransient     = "transient_retry"
+
+	// providerNoneMissingRenderer is the synthetic provider name written
+	// to forwarder_sent.provider when the F4 missing-renderer path drops a
+	// row. No provider was actually called — store 'none' so a support
+	// query can grep classification='permanent_drop' AND provider='none'
+	// to isolate exactly the missing-renderer drops (vs Brevo Permanent
+	// failures which carry provider='brevo').
+	providerNoneMissingRenderer = "none"
+
+	// providerIDMissingRenderer is the synthetic provider_id used for F4
+	// rows. Real provider message ids never start with this literal so a
+	// support query like
+	//   SELECT * FROM forwarder_sent WHERE provider_id='missing_renderer'
+	// returns exactly the F4 drops.
+	providerIDMissingRenderer = "missing_renderer"
+)
+
+// ledgerClaim is the bundle of audit columns persisted on a forwarder_sent
+// row. Migration 059 added the columns; this struct is the in-memory
+// projection passed into markSent.
+//
+// Recipient is ALREADY masked at the call site (via maskRecipientForLedger)
+// — the ledger row stores PII-safe text only. Storing raw addresses
+// would defeat the worker-wide PII discipline (see logsafe.go in
+// internal/email/ + the api models.MaskEmail mirror).
+type ledgerClaim struct {
+	AuditID        string
+	Provider       string
+	ProviderID     string
+	Recipient      string // MUST be pre-masked by the caller
+	TemplateKind   string
+	Classification string
+}
+
+// maskRecipientForLedger returns a privacy-preserving rendering of an
+// email address for persistence in forwarder_sent.recipient. Mirrors the
+// algorithm in internal/email/logsafe.go and api/internal/email/email.go
+// :maskEmail byte-for-byte (the email-package helper is unexported;
+// duplicating the 7-line function is the cheapest way to avoid widening
+// the email-package API surface for this one caller).
+//
+//   "alice@example.com" → "a***@example.com"
+//   "a@example.com" → "a@example.com" (1-char local kept)
+//   "" / "no-at-sign" / "@only" → returned unchanged (defensive)
+func maskRecipientForLedger(addr string) string {
+	at := strings.LastIndex(addr, "@")
+	if at <= 0 {
+		return addr
+	}
+	local := addr[:at]
+	domain := addr[at:]
+	if len(local) == 1 {
+		return local + domain
+	}
+	return local[:1] + "***" + domain
+}
+
 // EventEmailForwarderArgs is the River job payload — no fields, runs as a sweep.
 type EventEmailForwarderArgs struct{}
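To make the masking rules in the doc comment concrete, here is a standalone sketch that copies the body of maskRecipientForLedger under the shorter, illustrative name maskRecipient and runs it over the documented inputs plus one extra case with two "@" signs. One caveat worth keeping in mind: local[:1] slices a byte, not a rune, so a local part that begins with a multi-byte UTF-8 character would be cut mid-character. Whether such addresses can reach this path is not stated in the commit.

```go
package main

import (
	"fmt"
	"strings"
)

// maskRecipient reproduces the masking logic from the hunk above under an
// illustrative name so the sketch compiles on its own.
func maskRecipient(addr string) string {
	at := strings.LastIndex(addr, "@")
	if at <= 0 {
		// No "@" at all, or "@" as the first byte: return the input untouched.
		return addr
	}
	local, domain := addr[:at], addr[at:]
	if len(local) == 1 {
		return local + domain
	}
	// Keep only the first byte of the local part.
	return local[:1] + "***" + domain
}

func main() {
	inputs := []string{
		"alice@example.com",
		"a@example.com",
		"",
		"no-at-sign",
		"@only",
		"odd@local@example.com", // LastIndex splits on the final "@"
	}
	for _, in := range inputs {
		fmt.Printf("%q -> %q\n", in, maskRecipient(in))
	}
}
```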

@@ -131,7 +204,7 @@ const eventEmailCursorSeedGrace = 5 * time.Minute
 // pre-existing claim are tolerated.
 type sentLedger interface {
 	isSent(ctx context.Context, auditID string) (sent bool, err error)
-	markSent(ctx context.Context, auditID string) (claimed bool, err error)
+	markSent(ctx context.Context, claim ledgerClaim) (claimed bool, err error)
 	release(ctx context.Context, auditID string) error
 }
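Since markSent now takes a ledgerClaim, a test double can record the last claim it saw, which is how the commit message describes the assertion on ledger.lastClaim. The sketch below is a hypothetical in-memory implementation of the interface, not the repository's test helper: fakeLedger, newFakeLedger and demoDoubleClaim are illustrative names, and the behaviour of release (deleting the row) is an assumption. It imitates the ON CONFLICT (audit_id) DO NOTHING semantics by reporting claimed=false for a repeated audit_id.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ledgerClaim mirrors the struct added in this commit.
type ledgerClaim struct {
	AuditID, Provider, ProviderID, Recipient, TemplateKind, Classification string
}

// fakeLedger is a hypothetical in-memory stand-in for the SQL-backed ledger.
type fakeLedger struct {
	mu        sync.Mutex
	rows      map[string]ledgerClaim
	lastClaim ledgerClaim // most recent claim passed to markSent, for assertions
}

func newFakeLedger() *fakeLedger {
	return &fakeLedger{rows: map[string]ledgerClaim{}}
}

func (l *fakeLedger) isSent(_ context.Context, auditID string) (bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	_, ok := l.rows[auditID]
	return ok, nil
}

// markSent keeps the first row per audit_id and reports whether this call
// inserted it, imitating INSERT ... ON CONFLICT (audit_id) DO NOTHING.
func (l *fakeLedger) markSent(_ context.Context, c ledgerClaim) (bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.lastClaim = c
	if _, exists := l.rows[c.AuditID]; exists {
		return false, nil
	}
	l.rows[c.AuditID] = c
	return true, nil
}

// release removes the row; this is an assumption about the real method.
func (l *fakeLedger) release(_ context.Context, auditID string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.rows, auditID)
	return nil
}

// demoDoubleClaim claims the same audit_id twice and returns both results
// together with the classification that ended up stored.
func demoDoubleClaim() (first, second bool, stored string) {
	ctx := context.Background()
	l := newFakeLedger()
	first, _ = l.markSent(ctx, ledgerClaim{AuditID: "audit-1", Classification: "permanent_drop"})
	second, _ = l.markSent(ctx, ledgerClaim{AuditID: "audit-1", Classification: "success"})
	return first, second, l.rows["audit-1"].Classification
}

func main() {
	fmt.Println(demoDoubleClaim())
}
```

The second call loses the claim and leaves the first row intact, which is the property the forwarder relies on to skip an already-recorded audit_id.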

@@ -154,15 +227,21 @@ func (l *sqlSentLedger) isSent(ctx context.Context, auditID string) (bool, error
 	return exists, nil
 }
 
-// markSent inserts (audit_id) ON CONFLICT DO NOTHING. RowsAffected==1
-// means this call claimed the send; ==0 means the audit_id was already
-// in the ledger and the send must be skipped.
-func (l *sqlSentLedger) markSent(ctx context.Context, auditID string) (bool, error) {
+// markSent inserts the ledgerClaim row ON CONFLICT DO NOTHING.
+// RowsAffected==1 means this call claimed the send; ==0 means the audit_id
+// was already in the ledger and the send must be skipped.
+//
+// Migration 059 enriched the table with provider / provider_id / recipient
+// (masked) / template_kind / classification columns so support staff can
+// answer "what happened to email X" without log-spelunking. The caller
+// MUST pre-mask the recipient (see ledgerClaim doc).
+func (l *sqlSentLedger) markSent(ctx context.Context, c ledgerClaim) (bool, error) {
 	res, err := l.db.ExecContext(ctx, `
-		INSERT INTO forwarder_sent (audit_id)
-		VALUES ($1)
+		INSERT INTO forwarder_sent
+			(audit_id, provider, provider_id, recipient, template_kind, classification)
+		VALUES ($1, $2, $3, $4, $5, $6)
 		ON CONFLICT (audit_id) DO NOTHING
-	`, auditID)
+	`, c.AuditID, c.Provider, c.ProviderID, c.Recipient, c.TemplateKind, c.Classification)
 	if err != nil {
 		return false, fmt.Errorf("forwarder_sent insert: %w", err)
 	}
@@ -188,9 +267,9 @@ func (l *sqlSentLedger) release(ctx context.Context, auditID string) error {
 // about the ledger path. Production NEVER uses this.
 type noopSentLedger struct{}
 
-func (noopSentLedger) isSent(context.Context, string) (bool, error)   { return false, nil }
-func (noopSentLedger) markSent(context.Context, string) (bool, error) { return true, nil }
-func (noopSentLedger) release(context.Context, string) error          { return nil }
+func (noopSentLedger) isSent(context.Context, string) (bool, error)        { return false, nil }
+func (noopSentLedger) markSent(context.Context, ledgerClaim) (bool, error) { return true, nil }
+func (noopSentLedger) release(context.Context, string) error               { return nil }
 
 // Suppression-related constants — mirrored from the api package's
 // internal/models.email_events.go so the worker doesn't import the api
@@ -524,11 +603,16 @@ batchLoop:
 			// EXPECTED, benign state (deleted / orphan / test teams), so
 			// this logs at INFO, not WARN — a steady trickle of orphan
 			// rows must not erode WARN's signal value.
+			// #147 (BugBash 2026-05-20): standardize on `skip_reason` as the
+			// dedupe-friendly facet name (matches the rest of the worker's
+			// `skipped_ephemeral` / `skip_ingress_steps` shape). Renamed from
+			// `reason` so NR Logs can pivot on a single field across builders.
 			slog.Info("jobs.event_email_forwarder.builder_skipped_row",
 				"kind", row.Kind,
 				"audit_id", row.ID,
 				"team_id", row.TeamID,
-				"reason", "builder returned ok=false (no resolvable owner email — expected for deleted/orphan/test teams)",
+				"skip_reason", "no_owner_email",
+				"note", "builder returned ok=false (expected for deleted/orphan/test teams)",
 			)
 			if advErr := w.cursor.write(ctx, eventCursor{CreatedAt: row.CreatedAt, ID: row.ID}); advErr != nil {
 				return fmt.Errorf("event_email_forwarder: advance cursor after builder skip: %w", advErr)
@@ -604,25 +688,67 @@ batchLoop:
 			continue
 		}
 
-		// F4 (BugBash 2026-05-19): a kind that has a builder but NO
-		// registered renderer used to fall through to the dead Brevo
-		// dashboard-template path → SkippedNoTemplate → cursor advanced
-		// silently → zero email, zero error, audit row consumed forever.
-		// AS OF 2026-05-15 every kind IS Go-rendered, so a missing
-		// renderer is now unambiguously a programming bug (a 19th kind
-		// added to eventEmailBuilders without a renderer). Treat it as a
-		// loud ERROR and HOLD the cursor — never advance silently. The
-		// TestEveryEmailKindHasAGoRenderer registry test catches this at
-		// CI time; this is the runtime backstop if it ever ships anyway.
+		// F4 (BugBash 2026-05-19, reshaped BugBash 2026-05-20): a kind
+		// that has a builder but NO registered renderer used to fall
+		// through to the dead Brevo dashboard-template path →
+		// SkippedNoTemplate → cursor advanced silently → zero email, zero
+		// error, audit row consumed forever. The
+		// TestEventEmail_EverySupportedKindFullyWired registry test catches
+		// this at CI time; this is the runtime backstop if it ever ships
+		// anyway.
+		//
+		// Behavior on a missing renderer:
+		//   1. Log loud ERROR with the literal message
+		//      "missing_email_renderer" and `audit_id` / `team_id` / `kind`.
+		//      The literal lets the NR alert key on
+		//      message:"missing_email_renderer" independent of kind
+		//      cardinality.
+		//   2. Increment metrics.EmailMissingRendererTotal{kind} so an
+		//      operator alert can fire on rate without parsing logs.
+		//   3. INSERT into forwarder_sent with classification='permanent_drop'
+		//      / provider='none' / provider_id='missing_renderer' so
+		//      support can grep the ledger for exactly these rows.
+		//   4. ADVANCE the cursor — the row will never produce an email
+		//      regardless of how many retries we do, and holding the
+		//      cursor pins the queue behind a programming bug for as long
+		//      as the registry stays broken. The CI registry test makes
+		//      this condition vanishingly rare in practice; the runtime
+		//      path prioritizes "don't pin the queue" over "don't drop
+		//      the row" because in this state the row was already going
+		//      to be dropped.
 		renderer, hasRenderer := eventEmailBodyRenderers[row.Kind]
 		if !hasRenderer {
-			slog.Error("jobs.event_email_forwarder.missing_renderer",
+			slog.Error("missing_email_renderer",
 				"kind", row.Kind,
 				"audit_id", row.ID,
-				"note", "kind has a builder but no Go renderer — holding cursor (NOT advancing). This is a registry bug; add a renderer to eventEmailBodyRenderers. Email NOT sent.",
+				"team_id", row.TeamID,
+				"note", "kind has a builder but no Go renderer — F4 permanent_drop. Add a renderer to eventEmailBodyRenderers. Email NOT sent; forwarder_sent ledger gets a permanent_drop row.",
 			)
-			transient++
-			break batchLoop
+			metrics.EmailMissingRendererTotal.WithLabelValues(row.Kind).Inc()
+			if _, ledgerErr := w.ledger.markSent(ctx, ledgerClaim{
+				AuditID:        row.ID,
+				Provider:       providerNoneMissingRenderer,
+				ProviderID:     providerIDMissingRenderer,
+				Recipient:      maskRecipientForLedger(recipient),
+				TemplateKind:   row.Kind,
+				Classification: ledgerClassPermanentDrop,
+			}); ledgerErr != nil {
+				// Best-effort — log and advance even if the ledger insert
+				// fails. The alternative (holding the cursor) would still
+				// pin the queue behind a programming bug we already know
+				// the row can never resolve.
+				slog.Warn("jobs.event_email_forwarder.missing_renderer_ledger_failed",
+					"audit_id", row.ID,
+					"kind", row.Kind,
+					"error", ledgerErr,
+					"note", "F4 permanent_drop ledger insert failed — advancing cursor anyway (row can never produce an email); support will grep the ERROR log instead",
+				)
+			}
+			if advErr := w.cursor.write(ctx, eventCursor{CreatedAt: row.CreatedAt, ID: row.ID}); advErr != nil {
+				return fmt.Errorf("event_email_forwarder: advance cursor after missing_renderer: %w", advErr)
+			}
+			skipped++
+			continue
 		}
 
 		// recipient was resolved once at the top of the loop (F5) — the
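The change from "hold the cursor" to "drop and advance" in the hunk above can be reduced to a small in-memory model. Everything in this sketch is hypothetical scaffolding (the row, claim and result types, the forward function, and the kind names); it only illustrates the four steps listed in the comment: loud error, per-kind counter, permanent_drop ledger row, cursor advance. Rows after the broken one still get processed, which is the behaviour the old break batchLoop prevented.

```go
package main

import "fmt"

// row is a pared-down audit row; the real row carries more fields.
type row struct{ ID, Kind string }

// claim is a pared-down ledger row.
type claim struct{ AuditID, Provider, ProviderID, Classification string }

// result collects what one pass over the batch did.
type result struct {
	cursor  string         // ID of the last row the cursor advanced past
	sent    []string       // rendered bodies that would have gone to the provider
	dropped []claim        // permanent_drop ledger rows written
	missing map[string]int // stand-in for the per-kind missing-renderer counter
}

// forward processes rows in order. A kind without a renderer is logged,
// counted, recorded as a permanent drop, and skipped; the loop keeps going.
func forward(rows []row, renderers map[string]func(row) string) result {
	res := result{missing: map[string]int{}}
	for _, r := range rows {
		render, ok := renderers[r.Kind]
		if !ok {
			fmt.Printf("ERROR missing_email_renderer kind=%s audit_id=%s\n", r.Kind, r.ID)
			res.missing[r.Kind]++
			res.dropped = append(res.dropped, claim{
				AuditID:        r.ID,
				Provider:       "none",
				ProviderID:     "missing_renderer",
				Classification: "permanent_drop",
			})
			res.cursor = r.ID // advance instead of pinning the queue
			continue
		}
		res.sent = append(res.sent, render(r))
		res.cursor = r.ID
	}
	return res
}

// demoForward runs a batch in which the middle row has no renderer.
func demoForward() result {
	renderers := map[string]func(row) string{
		"known": func(r row) string { return "body for " + r.ID },
	}
	rows := []row{{"a1", "known"}, {"a2", "half_registered"}, {"a3", "known"}}
	return forward(rows, renderers)
}

func main() {
	res := demoForward()
	fmt.Println(len(res.sent), len(res.dropped), res.cursor)
}
```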
@@ -692,8 +818,17 @@ batchLoop:
 			// then advance the cursor. If the markSent insert fails the
 			// send was real and the cursor must not advance — retry next
 			// tick (Brevo dedup header absorbs the at-most-once duplicate
-			// where honored).
-			claimed, ledgerErr := w.ledger.markSent(ctx, row.ID)
+			// where honored). The ledger row carries the audit columns
+			// added by migration 059 so support can reconstruct what got
+			// sent without grepping logs.
+			claimed, ledgerErr := w.ledger.markSent(ctx, ledgerClaim{
+				AuditID:        row.ID,
+				Provider:       w.provider.Name(),
+				ProviderID:     evt.IdempotencyKey, // best available — providers don't surface a message id today
+				Recipient:      maskRecipientForLedger(recipient),
+				TemplateKind:   row.Kind,
+				Classification: ledgerClassSuccess,
+			})
 			if ledgerErr != nil {
 				slog.Warn("jobs.event_email_forwarder.ledger_claim_failed_post_send",
 					"audit_id", row.ID,
@@ -729,8 +864,17 @@ batchLoop:
 			// Permanent failure (e.g. an invalid recipient). A best-effort
 			// claim — a failure here is non-fatal because the cursor still
 			// advances; worst case a duplicate Permanent error is logged
-			// on a cursor reset, which is harmless.
-			if claimed, ledgerErr := w.ledger.markSent(ctx, row.ID); ledgerErr != nil {
+			// on a cursor reset, which is harmless. classification is
+			// permanent_drop so a support grep against the 059 columns
+			// finds these alongside the F4 missing_renderer drops.
+			if claimed, ledgerErr := w.ledger.markSent(ctx, ledgerClaim{
+				AuditID:        row.ID,
+				Provider:       w.provider.Name(),
+				ProviderID:     evt.IdempotencyKey,
+				Recipient:      maskRecipientForLedger(recipient),
+				TemplateKind:   row.Kind,
+				Classification: ledgerClassPermanentDrop,
+			}); ledgerErr != nil {
 				slog.Warn("jobs.event_email_forwarder.ledger_claim_failed_terminal",
 					"audit_id", row.ID,
 					"kind", row.Kind,
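Taken together, the success and Permanent call sites plus the comment on the classification constants imply a simple mapping from send outcome to ledger behaviour. The sketch below spells that mapping out. The outcome type and its constants are hypothetical names for illustration (the provider package's real result type is not shown in this diff); the classification strings are the ones defined in the commit.

```go
package main

import "fmt"

// outcome is a hypothetical enum for the result of one provider send.
type outcome int

const (
	outcomeSuccess outcome = iota
	outcomePermanent
	outcomeSkippedNoTemplate
	outcomeTransient
)

// classify returns the classification to store and whether a ledger row is
// written at all. Per the comment on the constants, a transient failure
// writes no row so the next tick can retry cleanly.
func classify(o outcome) (classification string, writeRow bool) {
	switch o {
	case outcomeSuccess:
		return "success", true
	case outcomePermanent, outcomeSkippedNoTemplate:
		return "permanent_drop", true
	default:
		return "", false
	}
}

func main() {
	for _, o := range []outcome{outcomeSuccess, outcomePermanent, outcomeSkippedNoTemplate, outcomeTransient} {
		class, write := classify(o)
		fmt.Printf("outcome=%d classification=%q writeRow=%v\n", o, class, write)
	}
}
```

Keeping the transient case out of the ledger is what lets ON CONFLICT DO NOTHING double as the retry guard: a row exists only once the outcome is terminal.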
@@ -772,7 +916,19 @@ batchLoop:
 		}
 	}
 
-	slog.Info("jobs.event_email_forwarder.completed",
+	// #146 (BugBash 2026-05-20 idle-tick noise pass): the forwarder runs
+	// every 60s. The early-return path on no_new_rows is already DEBUG
+	// (line 482), so this branch only fires when there WERE rows. But the
+	// dominant low-traffic shape is: cursor advanced past a row that all
+	// skipped (suppression / duplicate / unsupported-kind) — sent==0,
+	// transient==0. That is also idle-tick steady state from the
+	// observability standpoint. Emit DEBUG when nothing went out the wire
+	// AND no transient errors occurred; INFO when bits genuinely moved.
+	forwarderLevel := slog.LevelInfo
+	if sent == 0 && transient == 0 {
+		forwarderLevel = slog.LevelDebug
+	}
+	slog.Log(ctx, forwarderLevel, "jobs.event_email_forwarder.completed",
 		"provider", w.provider.Name(),
 		"sent", sent,
 		"skipped", skipped,
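The level selection in this last hunk is easy to try in isolation with the standard log/slog package. In the sketch below, completionLevel is an illustrative helper that wraps the same condition, and the handler is configured at INFO, as a production logger plausibly would be (an assumption), so ticks that only skipped rows are filtered out while ticks that sent mail or hit a transient error still appear.

```go
package main

import (
	"context"
	"log/slog"
	"os"
)

// completionLevel mirrors the condition in the diff: DEBUG when nothing was
// sent and nothing failed transiently, INFO otherwise.
func completionLevel(sent, transient int) slog.Level {
	if sent == 0 && transient == 0 {
		return slog.LevelDebug
	}
	return slog.LevelInfo
}

func main() {
	// A handler with a minimum level of INFO drops the DEBUG completions.
	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
	ctx := context.Background()

	ticks := []struct{ sent, skipped, transient int }{
		{0, 3, 0}, // all rows skipped: logged at DEBUG, so filtered here
		{2, 1, 0}, // mail went out: INFO
		{0, 0, 1}, // transient error: INFO
	}
	for _, t := range ticks {
		logger.Log(ctx, completionLevel(t.sent, t.transient), "forwarder.completed",
			"sent", t.sent, "skipped", t.skipped, "transient", t.transient)
	}
}
```

Only the second and third ticks produce output with this handler configuration.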
