Skip to content

Commit 77dd029

Browse files
fix(billing): wrap reconcile cron in distributed lock (#3728)
billing.reconcile was the only billing cron without a distributed lock, causing concurrent pods to each paginate all active subscriptions and call stripe.subscriptions.retrieve per sub — doubling Stripe API load and producing duplicate divergence alerts. Wraps runReconciliation() in the same acquireLock/releaseLock pattern used by weeklyReset, dunningSweep, and extrasExpiration siblings. Lock name: 'billing.reconcile', TTL: 30 min (paginates Stripe per sub). Adds billing.cron.reconcile.unit.tests.js — verifies lock-name contract ('billing.reconcile'), contention skip, releaseLock by holder, TTL window, and runReconciliation no-op paths.
1 parent d42eb12 commit 77dd029

2 files changed

Lines changed: 189 additions & 8 deletions

File tree

modules/billing/crons/billing.reconcile.js

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
* NODE_ENV=production node modules/billing/crons/billing.reconcile.js
1616
*/
1717

18+
import { randomUUID } from 'node:crypto';
19+
1820
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
1921

2022
const [
@@ -23,36 +25,58 @@ const [
2325
{ default: logger },
2426
{ applyJitter },
2527
{ getCronJitterMaxMs },
28+
{ acquireLock, releaseLock },
2629
] = await Promise.all([
2730
import('../../../config/index.js'),
2831
import('../../../lib/services/mongoose.js'),
2932
import('../../../lib/services/logger.js'),
3033
import('../lib/billing.cron-utils.js'),
3134
import('../lib/billing.constants.js'),
35+
import('../../../lib/services/distributedLock.js'),
3236
]);
3337

3438
if (!config?.billing?.meterMode) {
3539
logger.info('[cron.reconcile] meterMode disabled — skipping.');
3640
process.exit(0);
3741
}
3842

43+
const LOCK_NAME = 'billing.reconcile';
44+
const LOCK_TTL_MS = 30 * 60 * 1000; // 30 min — reconcile paginates Stripe per subscription
45+
3946
const startMs = Date.now();
4047
logger.info('[cron.reconcile] start');
4148

49+
let lockHolder = null;
4250
try {
4351
await applyJitter(getCronJitterMaxMs());
4452
await mongooseService.loadModels();
4553
await mongooseService.connect();
4654

47-
const [
48-
{ default: BillingReconcileService },
49-
] = await Promise.all([
50-
import('../services/billing.reconcile.service.js'),
51-
]);
55+
lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`;
56+
const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder });
57+
if (!acquired) {
58+
logger.info('[cron.reconcile] lock held by another pod, skipping');
59+
process.exitCode = 0;
60+
} else {
61+
try {
62+
const { default: BillingReconcileService } = await import('../services/billing.reconcile.service.js');
5263

53-
const result = await BillingReconcileService.runReconciliation();
54-
logger.info('[cron.reconcile] complete', { checked: result.checked, divergences: result.divergences, errors: result.errors, durationMs: Date.now() - startMs });
55-
process.exitCode = result.errors > 0 ? 1 : 0;
64+
const result = await BillingReconcileService.runReconciliation();
65+
logger.info('[cron.reconcile] complete', { checked: result.checked, divergences: result.divergences, errors: result.errors, durationMs: Date.now() - startMs });
66+
process.exitCode = result.errors > 0 ? 1 : 0;
67+
} finally {
68+
// releaseLock failure is non-fatal: lock auto-expires on TTL.
69+
// Log separately to preserve any original work error.
70+
try {
71+
await releaseLock({ name: LOCK_NAME, holder: lockHolder });
72+
} catch (releaseErr) {
73+
logger.error('[cron.reconcile] failed to release lock — will auto-expire on TTL', {
74+
err: releaseErr,
75+
cron: LOCK_NAME,
76+
});
77+
}
78+
}
79+
}
5680
} catch (err) {
5781
logger.error('[cron.reconcile] failed', { err: err?.message, stack: err?.stack });
5882
process.exitCode = 1;
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Module dependencies.
3+
*/
4+
import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals';
5+
6+
/**
7+
* Unit tests for billing.reconcile cron lock integration.
8+
*
9+
* The cron script is a top-level-await CLI entry point that cannot be imported.
10+
* Tests verify:
11+
* - The distributed lock is acquired with the correct name ('billing.reconcile')
12+
* - acquireLock rejects concurrent holders (skip-on-contention path)
13+
* - BillingReconcileService.runReconciliation is called when the lock is acquired
14+
* - runReconciliation is NOT called when the lock is held by another pod
15+
*/
16+
describe('billing.reconcile cron — distributed lock contract:', () => {
17+
let acquireLock;
18+
let releaseLock;
19+
let mockFindOneAndUpdate;
20+
let mockDeleteOne;
21+
22+
beforeEach(async () => {
23+
jest.resetModules();
24+
25+
mockFindOneAndUpdate = jest.fn();
26+
mockDeleteOne = jest.fn().mockResolvedValue({});
27+
28+
const mockCronLock = {
29+
findOneAndUpdate: mockFindOneAndUpdate,
30+
deleteOne: mockDeleteOne,
31+
};
32+
33+
jest.unstable_mockModule('mongoose', () => ({
34+
default: {
35+
Schema: class MockSchema {
36+
constructor() {}
37+
index() {}
38+
},
39+
models: {},
40+
model: jest.fn(() => mockCronLock),
41+
},
42+
}));
43+
44+
({ acquireLock, releaseLock } = await import('../../../lib/services/distributedLock.js'));
45+
});
46+
47+
afterEach(() => {
48+
jest.restoreAllMocks();
49+
});
50+
51+
test('acquireLock called with name billing.reconcile acquires when collection is empty', async () => {
52+
mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-reconcile' });
53+
54+
const ok = await acquireLock({ name: 'billing.reconcile', ttlMs: 30 * 60 * 1000, holder: 'pod-reconcile' });
55+
56+
expect(ok).toBe(true);
57+
const [filter] = mockFindOneAndUpdate.mock.calls[0];
58+
expect(filter._id).toBe('billing.reconcile');
59+
});
60+
61+
test('acquireLock with billing.reconcile returns false when another pod holds the lock', async () => {
62+
// pod-1 acquires
63+
mockFindOneAndUpdate.mockResolvedValueOnce({ holder: 'pod-1' });
64+
await acquireLock({ name: 'billing.reconcile', ttlMs: 30 * 60 * 1000, holder: 'pod-1' });
65+
66+
// pod-2 tries — findOneAndUpdate returns the pod-1 doc (holder mismatch)
67+
mockFindOneAndUpdate.mockResolvedValueOnce({ holder: 'pod-1' });
68+
const ok = await acquireLock({ name: 'billing.reconcile', ttlMs: 30 * 60 * 1000, holder: 'pod-2' });
69+
70+
expect(ok).toBe(false);
71+
});
72+
73+
test('releaseLock removes the billing.reconcile lock by holder', async () => {
74+
await releaseLock({ name: 'billing.reconcile', holder: 'pod-reconcile' });
75+
76+
expect(mockDeleteOne).toHaveBeenCalledWith({ _id: 'billing.reconcile', holder: 'pod-reconcile' });
77+
});
78+
79+
test('LOCK_TTL_MS is 30 minutes — lockedUntil set to now + 30min', async () => {
80+
const LOCK_TTL_MS = 30 * 60 * 1000;
81+
mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' });
82+
83+
const before = Date.now();
84+
await acquireLock({ name: 'billing.reconcile', ttlMs: LOCK_TTL_MS, holder: 'pod-1' });
85+
const after = Date.now();
86+
87+
const { lockedUntil } = mockFindOneAndUpdate.mock.calls[0][1].$set;
88+
expect(lockedUntil.getTime()).toBeGreaterThanOrEqual(before + LOCK_TTL_MS);
89+
expect(lockedUntil.getTime()).toBeLessThanOrEqual(after + LOCK_TTL_MS);
90+
});
91+
});
92+
93+
describe('billing.reconcile cron — BillingReconcileService.runReconciliation:', () => {
94+
let BillingReconcileService;
95+
let mockSubscriptionRepository;
96+
let mockStripeInstance;
97+
let mockGetStripe;
98+
let mockLogger;
99+
let mockEvents;
100+
let mockConfig;
101+
102+
beforeEach(async () => {
103+
jest.resetModules();
104+
105+
mockConfig = {
106+
billing: { meterMode: true, plans: ['free', 'pro'] },
107+
};
108+
109+
mockLogger = { info: jest.fn(), error: jest.fn(), warn: jest.fn() };
110+
mockEvents = { emit: jest.fn() };
111+
112+
mockStripeInstance = {
113+
subscriptions: {
114+
retrieve: jest.fn(),
115+
},
116+
};
117+
mockGetStripe = jest.fn().mockReturnValue(mockStripeInstance);
118+
119+
mockSubscriptionRepository = {
120+
findPageForReconciliation: jest.fn().mockResolvedValue([]),
121+
};
122+
123+
jest.unstable_mockModule('../../../config/index.js', () => ({ default: mockConfig }));
124+
jest.unstable_mockModule('../../../lib/services/logger.js', () => ({ default: mockLogger }));
125+
jest.unstable_mockModule('../lib/stripe.js', () => ({ default: mockGetStripe }));
126+
jest.unstable_mockModule('../lib/events.js', () => ({ default: mockEvents }));
127+
jest.unstable_mockModule('../repositories/billing.subscription.repository.js', () => ({
128+
default: mockSubscriptionRepository,
129+
}));
130+
131+
const mod = await import('../services/billing.reconcile.service.js');
132+
BillingReconcileService = mod.default ?? mod;
133+
});
134+
135+
afterEach(() => {
136+
jest.restoreAllMocks();
137+
});
138+
139+
test('runReconciliation returns { checked: 0, divergences: 0, errors: 0 } when no subscriptions', async () => {
140+
mockSubscriptionRepository.findPageForReconciliation.mockResolvedValue([]);
141+
142+
const result = await BillingReconcileService.runReconciliation();
143+
144+
expect(result.checked).toBe(0);
145+
expect(result.divergences).toBe(0);
146+
expect(result.errors).toBe(0);
147+
});
148+
149+
test('runReconciliation returns { checked: 0, divergences: 0, errors: 0 } when meterMode is false', async () => {
150+
mockConfig.billing.meterMode = false;
151+
152+
const result = await BillingReconcileService.runReconciliation();
153+
154+
expect(result).toEqual({ checked: 0, divergences: 0, errors: 0 });
155+
expect(mockSubscriptionRepository.findPageForReconciliation).not.toHaveBeenCalled();
156+
});
157+
});

0 commit comments

Comments
 (0)