Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions modules/billing/controllers/billing.webhook.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const handleWebhook = async (req, res) => {

const sig = req.headers['stripe-signature'];
const { webhookSecret } = config.stripe;
// req.id is injected by the global requestId middleware (lib/middlewares/requestId.js).
// Propagate it through to enable end-to-end traceability across webhook log lines.
const requestId = req.id;

let event;
try {
Expand All @@ -30,70 +33,83 @@ const handleWebhook = async (req, res) => {
return responses.error(res, 400, 'Bad Request', 'Webhook signature verification failed')(err);
}

logger.info('[billing.webhook] received', { type: event.type, id: event.id, requestId });

try {
switch (event.type) {
case 'checkout.session.completed':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleCheckoutSessionCompleted(e),
{ requestId },
);
break;
case 'customer.subscription.created':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleSubscriptionCreated(e.data.object, e),
{ requestId },
);
break;
case 'customer.subscription.updated':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleSubscriptionUpdated(e.data.object, e),
{ requestId },
);
break;
case 'customer.subscription.deleted':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleSubscriptionDeleted(e.data.object, e),
{ requestId },
);
break;
case 'invoice.payment_failed':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleInvoicePaymentFailed(e.data.object, e),
{ requestId },
);
break;
case 'invoice.payment_succeeded':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleInvoicePaymentSucceeded(e.data.object, e),
{ requestId },
);
break;
case 'charge.refunded':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleChargeRefunded(e.data.object),
{ requestId },
);
break;
case 'customer.deleted':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleCustomerDeleted(e.data.object, e),
{ requestId },
);
break;
case 'charge.dispute.created':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleChargeDisputeCreated(e.data.object, e),
{ requestId },
);
break;
case 'charge.dispute.funds_withdrawn':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleChargeDisputeFundsWithdrawn(e.data.object, e),
{ requestId },
);
break;
case 'charge.dispute.funds_reinstated':
await BillingWebhookService.withIdempotency(event, (e) =>
BillingWebhookService.handleChargeDisputeFundsReinstated(e.data.object, e),
{ requestId },
);
break;
default:
logger.info('[billing.webhook] unhandled event type', { type: event.type, id: event.id });
logger.info('[billing.webhook] unhandled event type', { type: event.type, id: event.id, requestId });
break;
}
return res.status(200).json({ received: true });
} catch (err) {
logger.error('Stripe webhook handler error:', err);
logger.error('[billing.webhook] handler error', { error: err?.message ?? String(err), stack: err?.stack, requestId });
return responses.error(res, 500, 'Internal Server Error', 'Webhook handler failed')(err);
}
};
Expand Down
11 changes: 2 additions & 9 deletions modules/billing/lib/billing.constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,11 @@ export const DEFAULT_EXTRAS_EXHAUSTED_EVENT = 'billing.extras_debit.exhausted';
export const DEFAULT_GRACE_PERIOD_DAYS = 7;
export const DEFAULT_DUNNING_THRESHOLD_DAYS = 14;

/**
* Floor charge per run. `runBaseUnits` is kept as a backward-compatible alias
* for downstream projects that already adopted the earlier config name.
*/
export const METER_RUN_BASE =
config?.billing?.meter?.runBase
?? config?.billing?.meter?.runBaseUnits
?? DEFAULT_METER_RUN_BASE;

