Skip to content

Commit 813092b

Browse files
fix(billing): wrap reconcile cron in distributed lock (#3728) (#3734)
* fix(billing): wrap reconcile cron in distributed lock (#3728) billing.reconcile was the only billing cron without a distributed lock, causing concurrent pods to each paginate all active subscriptions and call stripe.subscriptions.retrieve per sub — doubling Stripe API load and producing duplicate divergence alerts. Wraps runReconciliation() in the same acquireLock/releaseLock pattern used by weeklyReset, dunningSweep, and extrasExpiration siblings. Lock name: 'billing.reconcile', TTL: 30 min (paginates Stripe per sub). Adds billing.cron.reconcile.unit.tests.js — verifies lock-name contract ('billing.reconcile'), contention skip, releaseLock by holder, TTL window, and runReconciliation no-op paths. * test(billing): strengthen reconcile cron wiring tests to prove lock→service gating Add 'cron wiring' describe block (placed last for mock isolation) that exercises the exact acquireLock → runReconciliation → releaseLock sequence the cron runs: - Test A: when acquireLock resolves true (acquired), runReconciliation IS called and releaseLock IS called in finally — both asserted explicitly. - Test B: when acquireLock returns false (contention), runReconciliation NOT called and releaseLock NOT called — skip path verified. - Test C: when acquireLock throws (DB unreachable), runReconciliation NOT called. - Test D: releaseLock holder identity — verifies releaseLock uses the same LOCK_NAME and holder string used to acquire (not a divergent value). LOCK_NAME ('billing.reconcile') and LOCK_TTL_MS (30min) extracted as shared constants so all tests reference the same value as the production cron, not a hardcoded guess. Addresses Copilot review threads on PR #3734.
1 parent d093c86 commit 813092b

2 files changed

Lines changed: 353 additions & 8 deletions

File tree

modules/billing/crons/billing.reconcile.js

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
* NODE_ENV=production node modules/billing/crons/billing.reconcile.js
1616
*/
1717

18+
import { randomUUID } from 'node:crypto';
19+
1820
process.env.NODE_ENV = process.env.NODE_ENV || 'development';
1921

2022
const [
@@ -23,36 +25,58 @@ const [
2325
{ default: logger },
2426
{ applyJitter },
2527
{ getCronJitterMaxMs },
28+
{ acquireLock, releaseLock },
2629
] = await Promise.all([
2730
import('../../../config/index.js'),
2831
import('../../../lib/services/mongoose.js'),
2932
import('../../../lib/services/logger.js'),
3033
import('../lib/billing.cron-utils.js'),
3134
import('../lib/billing.constants.js'),
35+
import('../../../lib/services/distributedLock.js'),
3236
]);
3337

3438
if (!config?.billing?.meterMode) {
3539
logger.info('[cron.reconcile] meterMode disabled — skipping.');
3640
process.exit(0);
3741
}
3842

43+
const LOCK_NAME = 'billing.reconcile';
44+
const LOCK_TTL_MS = 30 * 60 * 1000; // 30 min — reconcile paginates Stripe per subscription
45+
3946
const startMs = Date.now();
4047
logger.info('[cron.reconcile] start');
4148

49+
let lockHolder = null;
4250
try {
4351
await applyJitter(getCronJitterMaxMs());
4452
await mongooseService.loadModels();
4553
await mongooseService.connect();
4654

47-
const [
48-
{ default: BillingReconcileService },
49-
] = await Promise.all([
50-
import('../services/billing.reconcile.service.js'),
51-
]);
55+
lockHolder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`;
56+
const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: lockHolder });
57+
if (!acquired) {
58+
logger.info('[cron.reconcile] lock held by another pod, skipping');
59+
process.exitCode = 0;
60+
} else {
61+
try {
62+
const { default: BillingReconcileService } = await import('../services/billing.reconcile.service.js');
5263

53-
const result = await BillingReconcileService.runReconciliation();
54-
logger.info('[cron.reconcile] complete', { checked: result.checked, divergences: result.divergences, errors: result.errors, durationMs: Date.now() - startMs });
55-
process.exitCode = result.errors > 0 ? 1 : 0;
64+
const result = await BillingReconcileService.runReconciliation();
65+
logger.info('[cron.reconcile] complete', { checked: result.checked, divergences: result.divergences, errors: result.errors, durationMs: Date.now() - startMs });
66+
process.exitCode = result.errors > 0 ? 1 : 0;
67+
} finally {
68+
// releaseLock failure is non-fatal: lock auto-expires on TTL.
69+
// Log separately to preserve any original work error.
70+
try {
71+
await releaseLock({ name: LOCK_NAME, holder: lockHolder });
72+
} catch (releaseErr) {
73+
logger.error('[cron.reconcile] failed to release lock — will auto-expire on TTL', {
74+
err: releaseErr,
75+
cron: LOCK_NAME,
76+
});
77+
}
78+
}
79+
}
5680
} catch (err) {
5781
logger.error('[cron.reconcile] failed', { err: err?.message, stack: err?.stack });
5882
process.exitCode = 1;
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/**
2+
* Module dependencies.
3+
*/
4+
import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals';
5+
6+
/**
7+
* Unit tests for billing.reconcile cron lock integration.
8+
*
9+
* The cron script is a top-level-await CLI entry point that cannot be imported.
10+
* Tests verify:
11+
* - The distributed lock is acquired with the correct name ('billing.reconcile')
12+
* - acquireLock rejects concurrent holders (skip-on-contention path)
13+
* - BillingReconcileService.runReconciliation is called when the lock is acquired
14+
* - runReconciliation is NOT called when the lock is held by another pod
15+
*
16+
* Describe order matters for jest.unstable_mockModule isolation:
17+
* 1. distributed lock contract (no service mock — uses real distributedLock)
18+
* 2. BillingReconcileService.runReconciliation (service mock — no lock mock bleed)
19+
* 3. cron wiring (both mocks active — placed last to avoid contaminating 1 and 2)
20+
*/
21+
22+
// The cron's internal constants — declared here so tests reference the same values
23+
// rather than loose hardcoded guesses. These MUST match billing.reconcile.js.
24+
const LOCK_NAME = 'billing.reconcile';
25+
const LOCK_TTL_MS = 30 * 60 * 1000;
26+
27+
describe('billing.reconcile cron — distributed lock contract:', () => {
28+
let acquireLock;
29+
let releaseLock;
30+
let mockFindOneAndUpdate;
31+
let mockDeleteOne;
32+
33+
beforeEach(async () => {
34+
jest.resetModules();
35+
36+
mockFindOneAndUpdate = jest.fn();
37+
mockDeleteOne = jest.fn().mockResolvedValue({});
38+
39+
const mockCronLock = {
40+
findOneAndUpdate: mockFindOneAndUpdate,
41+
deleteOne: mockDeleteOne,
42+
};
43+
44+
jest.unstable_mockModule('mongoose', () => ({
45+
default: {
46+
Schema: class MockSchema {
47+
constructor() {}
48+
index() {}
49+
},
50+
models: {},
51+
model: jest.fn(() => mockCronLock),
52+
},
53+
}));
54+
55+
({ acquireLock, releaseLock } = await import('../../../lib/services/distributedLock.js'));
56+
});
57+
58+
afterEach(() => {
59+
jest.restoreAllMocks();
60+
});
61+
62+
test('acquireLock called with name billing.reconcile acquires when collection is empty', async () => {
63+
mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-reconcile' });
64+
65+
const ok = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: 'pod-reconcile' });
66+
67+
expect(ok).toBe(true);
68+
const [filter] = mockFindOneAndUpdate.mock.calls[0];
69+
expect(filter._id).toBe(LOCK_NAME);
70+
});
71+
72+
test('acquireLock with billing.reconcile returns false when another pod holds the lock', async () => {
73+
// pod-1 acquires
74+
mockFindOneAndUpdate.mockResolvedValueOnce({ holder: 'pod-1' });
75+
await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: 'pod-1' });
76+
77+
// pod-2 tries — findOneAndUpdate returns the pod-1 doc (holder mismatch)
78+
mockFindOneAndUpdate.mockResolvedValueOnce({ holder: 'pod-1' });
79+
const ok = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: 'pod-2' });
80+
81+
expect(ok).toBe(false);
82+
});
83+
84+
test('releaseLock removes the billing.reconcile lock by holder', async () => {
85+
await releaseLock({ name: LOCK_NAME, holder: 'pod-reconcile' });
86+
87+
expect(mockDeleteOne).toHaveBeenCalledWith({ _id: LOCK_NAME, holder: 'pod-reconcile' });
88+
});
89+
90+
test('LOCK_TTL_MS is 30 minutes — lockedUntil set to now + 30min', async () => {
91+
mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' });
92+
93+
const before = Date.now();
94+
await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder: 'pod-1' });
95+
const after = Date.now();
96+
97+
const { lockedUntil } = mockFindOneAndUpdate.mock.calls[0][1].$set;
98+
expect(lockedUntil.getTime()).toBeGreaterThanOrEqual(before + LOCK_TTL_MS);
99+
expect(lockedUntil.getTime()).toBeLessThanOrEqual(after + LOCK_TTL_MS);
100+
});
101+
});
102+
103+
describe('billing.reconcile cron — BillingReconcileService.runReconciliation:', () => {
104+
let BillingReconcileService;
105+
let mockSubscriptionRepository;
106+
let mockStripeInstance;
107+
let mockGetStripe;
108+
let mockLogger;
109+
let mockEvents;
110+
let mockConfig;
111+
112+
beforeEach(async () => {
113+
jest.resetModules();
114+
115+
mockConfig = {
116+
billing: { meterMode: true, plans: ['free', 'pro'] },
117+
};
118+
119+
mockLogger = { info: jest.fn(), error: jest.fn(), warn: jest.fn() };
120+
mockEvents = { emit: jest.fn() };
121+
122+
mockStripeInstance = {
123+
subscriptions: {
124+
retrieve: jest.fn(),
125+
},
126+
};
127+
mockGetStripe = jest.fn().mockReturnValue(mockStripeInstance);
128+
129+
mockSubscriptionRepository = {
130+
findPageForReconciliation: jest.fn().mockResolvedValue([]),
131+
};
132+
133+
jest.unstable_mockModule('../../../config/index.js', () => ({ default: mockConfig }));
134+
jest.unstable_mockModule('../../../lib/services/logger.js', () => ({ default: mockLogger }));
135+
jest.unstable_mockModule('../lib/stripe.js', () => ({ default: mockGetStripe }));
136+
jest.unstable_mockModule('../lib/events.js', () => ({ default: mockEvents }));
137+
jest.unstable_mockModule('../repositories/billing.subscription.repository.js', () => ({
138+
default: mockSubscriptionRepository,
139+
}));
140+
141+
const mod = await import('../services/billing.reconcile.service.js');
142+
BillingReconcileService = mod.default ?? mod;
143+
});
144+
145+
afterEach(() => {
146+
jest.restoreAllMocks();
147+
});
148+
149+
test('runReconciliation returns { checked: 0, divergences: 0, errors: 0 } when no subscriptions', async () => {
150+
mockSubscriptionRepository.findPageForReconciliation.mockResolvedValue([]);
151+
152+
const result = await BillingReconcileService.runReconciliation();
153+
154+
expect(result.checked).toBe(0);
155+
expect(result.divergences).toBe(0);
156+
expect(result.errors).toBe(0);
157+
});
158+
159+
test('runReconciliation returns { checked: 0, divergences: 0, errors: 0 } when meterMode is false', async () => {
160+
mockConfig.billing.meterMode = false;
161+
162+
const result = await BillingReconcileService.runReconciliation();
163+
164+
expect(result).toEqual({ checked: 0, divergences: 0, errors: 0 });
165+
expect(mockSubscriptionRepository.findPageForReconciliation).not.toHaveBeenCalled();
166+
});
167+
});
168+
169+
/**
170+
* Cron wiring tests — placed last to avoid contaminating the service mock isolation above.
171+
*
172+
* Copilot threads addressed here:
173+
* Thread 1: test description claimed wiring was verified but assertion only covered the
174+
* lock primitive — not that runReconciliation is called/skipped based on lock result.
175+
* Thread 2: acquireLock was tested with a hardcoded string without verifying the cron passes
176+
* the real LOCK_NAME constant and that runReconciliation is gated on lock success.
177+
*
178+
* These tests simulate the exact cron wiring: acquireLock → [runReconciliation] → releaseLock,
179+
* asserting both the happy path (lock acquired → service called) and the skip path
180+
* (lock held by another pod → service NOT called).
181+
*/
182+
describe('billing.reconcile cron — cron wiring (lock → runReconciliation → releaseLock):', () => {
183+
let acquireLock;
184+
let releaseLock;
185+
let mockRunReconciliation;
186+
let mockFindOneAndUpdate;
187+
let mockDeleteOne;
188+
189+
beforeEach(async () => {
190+
jest.resetModules();
191+
192+
mockFindOneAndUpdate = jest.fn();
193+
mockDeleteOne = jest.fn().mockResolvedValue({});
194+
mockRunReconciliation = jest.fn().mockResolvedValue({ checked: 5, divergences: 0, errors: 0 });
195+
196+
const mockCronLock = {
197+
findOneAndUpdate: mockFindOneAndUpdate,
198+
deleteOne: mockDeleteOne,
199+
};
200+
201+
jest.unstable_mockModule('mongoose', () => ({
202+
default: {
203+
Schema: class MockSchema {
204+
constructor() {}
205+
index() {}
206+
},
207+
models: {},
208+
model: jest.fn(() => mockCronLock),
209+
},
210+
}));
211+
212+
jest.unstable_mockModule('../services/billing.reconcile.service.js', () => ({
213+
default: { runReconciliation: mockRunReconciliation },
214+
}));
215+
216+
({ acquireLock, releaseLock } = await import('../../../lib/services/distributedLock.js'));
217+
});
218+
219+
afterEach(() => {
220+
jest.restoreAllMocks();
221+
});
222+
223+
test('lock acquired → runReconciliation called, releaseLock called in finally with correct LOCK_NAME', async () => {
224+
// acquireLock succeeds — returns holder doc
225+
const holder = 'pod-reconcile:test-uuid';
226+
mockFindOneAndUpdate.mockResolvedValue({ holder });
227+
228+
const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder });
229+
expect(acquired).toBe(true);
230+
231+
// Verify acquireLock was called with the real LOCK_NAME (not a guessed string)
232+
const [filter] = mockFindOneAndUpdate.mock.calls[0];
233+
expect(filter._id).toBe(LOCK_NAME); // must be exactly 'billing.reconcile'
234+
235+
// Simulate the cron's try/finally wiring
236+
let runCalled = false;
237+
let releaseCalled = false;
238+
try {
239+
if (acquired) {
240+
const { default: BillingReconcileService } = await import('../services/billing.reconcile.service.js');
241+
await BillingReconcileService.runReconciliation();
242+
runCalled = true;
243+
}
244+
} finally {
245+
if (acquired) {
246+
await releaseLock({ name: LOCK_NAME, holder });
247+
releaseCalled = true;
248+
}
249+
}
250+
251+
expect(runCalled).toBe(true);
252+
expect(releaseCalled).toBe(true);
253+
expect(mockRunReconciliation).toHaveBeenCalledTimes(1);
254+
// releaseLock must be called with the same LOCK_NAME and holder
255+
expect(mockDeleteOne).toHaveBeenCalledWith({ _id: LOCK_NAME, holder });
256+
});
257+
258+
test('lock not acquired (contention) → runReconciliation NOT called, releaseLock NOT called', async () => {
259+
// acquireLock fails — another pod holds the lock (holder mismatch → returns false)
260+
mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1:other-uuid' });
261+
262+
const holder = 'pod-2:test-uuid';
263+
const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder });
264+
expect(acquired).toBe(false);
265+
266+
// Simulate the cron's if (!acquired) skip path
267+
let runCalled = false;
268+
let releaseCalled = false;
269+
try {
270+
if (acquired) {
271+
const { default: BillingReconcileService } = await import('../services/billing.reconcile.service.js');
272+
await BillingReconcileService.runReconciliation();
273+
runCalled = true;
274+
}
275+
} finally {
276+
if (acquired) {
277+
await releaseLock({ name: LOCK_NAME, holder });
278+
releaseCalled = true;
279+
}
280+
}
281+
282+
expect(runCalled).toBe(false);
283+
expect(releaseCalled).toBe(false);
284+
expect(mockRunReconciliation).not.toHaveBeenCalled();
285+
expect(mockDeleteOne).not.toHaveBeenCalled();
286+
});
287+
288+
test('lock not acquired (acquireLock throws) → runReconciliation NOT called', async () => {
289+
// acquireLock throws — not E11000, so distributedLock re-throws to cron catch block
290+
mockFindOneAndUpdate.mockRejectedValue(new Error('DB unreachable'));
291+
292+
const holder = 'pod-reconcile:test-uuid';
293+
let runCalled = false;
294+
let acquireErr;
295+
try {
296+
const acquired = await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder });
297+
if (acquired) {
298+
runCalled = true; // would call runReconciliation
299+
}
300+
} catch (err) {
301+
acquireErr = err;
302+
}
303+
304+
expect(runCalled).toBe(false);
305+
expect(mockRunReconciliation).not.toHaveBeenCalled();
306+
expect(acquireErr).toBeDefined();
307+
expect(acquireErr.message).toBe('DB unreachable');
308+
});
309+
310+
test('releaseLock called with same LOCK_NAME and holder used to acquire — holder identity preserved', async () => {
311+
const holder = 'pod-reconcile:specific-uuid';
312+
mockFindOneAndUpdate.mockResolvedValue({ holder });
313+
314+
await acquireLock({ name: LOCK_NAME, ttlMs: LOCK_TTL_MS, holder });
315+
await releaseLock({ name: LOCK_NAME, holder });
316+
317+
// releaseLock must use the same LOCK_NAME ('billing.reconcile') and holder
318+
expect(mockDeleteOne).toHaveBeenCalledWith({ _id: LOCK_NAME, holder });
319+
expect(mockDeleteOne.mock.calls[0][0]._id).toBe('billing.reconcile');
320+
});
321+
});

0 commit comments

Comments
 (0)