Skip to content

Commit ddb0285

Browse files
chore(billing): batch 4 — RUNBOOKS accuracy, params validation, cursor pagination, log levels (#3623)
* chore(billing): batch 4 — RUNBOOKS accuracy, params validation, cursor pagination, log levels 1. RUNBOOKS.md: #2 dead-letter retry count 3→5 (matches BILLING_WEBHOOK_MAX_ATTEMPTS=5); #3 plans/bump body: drop 'reason' field (not in AdminBumpPlanRequest schema, was 422) 2. Zod params validation on admin endpoints: AdminOrgIdParam (ObjectId regex) + AdminEventIdParam (evt_* regex) wired via safeParse in adminGetCustomerStatus, adminSyncFromStripe, adminCancelSubscription, adminPurgeDeadLetter → 422 on invalid input 3. Cursor-based pagination in reconcile service: replace skip+limit with _id > lastSeenId cursor in _fetchPage / findPageForReconciliation. Stable across new subscription inserts mid-run. Tests for multi-page + cursor stability. 4. dispute.funds_reinstated log level: logger.warn → logger.error in handleChargeDisputeFundsReinstated (action-required state, not warn-noise) 5. creditPack duplicate observability: log result.applied=false at debug level in handleCheckoutPaymentCompleted to surface Stripe redelivery phantom calls 6. billing.init.js console.warn → logger.warn (6 occurrences) * fix(billing): add AdminOrgIdParam validation to adminDisputeCredit path param Completes the Batch 4 admin endpoint param coverage — adminDisputeCredit /:orgId was missed in the initial pass. Now all 5 :orgId admin endpoints validate with AdminOrgIdParam.safeParse → 422 on invalid ObjectId. * test(billing): strengthen service-wiring assertions in admin param validation tests CodeRabbit nit: tests asserting "reaches the service" now also verify the controller passed the validated param to the service method (getCustomerStatus called with orgId, purgeDeadLetter called with eventId).
1 parent 60f723b commit ddb0285

11 files changed

Lines changed: 298 additions & 58 deletions

modules/billing/RUNBOOKS.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ Operational runbooks for the billing module. Each runbook references real endpoi
5454

5555
## 2 — Dead-Letter Investigation
5656

57-
**Context**: Stripe webhook events that fail processing 3+ times (or where the idempotency guard fires on a poisoned payload) are marked `deadLetter: true` in `processedStripeEvents`. They accumulate and must be reviewed manually — partial TTL index excludes them from auto-expiry.
57+
**Context**: Stripe webhook events that fail processing 5+ times (or where the idempotency guard fires on a poisoned payload) are marked `deadLetter: true` in `processedStripeEvents`. They accumulate and must be reviewed manually — partial TTL index excludes them from auto-expiry.
5858

5959
**Steps**:
6060

@@ -123,7 +123,7 @@ Operational runbooks for the billing module. Each runbook references real endpoi
123123

124124
```text
125125
PATCH /api/admin/billing/plans/bump
126-
Body: { "orgId": "...", "planId": "pro", "reason": "manual reconciliation post-mismatch" }
126+
Body: { "orgId": "...", "planId": "pro" }
127127
```
128128

129129
5. Re-run `GET /api/admin/billing/customer/:orgId` to confirm `stripeSnapshot` and `dbSnapshot` now match.

modules/billing/billing.init.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export default async (app) => {
2222
if (config.billing?.packs?.length) {
2323
for (const pack of config.billing.packs) {
2424
if (typeof pack.priceUsd !== 'number' || pack.priceUsd <= 0) {
25-
console.warn(`[billing] pack '${pack.packId}' missing valid priceUsd; refundPartial fallback will be inaccurate`);
25+
logger.warn(`[billing] pack '${pack.packId}' missing valid priceUsd; refundPartial fallback will be inaccurate`);
2626
}
2727
}
2828
}
@@ -33,7 +33,7 @@ export default async (app) => {
3333
const SUPPORTED_THRESHOLD_PERCENTS = new Set([80, 100]);
3434
for (const threshold of getAlertThresholdPercents()) {
3535
if (!SUPPORTED_THRESHOLD_PERCENTS.has(threshold)) {
36-
console.warn(
36+
logger.warn(
3737
`[billing] Configured alert threshold ${threshold}% is not in schema-supported set [80, 100] — alert will be silently skipped`,
3838
);
3939
}
@@ -45,7 +45,7 @@ export default async (app) => {
4545
try {
4646
AnalyticsService.groupIdentify('company', String(organizationId), { plan: newPlan });
4747
} catch (err) {
48-
console.warn('[billing] analytics groupIdentify failed (non-fatal):', err?.message ?? err);
48+
logger.warn('[billing] analytics groupIdentify failed (non-fatal)', { error: err?.message ?? String(err) });
4949
}
5050
});
5151

@@ -131,11 +131,11 @@ export default async (app) => {
131131
const distinctPlans = await Subscription.distinct('plan');
132132
for (const plan of distinctPlans) {
133133
if (!knownPlans.has(plan)) {
134-
console.warn(`[billing] Subscription.plan value "${plan}" not in planDefinitions — orphaned plan, may resolve quota=0`);
134+
logger.warn(`[billing] Subscription.plan value "${plan}" not in planDefinitions — orphaned plan, may resolve quota=0`);
135135
}
136136
}
137137
} catch (err) {
138-
console.warn('[billing] Subscription.plan boot validator failed (non-fatal):', err?.message ?? err);
138+
logger.warn('[billing] Subscription.plan boot validator failed (non-fatal)', { error: err?.message ?? String(err) });
139139
}
140140
}
141141
};

modules/billing/controllers/billing.admin.controller.js

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import responses from '../../../lib/helpers/responses.js';
55
import getStripe from '../lib/stripe.js';
66
import BillingAdminService from '../services/billing.admin.service.js';
7-
import { AdminDeadLettersQuery } from '../models/billing.subscription.schema.js';
7+
import { AdminDeadLettersQuery, AdminOrgIdParam, AdminEventIdParam } from '../models/billing.subscription.schema.js';
88
import logger from '../../../lib/services/logger.js';
99

1010
/**
@@ -106,7 +106,11 @@ const adminBumpPlan = async (req, res) => {
106106
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
107107
const adminGetCustomerStatus = async (req, res) => {
108108
try {
109-
const { orgId } = req.params;
109+
const parsed = AdminOrgIdParam.safeParse(req.params);
110+
if (!parsed.success) {
111+
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
112+
}
113+
const { orgId } = parsed.data;
110114
const result = await BillingAdminService.getCustomerStatus(orgId);
111115
return responses.success(res, 'customer status')(result);
112116
} catch (err) {
@@ -126,7 +130,11 @@ const adminGetCustomerStatus = async (req, res) => {
126130
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
127131
const adminSyncFromStripe = async (req, res) => {
128132
try {
129-
const { orgId } = req.params;
133+
const parsed = AdminOrgIdParam.safeParse(req.params);
134+
if (!parsed.success) {
135+
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
136+
}
137+
const { orgId } = parsed.data;
130138
const result = await BillingAdminService.syncOrgFromStripe(orgId);
131139
return responses.success(res, 'subscription synced from Stripe')(result);
132140
} catch (err) {
@@ -190,7 +198,11 @@ const adminListDeadLetters = async (req, res) => {
190198
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
191199
const adminPurgeDeadLetter = async (req, res) => {
192200
try {
193-
const { eventId } = req.params;
201+
const parsed = AdminEventIdParam.safeParse(req.params);
202+
if (!parsed.success) {
203+
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
204+
}
205+
const { eventId } = parsed.data;
194206
const result = await BillingAdminService.purgeDeadLetter(eventId);
195207
return responses.success(res, 'dead letter purged')(result);
196208
} catch (err) {
@@ -211,7 +223,11 @@ const adminPurgeDeadLetter = async (req, res) => {
211223
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
212224
const adminCancelSubscription = async (req, res) => {
213225
try {
214-
const { orgId } = req.params;
226+
const parsed = AdminOrgIdParam.safeParse(req.params);
227+
if (!parsed.success) {
228+
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsed.error);
229+
}
230+
const { orgId } = parsed.data;
215231
const result = await BillingAdminService.cancelSubscription(orgId);
216232
return responses.success(res, 'subscription canceled')(result);
217233
} catch (err) {
@@ -232,8 +248,13 @@ const adminCancelSubscription = async (req, res) => {
232248
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js controller, not Qwik
233249
const adminDisputeCredit = async (req, res) => {
234250
try {
251+
const parsedParams = AdminOrgIdParam.safeParse(req.params);
252+
if (!parsedParams.success) {
253+
return responses.error(res, 422, 'Unprocessable Entity', 'Invalid path parameters')(parsedParams.error);
254+
}
255+
const { orgId } = parsedParams.data;
256+
235257
const { chargeId, amountCents, reason, refundRequestId } = req.body;
236-
const { orgId } = req.params;
237258

238259
const rawAdminId = req.user?._id;
239260
if (!rawAdminId) {

modules/billing/models/billing.subscription.schema.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,27 @@ const AdminDisputeCreditRequest = z
146146
})
147147
.strict();
148148

149+
/**
150+
* Path parameter schemas for admin routes.
151+
* orgId: MongoDB ObjectId (24 hex chars, case-insensitive).
152+
* eventId: Stripe event ID (evt_ prefix).
153+
*/
154+
const AdminOrgIdParam = z.object({
155+
orgId: z.string().regex(/^[a-f0-9]{24}$/i, 'orgId must be a valid ObjectId'),
156+
});
157+
158+
const AdminEventIdParam = z.object({
159+
eventId: z.string().regex(/^evt_/, 'eventId must be a Stripe event ID (evt_...)'),
160+
});
161+
149162
export {
150163
AdminRefundRequest,
151164
AdminBumpPlanRequest,
152165
AdminWebhookReplayRequest,
153166
AdminDeadLettersQuery,
154167
AdminDisputeCreditRequest,
168+
AdminOrgIdParam,
169+
AdminEventIdParam,
155170
};
156171

157172
export default {
@@ -165,4 +180,6 @@ export default {
165180
AdminWebhookReplayRequest,
166181
AdminDeadLettersQuery,
167182
AdminDisputeCreditRequest,
183+
AdminOrgIdParam,
184+
AdminEventIdParam,
168185
};

modules/billing/repositories/billing.subscription.repository.js

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -312,26 +312,30 @@ const adminUpdatePlanOnly = (id, planId, adminUserId) => {
312312

313313
/**
314314
* Fetch one page of subscriptions matching given statuses for reconciliation.
315-
* Used by the billing reconcile service (replaces direct mongoose.model() access there).
315+
* Uses cursor-based pagination (_id > lastSeenId) instead of skip+limit for stability:
316+
* skip offsets shift when new documents are inserted mid-run, causing silent skips or
317+
* double-processing. The _id cursor is monotonically increasing and unaffected by inserts.
316318
* @param {string[]} statuses - Subscription statuses to include.
317-
* @param {number} page - 0-based page index.
319+
* @param {string|null} lastSeenId - ObjectId string of the last document from the previous page, or null for first page.
318320
* @param {number} limit - Page size.
319321
* @returns {Promise<Object[]>} Lean subscription documents.
320322
*/
321323
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js repository, not Qwik
322-
const findPageForReconciliation = (statuses, page, limit) =>
323-
Subscription.find(
324-
{
325-
status: { $in: statuses },
326-
stripeSubscriptionId: { $exists: true, $ne: null },
327-
},
324+
const findPageForReconciliation = (statuses, lastSeenId, limit) => {
325+
const filter = {
326+
status: { $in: statuses },
327+
stripeSubscriptionId: { $exists: true, $ne: null },
328+
...(lastSeenId ? { _id: { $gt: new mongoose.Types.ObjectId(lastSeenId) } } : {}),
329+
};
330+
return Subscription.find(
331+
filter,
328332
{ _id: 1, organization: 1, stripeSubscriptionId: 1, stripeCustomerId: 1, plan: 1, status: 1, currentPeriodStart: 1 },
329333
)
330334
.sort({ _id: 1 })
331-
.skip(page * limit)
332335
.limit(limit)
333336
.lean()
334337
.exec();
338+
};
335339

336340
export default {
337341
list,

modules/billing/services/billing.reconcile.service.js

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ const runReconciliation = async () => {
6565
let checked = 0;
6666
let divergences = 0;
6767
let errors = 0;
68-
let page = 0;
68+
let lastSeenId = null;
6969

7070
let hasMore = true;
7171
while (hasMore) {
72-
// Paginate via skip+limit — for large collections consider cursor-based pagination,
73-
// but skip is acceptable for ops crons running at low frequency.
74-
const subs = await _fetchPage(SubscriptionRepository, page, RECONCILE_PAGE_SIZE);
72+
// Cursor-based pagination via _id > lastSeen — stable across new subscriptions inserted
73+
// mid-run (skip+limit would shift offsets and silently skip or double-process docs).
74+
const subs = await _fetchPage(SubscriptionRepository, lastSeenId, RECONCILE_PAGE_SIZE);
7575
if (!subs || subs.length === 0) break;
7676

7777
for (const sub of subs) {
@@ -93,7 +93,7 @@ const runReconciliation = async () => {
9393
if (subs.length < RECONCILE_PAGE_SIZE) {
9494
hasMore = false;
9595
} else {
96-
page += 1;
96+
lastSeenId = String(subs[subs.length - 1]._id);
9797
}
9898
}
9999

@@ -102,17 +102,20 @@ const runReconciliation = async () => {
102102
};
103103

104104
/**
105-
* Fetch one page of active|past_due subscriptions.
105+
* Fetch one page of active|past_due subscriptions using cursor-based pagination.
106+
* Passing lastSeenId advances the cursor to the next page; null fetches the first page.
107+
* Cursor approach is stable across new subscription inserts during a reconcile run —
108+
* skip+limit would shift offsets and silently skip or double-process documents.
106109
* @param {Object} SubscriptionRepository - Subscription repository.
107-
* @param {number} page - 0-based page index.
110+
* @param {string|null} lastSeenId - ObjectId string of the last document from the previous page, or null for first page.
108111
* @param {number} limit - Page size.
109112
* @returns {Promise<Array>}
110113
*/
111114
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js service, not Qwik
112-
const _fetchPage = async (SubscriptionRepository, page, limit) => {
115+
const _fetchPage = async (SubscriptionRepository, lastSeenId, limit) => {
113116
return SubscriptionRepository.findPageForReconciliation(
114117
RECONCILE_STATUSES,
115-
page,
118+
lastSeenId,
116119
limit,
117120
);
118121
};

modules/billing/services/billing.webhook.service.js

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,17 @@ const handleCheckoutPaymentCompleted = async (session) => {
286286
if (!organizationId || !mongoose.Types.ObjectId.isValid(organizationId)) return;
287287
if (!packId) return;
288288

289-
await BillingExtraService.creditPack(organizationId, packId, stripeSessionId);
289+
const result = await BillingExtraService.creditPack(organizationId, packId, stripeSessionId);
290+
291+
// Observability: log when creditPack returns applied=false (duplicate session detected).
292+
// Debug level — not an error, but useful to detect phantom Stripe redeliveries in prod.
293+
if (result && !result.applied) {
294+
logger.debug('[billing.webhook] creditPack duplicate session detected — idempotency guard fired', {
295+
organizationId,
296+
packId,
297+
stripeSessionId,
298+
});
299+
}
290300

291301
// Backfill PaymentIntent metadata with the real session ID so that charge.refunded
292302
// events can correlate the charge back to this ledger entry.
@@ -1116,9 +1126,9 @@ const handleChargeDisputeFundsReinstated = async (dispute, event) => {
11161126
return;
11171127
}
11181128

1119-
// funds_reinstated is good news (dispute was won back) — log as warn, not error.
1120-
// The alert is for ops to apply a manual ledger credit via the admin endpoint.
1121-
logger.warn('[billing.webhook] dispute.funds_reinstated received — use POST /api/admin/billing/dispute/credit/:orgId to restore the extras balance', {
1129+
// funds_reinstated requires immediate manual action (apply ledger credit via admin endpoint)
1130+
// — log as error so this surfaces in error-level alerting dashboards, not buried in warn noise.
1131+
logger.error('[billing.webhook] dispute.funds_reinstated received — ACTION REQUIRED: use POST /api/admin/billing/dispute/credit/:orgId to restore the extras balance', {
11221132
disputeId,
11231133
chargeId,
11241134
amount,

0 commit comments

Comments
 (0)