Skip to content

Commit 02536f5

Browse files
fix(billing): F1 reconciler orphan sweep + F6 entitlement drift signal
Billing-trust audit 2026-05-19 — worker-repo findings. F1 (P0) — billing reconciler blind spot. The primary sweep starts from teams WHERE stripe_customer_id IS NOT NULL — the persisted Razorpay subscription id, written by a best-effort, non-fatal UPDATE at checkout. If that write is lost and the customer then pays, the team is structurally invisible to the reconciler forever: Razorpay bills the card, the DB stays on free/hobby, nothing corrects it. Fix: add a Razorpay-authoritative orphan sweep (billing_reconciler.go runOrphanSweep) that runs after the primary sweep. It enumerates pending_checkouts — which records the (subscription_id, team_id) pair for EVERY checkout, independent of the lost UPDATE — fetches each live Razorpay subscription, and elevates any team Razorpay reports paid-and-active whose DB tier is still below the entitled tier. It then backfills teams.stripe_customer_id so the team is visible to the primary sweep thereafter. Fully fail-open: a query error logs and returns, a per-checkout Razorpay error / circuit-open aborts or skips that row only. No api-side change needed — pending_checkouts already carries the pair the worker needs. F6 (P2) — infra entitlement drift unalerted. The entitlement reconciler corrected Postgres connection-cap / Redis maxmemory drift but only via a generic per-resource INFO line, so monitoring could not alert on a rising drift-correction rate. Fix: emit a dedicated WARN-level jobs.entitlement_reconciler.drift_corrected signal + a 1:1 instant_entitlement_drift_corrected_total counter (labelled by resource_type) on every applied correction, for both the Postgres and Redis paths. Tests (fail without the fix, pass with it): - TestBillingReconciler_OrphanSweep_PaidTeamNoSubID_Recovered — paid team with NULL stripe_customer_id is detected and elevated. - TestBillingReconciler_OrphanSweep_AlreadyCorrectTier_NoUpgrade - TestBillingReconciler_OrphanSweep_NonActiveStatus_NoUpgrade - TestBillingReconciler_OrphanSweep_QueryFailure_FailOpen - TestEntitlementReconciler_PostgresDriftCorrected_EmitsSignal - TestEntitlementReconciler_RedisDriftCorrected_EmitsSignal - TestEntitlementReconciler_NoDrift_NoSignal The 14 existing billing-reconciler Work() tests updated for the new orphan-sweep query (expectEmptyOrphanSweep helper). New metrics: instant_billing_reconciler_orphan_scanned_total, instant_billing_reconciler_orphan_corrected_total, instant_entitlement_drift_corrected_total. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b631e25 commit 02536f5

5 files changed

Lines changed: 740 additions & 0 deletions

File tree

internal/jobs/billing_reconciler.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,38 @@ const defaultBillingReconcileInterval = 15 * time.Minute
8181
// consecutive ticks via the stable ORDER BY id pagination.
8282
const billingReconcilerBatchLimit = 100
8383