/**
* @function getMeterRunBase
* @description Resolve the configured base units charged when no costs are present.
* Evaluated lazily on each call so test overrides and runtime config
* changes are always reflected (avoids the stale module-scope const pattern).
* @returns {number} Meter run base units.
*/
export const getMeterRunBase = () =>
Comment on lines 25 to 32
Expand Down
22 changes: 22 additions & 0 deletions modules/billing/models/billing.usage.model.mongoose.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,28 @@ UsageMongoose.index(
*/
UsageMongoose.index({ organizationId: 1, weekKey: 1 }, { unique: true, sparse: true });

/**
* TTL index: automatically purge archived usage documents after 1 year.
*
* Conditional on archivedAt being set (non-null) — active (non-archived) documents
* are excluded from this TTL via partialFilterExpression, so only past periods are
* eligible for purge. The 1-year retention keeps the audit trail long enough for
* billing reconciliation while preventing unbounded collection growth.
*
* Notes:
* - The TTL daemon runs once per minute; actual deletion may lag by up to ~1min.
* - Verify this index with: db.billingusages.getIndexes() — look for expireAfterSeconds.
* - archivedAt is set by archiveOtherWeeks() in the usage repository when the
* weekly reset sweep moves old weeks to the archive state.
*/
UsageMongoose.index(
{ archivedAt: 1 },
{
expireAfterSeconds: 365 * 24 * 60 * 60,
partialFilterExpression: { archivedAt: { $type: 'date' } },
},
);

/**
* Returns the hex string representation of the document ObjectId.
* @returns {string} Hex string of the ObjectId.
Expand Down
55 changes: 55 additions & 0 deletions modules/billing/repositories/billing.extraBalance.repository.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,60 @@ const getBalance = async (orgId) => {
return doc ? doc.cachedBalance : 0;
};

/**
* @function listLedgerPage
* @description Return a paginated slice of the ledger array for an organization using
* MongoDB aggregation `$slice` — only the requested page is transferred over the
* wire, avoiding full-document fetches for large ledgers (1000+ entries).
*
* Entries are sorted descending by `at` (newest first) at the aggregation layer.
* The aggregation pipeline is:
* 1. $match — find the org's document
* 2. $project — sort + slice the ledger, plus cachedBalance and _id=0
*
* Returns null when no document exists for the org yet (balance = 0).
*
* @param {string} orgId - The organization ObjectId (string).
* @param {number} skip - Number of entries to skip (0-based).
* @param {number} limit - Maximum number of entries to return.
* @returns {Promise<{ledgerPage: Object[], total: number, cachedBalance: number}|null>}
* null when the org has no ExtraBalance document.
*/
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js repository, not Qwik
const listLedgerPage = async (orgId, skip, limit) => {
if (!isValidOrgId(orgId)) return null;
if (!Number.isFinite(skip) || skip < 0) throw new TypeError('skip must be a non-negative number');
if (!Number.isFinite(limit) || limit <= 0) throw new TypeError('limit must be a positive number');

const results = await BillingExtraBalance().aggregate([
{ $match: { organization: new mongoose.Types.ObjectId(orgId) } },
{
$project: {
_id: 0,
cachedBalance: 1,
total: { $size: { $ifNull: ['$ledger', []] } },
// Sort descending by `at` then slice the requested page.
// $ifNull guards against missing/null ledger field on legacy docs.
ledgerPage: {
$slice: [
{
$sortArray: {
input: { $ifNull: ['$ledger', []] },
sortBy: { at: -1 },
},
},
skip,
limit,
],
},
},
Comment on lines +288 to +306
},
]).exec();

if (!results || results.length === 0) return null;
return results[0];
};

/**
* @function findOrgsWithExpiringTopups
* @description Return the distinct organizationIds that have at least one topup ledger entry
Expand Down Expand Up @@ -308,5 +362,6 @@ export default {
addExpirationEntries,
refundPartial,
getBalance,
listLedgerPage,
findOrgsWithExpiringTopups,
};
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,37 @@ const deleteByEventId = async (eventId) => {
* @function incrementAttempts
* @description Atomically increment the attempts counter and record the last error details
* on the processed event document. Used by withIdempotency to track retry depth.
* Stores the full stack trace when available (err.stack), falling back to the
* message string — provides full context for ops investigation of dead-letters.
* @param {string} eventId - Stripe event ID.
* @param {string} errorMessage - Error message from the last failed handler execution.
* @param {string|Error} errorOrMessage - Error object (preferred) or message string from the
* last failed handler execution. When an Error is passed, err.stack is persisted; otherwise
* the string value is stored as-is.
* @returns {Promise<Object|null>} Updated document or null if not found.
*/
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js repository, not Qwik
const incrementAttempts = async (eventId, errorMessage) => {
const incrementAttempts = async (eventId, errorOrMessage) => {
if (typeof eventId !== 'string' || eventId.trim() === '') {
throw new Error('invalid argument: eventId must be a non-empty string');
}
// Persist stack trace when available — provides full context for ops investigation.
// Falls back gracefully for non-Error throws (string, plain object, etc.).
// JSON.stringify for plain objects avoids the uninformative "[object Object]" fallback.
let lastError;
if (errorOrMessage instanceof Error) {
lastError = errorOrMessage.stack || errorOrMessage.message || String(errorOrMessage);
} else if (errorOrMessage === null || errorOrMessage === undefined) {
lastError = '';
} else if (typeof errorOrMessage === 'object') {
try { lastError = JSON.stringify(errorOrMessage); } catch { lastError = String(errorOrMessage); }
} else {
lastError = String(errorOrMessage);
}
return ProcessedStripeEvent().findOneAndUpdate(
{ eventId },
{
$inc: { attempts: 1 },
$set: { lastError: String(errorMessage ?? ''), lastErrorAt: new Date() },
$set: { lastError, lastErrorAt: new Date() },
},
{ returnDocument: 'after' },
).exec();
Expand Down
27 changes: 16 additions & 11 deletions modules/billing/services/billing.extra.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ const refundPartial = async (orgId, stripeSessionId, amountRefundedCents, packId
* @description Return a paginated slice of the ledger for an organization.
* Entries are returned in reverse chronological order (newest first).
*
* Uses MongoDB aggregation `$slice` via the repository so only the
* requested page is transferred over the wire — avoids loading the full
* ledger array into memory on large accounts (1000+ entries).
*
* @param {string} orgId - The organization ObjectId (string).
* @param {Object} [options={}] - Pagination options.
* @param {number} [options.page=1] - 1-based page number.
Expand All @@ -224,17 +228,18 @@ const refundPartial = async (orgId, stripeSessionId, amountRefundedCents, packId
*/
// biome-ignore lint/correctness/useQwikValidLexicalScope: false positive — Node.js service, not Qwik
const listLedger = async (orgId, { page = 1, limit = 20 } = {}) => {
const doc = await BillingExtraBalanceRepository.getOrCreate(orgId);
if (!doc) return { entries: [], total: 0, balance: 0 };
const ledger = doc.ledger ?? [];
const total = ledger.length;

// Sort descending by at date
const sorted = [...ledger].sort((a, b) => new Date(b.at) - new Date(a.at));
const start = (page - 1) * limit;
const entries = sorted.slice(start, start + limit);

return { entries, total, balance: doc.cachedBalance ?? 0 };
const safePage = Math.max(1, Math.floor(Number(page)) || 1);
const safeLimit = Math.max(1, Math.floor(Number(limit)) || 20);
const skip = (safePage - 1) * safeLimit;

const result = await BillingExtraBalanceRepository.listLedgerPage(orgId, skip, safeLimit);
if (!result) return { entries: [], total: 0, balance: 0 };

return {
entries: result.ledgerPage ?? [],
total: result.total ?? 0,
balance: result.cachedBalance ?? 0,
};
};

export default {
Expand Down
5 changes: 1 addition & 4 deletions modules/billing/services/billing.meter.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ import {
getMeterRunBase,
getDollarsToUnitRatio,
getMaxUnitsPerOperation,
METER_RUN_BASE,
} from '../lib/billing.constants.js';

export { METER_RUN_BASE };

/**
* @function unitsFromCosts
* @description Convert a feature-keyed cost map (in USD) to meter units using
Expand Down Expand Up @@ -237,5 +234,5 @@ export default {
unitsFromCosts,
attribute,
capBreakdown,
METER_RUN_BASE,
getMeterRunBase,
};
43 changes: 42 additions & 1 deletion modules/billing/services/billing.plans.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,14 @@ const fetchPlansFromStripe = async (stripe) => {
pricesByProduct[price.product].push(price);
}

// Build reverse-lookup: priceId → [planId, ...] to detect price ID reuse across plans.
// A Stripe price ID mapped to multiple plans is a configuration error that causes silent
// billing misrouting — warn explicitly with both plan IDs + the duplicate price ID.
const priceIdToPlanIds = {};

const plans = products.map((product) => {
const productPrices = pricesByProduct[product.id] || [];
const planId = product.metadata?.planId || product.id;

let monthlyPrice = 0;
let annualPrice = 0;
Expand All @@ -82,10 +88,15 @@ const fetchPlansFromStripe = async (stripe) => {
annualPrice = amount / 100;
stripePriceAnnual = price.id;
}
// Track price-to-plan mapping for duplicate detection
if (price.id) {
if (!priceIdToPlanIds[price.id]) priceIdToPlanIds[price.id] = [];
priceIdToPlanIds[price.id].push(planId);
}
}

return {
planId: product.metadata?.planId || product.id,
planId,
name: product.name,
monthlyPrice,
annualPrice,
Expand All @@ -94,6 +105,17 @@ const fetchPlansFromStripe = async (stripe) => {
};
});

// Emit explicit warn for any Stripe price ID mapped to more than one plan.
// This is a Stripe account configuration error that causes silent billing misrouting.
for (const [priceId, planIds] of Object.entries(priceIdToPlanIds)) {
if (planIds.length > 1) {
logger.warn('[billing.plans] duplicate Stripe price ID mapped to multiple plans — config error', {
priceId,
planIds,
});
}
}

return plans.sort((a, b) => a.monthlyPrice - b.monthlyPrice);
};

Expand Down Expand Up @@ -164,6 +186,25 @@ const getPlans = async () => {
return inFlightFetch;
};

/**
* @desc Reset the in-memory plan cache.
*
* Exposed for test isolation: each test that exercises different Stripe configurations
* should call `clearPlansCache()` in `beforeEach`/`afterEach` to prevent stale module-scope
* state from leaking between test files (ESM module cache is per-worker but shared across
* tests in the same worker file when `jest.resetModules()` is not in use).
*
* NOT intended for production use — the cache is self-refreshing via stale-while-revalidate.
*/
const clearPlansCache = () => {
cachedPlans = null;
cacheTimestamp = 0;
stalePlans = null;
staleTimestamp = 0;
inFlightFetch = null;
};

export default {
getPlans,
clearPlansCache,
};
Loading
Loading