84+
// ── F1: orphan-checkout sweep constants ───────────────────────────────────────
85+
//
86+
// The primary sweep above starts from teams.stripe_customer_id (the persisted
87+
// Razorpay subscription id). That id is written by a best-effort, non-fatal
88+
// UPDATE in the api at checkout time — if the write is lost and the customer
89+
// then pays, the team is structurally invisible to the primary sweep forever:
90+
// Razorpay bills the card, the DB stays on free/hobby, nothing corrects it (F1).
91+
//
92+
// The orphan sweep is a Razorpay-authoritative second pass. It does NOT start
93+
// from the teams table — it enumerates pending_checkouts (api migration 034),
94+
// which records the (subscription_id, team_id) pair for EVERY checkout the api
95+
// ever minted, independent of whether the teams.stripe_customer_id UPDATE
96+
// landed. For each checkout it fetches the live Razorpay subscription; any team
97+
// Razorpay reports paid-and-active whose DB tier is still below the entitled
98+
// tier is elevated — the same correction the primary sweep applies, just
99+
// reachable for teams the primary sweep cannot see.
100+
101+
// billingReconcilerOrphanBatchLimit caps the orphan sweep's per-tick fan-out.
102+
// Smaller than the primary batch: the orphan population is the steady-state
103+
// pending_checkouts table minus already-up-to-date teams — tiny in practice.
104+
// A backlog drains across consecutive ticks via the stable ORDER BY.
105+
const billingReconcilerOrphanBatchLimit = 100
106+
107+
// billingReconcilerOrphanMinAge is how old a pending_checkouts row must be
108+
// before the orphan sweep considers it. A just-minted checkout is still in the
109+
// happy-path window (webhook in flight, primary sweep will pick it up once the
110+
// sub-id persists); only a checkout that has been around long enough that a
111+
// missed-write + missed-webhook is plausible is worth a Razorpay round-trip.
112+
// 15 minutes matches checkoutReconcileGracePeriod — the same "long enough to
113+
// be a real failure, not an in-flight checkout" threshold.
114+
const billingReconcilerOrphanMinAge = 15 * time.Minute
115+
84116
// billingReconcilerRazorpayTimeout is the per-team Razorpay fetch budget.
85117
// FetchSubscriptionForReconciler makes up to 2 Razorpay API calls; 10s is
86118
// the ceiling so a slow response cannot hold the tick open for the full batch.
@@ -863,18 +895,205 @@ func (w *BillingReconcilerWorker) Work(ctx context.Context, job *river.Job[Billi
863895
}
864896

865897
done:
898+
// F1: Razorpay-authoritative orphan sweep. Catches paid teams the primary
899+
// teams-table sweep is structurally blind to (their checkout-time
900+
// subscription_id UPDATE was lost). Fail-open: an error here is logged and
901+
// swallowed — the primary sweep's corrections are already committed and the
902+
// next tick retries the orphan pass.
903+
orphanScanned, orphanCorrected := w.runOrphanSweep(ctx)
904+
866905
slog.Info("jobs.billing_reconciler.completed",
867906
"teams_scanned", len(teams),
868907
"gap_upgrade", gapUpgrade,
869908
"gap_downgrade", gapDowngrade,
870909
"corrected_upgrade", correctedUpgrade,
871910
"corrected_downgrade", correctedDowngrade,
872911
"razorpay_errors", razorpayErrors,
912+
"orphan_scanned", orphanScanned,
913+
"orphan_corrected", orphanCorrected,
873914
"duration_ms", time.Since(start).Milliseconds(),
874915
)
875916
return nil
876917
}
877918

919+
// billingReconcilerOrphanRow is the projection the orphan sweep's SELECT over
920+
// pending_checkouts returns: the (subscription_id, team_id) pair recorded at
921+
// checkout time, plus the team's CURRENT plan_tier so the sweep can decide
922+
// whether an elevation is needed without a second query.
923+
type billingReconcilerOrphanRow struct {
924+
subscriptionID string
925+
teamID uuid.UUID
926+
planTier string
927+
}
928+
929+
// runOrphanSweep is the F1 fix: a Razorpay-authoritative second pass that
930+
// starts from pending_checkouts instead of teams.stripe_customer_id.
931+
//
932+
// pending_checkouts (api migration 034) records the (subscription_id, team_id)
933+
// pair for EVERY checkout the api minted — it is written when the customer is
934+
// handed the Razorpay short_url, on the SAME insert path, NOT via the
935+
// best-effort UPDATE that the primary sweep's candidate column depends on. So a
936+
// team whose teams.stripe_customer_id write was lost still has a
937+
// pending_checkouts row carrying its subscription_id.
938+
//
939+
// For each checkout the sweep fetches the live Razorpay subscription. If
940+
// Razorpay reports the subscription active/authenticated (paid or card
941+
// authorised) and the team's DB tier is BELOW the entitled tier, the team is
942+
// elevated via the same upgradeTeamTiers path the primary sweep uses, and a
943+
// subscription.upgraded audit row is emitted for the event-email forwarder.
944+
//
945+
// Error contract — fully fail-open:
946+
// - A candidate-query failure logs and returns (0,0): the primary sweep's
947+
// work is already committed; River retries the whole tick in 15 minutes.
948+
// - A per-checkout Razorpay error / circuit-open / not-configured aborts or
949+
// skips that row only, exactly like the primary sweep.
950+
// - An elevation DB error logs and moves to the next row.
951+
//
952+
// Returns (scanned, corrected) for the completion-log summary.
953+
func (w *BillingReconcilerWorker) runOrphanSweep(ctx context.Context) (scanned, corrected int) {
954+
cutoff := time.Now().UTC().Add(-billingReconcilerOrphanMinAge)
955+
956+
// Candidate query: pending_checkouts rows joined to their team. The JOIN
957+
// projects the team's current plan_tier so the sweep needs no follow-up
958+
// query. Ordered by created_at for stable per-tick pagination.
959+
rows, err := w.db.QueryContext(ctx, `
960+
SELECT pc.subscription_id, pc.team_id, t.plan_tier
961+
FROM pending_checkouts pc
962+
JOIN teams t ON t.id = pc.team_id
963+
WHERE pc.subscription_id IS NOT NULL
964+
AND pc.subscription_id != ''
965+
AND pc.created_at < $1
966+
ORDER BY pc.created_at
967+
LIMIT $2
968+
`, cutoff, billingReconcilerOrphanBatchLimit)
969+
if err != nil {
970+
slog.Warn("billing.reconciler.orphan_query_failed",
971+
"error", err,
972+
"note", "primary sweep corrections already committed; next tick retries",
973+
)
974+
return 0, 0
975+
}
976+
defer rows.Close()
977+
978+
var candidates []billingReconcilerOrphanRow
979+
for rows.Next() {
980+
var r billingReconcilerOrphanRow
981+
if scanErr := rows.Scan(&r.subscriptionID, &r.teamID, &r.planTier); scanErr != nil {
982+
slog.Warn("billing.reconciler.orphan_scan_failed", "error", scanErr)
983+
continue
984+
}
985+
candidates = append(candidates, r)
986+
}
987+
if rowsErr := rows.Err(); rowsErr != nil {
988+
slog.Warn("billing.reconciler.orphan_rows_error", "error", rowsErr)
989+
return 0, 0
990+
}
991+
rows.Close()
992+
993+
for i, c := range candidates {
994+
// 100ms stagger between Razorpay calls, same as the primary sweep.
995+
if i > 0 {
996+
select {
997+
case <-ctx.Done():
998+
slog.Warn("billing.reconciler.orphan_context_cancelled", "processed", i)
999+
return scanned, corrected
1000+
case <-time.After(billingReconcilerStaggerDelay):
1001+
}
1002+
}
1003+
1004+
scanned++
1005+
metrics.BillingReconcilerOrphanScanned.Inc()
1006+
1007+
fetchCtx, cancel := context.WithTimeout(ctx, billingReconcilerRazorpayTimeout)
1008+
details, fetchErr := w.fetcher.FetchSubscriptionForReconciler(fetchCtx, c.subscriptionID)
1009+
cancel()
1010+
1011+
if fetchErr != nil {
1012+
metrics.BillingReconcilerRazorpayErrors.Inc()
1013+
// Circuit-open / not-configured → abort the orphan sweep cleanly,
1014+
// matching the primary sweep's batch-abort behaviour.
1015+
if errors.Is(fetchErr, errReconcilerCircuitOpen) ||
1016+
errors.Is(fetchErr, errSubFetcherNotConfigured) {
1017+
slog.Warn("billing.reconciler.orphan_sweep_aborted",
1018+
"reason", fetchErr,
1019+
"processed", i,
1020+
)
1021+
return scanned, corrected
1022+
}
1023+
slog.Warn("billing.reconciler.orphan_fetch_failed",
1024+
"team_id", c.teamID,
1025+
"subscription_id", c.subscriptionID,
1026+
"error", fetchErr,
1027+
)
1028+
continue
1029+
}
1030+
1031+
// Only an active/authenticated subscription means "Razorpay says this
1032+
// team is paid". pending/halted/cancelled checkouts are handled by the
1033+
// primary sweep (once the sub-id persists) and the grace/terminal
1034+
// machinery; the orphan sweep's job is narrowly the charged-but-not-
1035+
// upgraded hole.
1036+
statusClass, known := razorpayStatusClass[details.Status]
1037+
if !known || statusClass != rzpStatusClassActive {
1038+
continue
1039+
}
1040+
1041+
expectedTier := billingReconcilerPlanIDToTier(details.PlanID)
1042+
if billingTierRank(c.planTier) >= billingTierRank(expectedTier) {
1043+
// DB tier already at or above expected — no orphan correction
1044+
// needed (the team was upgraded by some other path).
1045+
continue
1046+
}
1047+
1048+
// Charged-but-not-upgraded, recovered. This team paid at Razorpay but
1049+
// its checkout-time subscription_id UPDATE was lost, so the primary
1050+
// teams-table sweep never scanned it. Elevate it now.
1051+
slog.Warn("billing.reconciler.orphan_tier_corrected",
1052+
"team_id", c.teamID,
1053+
"from", c.planTier,
1054+
"to", expectedTier,
1055+
"razorpay_status", details.Status,
1056+
"subscription_id", c.subscriptionID,
1057+
"note", "team paid at Razorpay but had no persisted subscription_id — recovered via pending_checkouts orphan sweep (F1)",
1058+
)
1059+
metrics.BillingReconcilerGapDetected.WithLabelValues("upgrade").Inc()
1060+
1061+
if upgradeErr := w.upgradeTeamTiers(ctx, c.teamID, expectedTier); upgradeErr != nil {
1062+
slog.Error("billing.reconciler.orphan_upgrade_failed",
1063+
"team_id", c.teamID, "error", upgradeErr,
1064+
)
1065+
continue
1066+
}
1067+
corrected++
1068+
metrics.BillingReconcilerOrphanCorrected.Inc()
1069+
metrics.BillingReconcilerGapCorrected.WithLabelValues("upgrade").Inc()
1070+
1071+
// Backfill teams.stripe_customer_id so the team is visible to the
1072+
// primary sweep from now on — without this it would remain an orphan
1073+
// and be re-checked by this sweep every tick. Fail-open: the upgrade is
1074+
// already committed; a failed backfill just means another orphan-sweep
1075+
// pass next tick (idempotent — the rank check above no-ops it).
1076+
if _, backfillErr := w.db.ExecContext(ctx,
1077+
`UPDATE teams SET stripe_customer_id = $1
1078+
WHERE id = $2
1079+
AND (stripe_customer_id IS NULL OR stripe_customer_id = '')`,
1080+
c.subscriptionID, c.teamID,
1081+
); backfillErr != nil {
1082+
slog.Warn("billing.reconciler.orphan_subid_backfill_failed",
1083+
"team_id", c.teamID,
1084+
"subscription_id", c.subscriptionID,
1085+
"error", backfillErr,
1086+
)
1087+
}
1088+
1089+
// Emit the audit row for the event-email forwarder — same as the
1090+
// primary sweep's upgrade path. Fail-open.
1091+
w.emitUpgradeAudit(ctx, c.teamID, c.planTier, expectedTier, c.subscriptionID)
1092+
}
1093+
1094+
return scanned, corrected
1095+
}
1096+
8781097
// upgradeTeamTiers is the worker-local equivalent of models.UpgradeTeamAllTiers.
8791098
// It runs four UPDATE statements inside a transaction, mirroring the api's model.
8801099
//

0 commit comments

Comments
 (0)