Commit d397764

feat(billing): distributed lock for multi-pod crons (#3688)
* feat(lib): add distributed lock primitive for multi-pod crons

  Mongo-backed TTL doc lock with auto-expiry. Acquired via findOneAndUpdate upsert with stale-or-absent predicate; released by holder match.

  Addresses devkit audit P1 (2026-05-21): billing crons race when K8s concurrencyPolicy bypass occurs (e.g. crash post-jitter pre-finalize).

* feat(billing): wrap crons with distributed lock

  weeklyReset, dunningSweep, extrasExpiration now acquire a distributed lock before mutating subscriptions. Skip-on-contention with info log; release in finally for clean recovery.

  Also fixes Mongoose 8 deprecation: `new: true` → `returnDocument: 'after'` in findOneAndUpdate.

  Closes audit P1: multi-pod race on crash post-jitter pre-finalize.

* docs(billing): document distributed cron lock + runbook entry

* fix(tests): remove unused vars caught by lint (distributedLock)

* fix(billing): address code-quality findings on cron lock

  - Move distributedLock unit test to canonical lib location (or document deviation)
  - Add releaseLock throw-path test + non-fatal comment in 3 crons' finally
  - Harmonize log style in skip-on-contention path
  - Rename + document integration test (in-process concurrency clarification)
  - Add findOne verification step in RUNBOOKS before deleteOne

  Addresses code-quality review I1, I2, M1, M3, M4.

* fix(billing): address DeepSeek gate findings on cron lock

  - HIGH: restore (errors || desyncErrors) condition in dunningSweep exit code (regression introduced when wrapping with lock; would make K8s alerts silent)
  - MEDIUM: wrap releaseLock in try/catch within finally to preserve original work error if both throw
  - LOW: guard acquireLock against invalid ttlMs (must be positive finite)
  - NIT: drop redundant setDefaultsOnInsert (all fields explicitly $set)

  Addresses gate iteration 1 BLOCK.

* fix(billing): address CodeRabbit findings on PR #3688

  - Move lib/distributedLock.js → lib/services/distributedLock.js (aligns with existing convention for service-level primitives)
  - Move lib/tests/distributedLock.unit.tests.js → lib/services/tests/
  - Update import paths in 3 billing crons + integration tests
  - Flip logger.error arg order in dunningSweep + extrasExpiration finally (match file convention: (message, meta) not (meta, message))
  - Same fix applied to weeklyReset (same pattern found, not flagged by CR)
  - Update modules/billing/crons/README.md → reference '## 6 — Cron lock stuck' (exact match with RUNBOOKS section header)

  Heartbeat/renewLock suggestion intentionally NOT added; see PR reply.

* fix(billing): add loadModels to dunningSweep+extrasExpiration crons + clarify lock test name

  Resolves Copilot MissingSchemaError finding: dunningSweep and extrasExpiration called mongooseService.connect() without loadModels(), risking MissingSchemaError on repository top-level mongoose.model() calls. Matches weeklyReset/reconcile pattern.

  Also renames test 'ttlMs < 0' → 'existing lock has expired (lockedUntil < now)' to accurately describe the scenario being exercised.
1 parent cbf3abf commit d397764

8 files changed

Lines changed: 558 additions & 63 deletions


lib/services/distributedLock.js

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+/**
+ * Distributed lock primitive for multi-pod cron jobs.
+ *
+ * Uses a MongoDB TTL collection (`cron_locks`) to ensure mutual exclusion
+ * across replicas. A lock document expires automatically after `ttlMs`
+ * milliseconds, so pod crashes never permanently block scheduling.
+ *
+ * Usage:
+ *   const holder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`
+ *   const acquired = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 10 * 60 * 1000, holder })
+ *   if (!acquired) return // another pod holds the lock
+ *   try {
+ *     // ... work
+ *   } finally {
+ *     await releaseLock({ name: 'billing.weeklyReset', holder })
+ *   }
+ */
+
+import mongoose from 'mongoose';
+
+const LockSchema = new mongoose.Schema(
+  {
+    _id: { type: String, required: true },
+    lockedAt: { type: Date, required: true },
+    lockedUntil: { type: Date, required: true },
+    holder: { type: String, required: true },
+  },
+  { collection: 'cron_locks', versionKey: false },
+);
+
+// MongoDB TTL index: auto-deletes expired docs so stale locks don't accumulate.
+LockSchema.index({ lockedUntil: 1 }, { expireAfterSeconds: 0 });
+
+export const CronLock = mongoose.models.CronLock ?? mongoose.model('CronLock', LockSchema);
+
+/**
+ * @function acquireLock
+ * @description Attempt to acquire a named lock. Returns true if acquired,
+ * false if the lock is currently held by another holder.
+ *
+ * Implementation: findOneAndUpdate with upsert on the condition that either
+ * no doc exists (_id absent) or the existing doc has expired (lockedUntil < now).
+ * When an unexpired doc exists (or another pod wins a race), the upsert's insert
+ * hits E11000 on the unique _id index; that is caught and returned as false.
+ *
+ * @param {object} opts
+ * @param {string} opts.name - Unique lock name (e.g. 'billing.weeklyReset')
+ * @param {number} opts.ttlMs - Lock duration in milliseconds
+ * @param {string} opts.holder - Unique identifier for the calling pod/process
+ * @returns {Promise<boolean>}
+ */
+export async function acquireLock({ name, ttlMs, holder }) {
+  if (!Number.isFinite(ttlMs) || ttlMs <= 0) {
+    throw new Error(`acquireLock: ttlMs must be a positive number, received ${ttlMs}`);
+  }
+  const now = new Date();
+  const lockedUntil = new Date(now.getTime() + ttlMs);
+  try {
+    const result = await CronLock.findOneAndUpdate(
+      { _id: name, lockedUntil: { $lt: now } },
+      { $set: { lockedAt: now, lockedUntil, holder } },
+      { upsert: true, returnDocument: 'after' },
+    );
+    return result?.holder === holder;
+  } catch (err) {
+    if (err.code === 11000) return false;
+    throw err;
+  }
+}
+
+/**
+ * @function releaseLock
+ * @description Release a lock only if the caller is the current holder.
+ * No-op if the lock is held by a different holder (prevents accidental release
+ * after a TTL expiry + re-acquire by another pod).
+ *
+ * @param {object} opts
+ * @param {string} opts.name - Lock name to release
+ * @param {string} opts.holder - Must match the holder that acquired the lock
+ * @returns {Promise<void>}
+ */
+export async function releaseLock({ name, holder }) {
+  await CronLock.deleteOne({ _id: name, holder });
+}
+
+export default { CronLock, acquireLock, releaseLock };
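
The acquire/release contract above can be illustrated without a database. The following is a minimal in-memory sketch, assuming a plain `Map` in place of the `cron_locks` collection; `createMemoryLockStore`, `peek`, and the injectable `now` parameter are hypothetical names used only for illustration and are not part of this commit. It mirrors the stale-or-absent predicate (`lockedUntil < now`) and the holder-matched release, not the Mongo code path itself.

```js
// In-memory model of the lock contract (illustration only, hypothetical names).
function createMemoryLockStore() {
  const docs = new Map(); // name -> { holder, lockedAt, lockedUntil }

  return {
    // Take the lock when no doc exists or the existing doc's lockedUntil
    // is strictly before `now` (same predicate as the Mongo filter).
    acquire({ name, ttlMs, holder, now = Date.now() }) {
      if (!Number.isFinite(ttlMs) || ttlMs <= 0) {
        throw new Error(`ttlMs must be a positive number, received ${ttlMs}`);
      }
      const existing = docs.get(name);
      if (existing && existing.lockedUntil >= now) return false; // still held
      docs.set(name, { holder, lockedAt: now, lockedUntil: now + ttlMs });
      return true;
    },
    // Same idea as deleteOne({ _id: name, holder }): only the holder releases.
    release({ name, holder }) {
      const existing = docs.get(name);
      if (existing && existing.holder === holder) docs.delete(name);
    },
    // Read-only view for inspection.
    peek(name) {
      return docs.get(name);
    },
  };
}

const store = createMemoryLockStore();
const t0 = 1_000_000;
console.log(store.acquire({ name: 'job', ttlMs: 1000, holder: 'pod-1', now: t0 })); // true
console.log(store.acquire({ name: 'job', ttlMs: 1000, holder: 'pod-2', now: t0 + 500 })); // false
store.release({ name: 'job', holder: 'pod-2' }); // no-op: pod-2 is not the holder
console.log(store.acquire({ name: 'job', ttlMs: 1000, holder: 'pod-2', now: t0 + 1001 })); // true
store.release({ name: 'job', holder: 'pod-1' }); // no-op: pod-1's lock expired and was taken over
console.log(store.peek('job').holder); // pod-2
```

The last two lines show why release matches on `holder`: a pod that overran its TTL cannot delete the lock that another pod has since acquired.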
Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
+/**
+ * Module dependencies.
+ */
+import { jest, describe, test, beforeEach, afterEach, expect } from '@jest/globals';
+
+/**
+ * Unit tests for lib/services/distributedLock.js
+ *
+ * All Mongoose interactions are mocked; no real DB connection required.
+ * Tests verify the acquire / release contract and contention handling.
+ */
+describe('distributedLock — acquireLock:', () => {
+  let acquireLock;
+  let mockFindOneAndUpdate;
+  let mockDeleteOne;
+
+  beforeEach(async () => {
+    jest.resetModules();
+
+    mockFindOneAndUpdate = jest.fn();
+    mockDeleteOne = jest.fn();
+
+    const mockCronLock = {
+      findOneAndUpdate: mockFindOneAndUpdate,
+      deleteOne: mockDeleteOne,
+    };
+
+    jest.unstable_mockModule('mongoose', () => ({
+      default: {
+        Schema: class MockSchema {
+          constructor() {}
+          index() {}
+        },
+        models: {},
+        model: jest.fn(() => mockCronLock),
+      },
+    }));
+
+    ({ acquireLock } = await import('../distributedLock.js'));
+  });
+
+  afterEach(() => {
+    jest.restoreAllMocks();
+  });
+
+  test('returns true when findOneAndUpdate resolves with matching holder', async () => {
+    mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' });
+
+    const ok = await acquireLock({ name: 'job-a', ttlMs: 60_000, holder: 'pod-1' });
+
+    expect(ok).toBe(true);
+    expect(mockFindOneAndUpdate).toHaveBeenCalledTimes(1);
+    const [filter, update, opts] = mockFindOneAndUpdate.mock.calls[0];
+    expect(filter._id).toBe('job-a');
+    expect(filter.lockedUntil.$lt).toBeInstanceOf(Date);
+    expect(update.$set.holder).toBe('pod-1');
+    expect(opts.upsert).toBe(true);
+  });
+
+  test('returns false when findOneAndUpdate returns doc held by different holder', async () => {
+    mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' });
+
+    const ok = await acquireLock({ name: 'job-b', ttlMs: 60_000, holder: 'pod-2' });
+
+    expect(ok).toBe(false);
+  });
+
+  test('returns false on E11000 duplicate-key (concurrent upsert race)', async () => {
+    const dupErr = new Error('E11000 duplicate key');
+    dupErr.code = 11000;
+    mockFindOneAndUpdate.mockRejectedValue(dupErr);
+
+    const ok = await acquireLock({ name: 'job-c', ttlMs: 60_000, holder: 'pod-1' });
+
+    expect(ok).toBe(false);
+  });
+
+  test('re-throws non-duplicate errors', async () => {
+    const dbErr = new Error('network timeout');
+    dbErr.code = 13;
+    mockFindOneAndUpdate.mockRejectedValue(dbErr);
+
+    await expect(acquireLock({ name: 'job-d', ttlMs: 60_000, holder: 'pod-1' })).rejects.toThrow('network timeout');
+  });
+
+  test.each([
+    [0, 'zero'],
+    [-1, 'negative'],
+    [Number.NaN, 'NaN'],
+    [Infinity, 'Infinity'],
+    [undefined, 'undefined'],
+    [null, 'null'],
+  ])('throws when ttlMs is %s (%s)', async (ttlMs) => {
+    await expect(acquireLock({ name: 'job-guard', ttlMs, holder: 'pod-1' })).rejects.toThrow(
+      'acquireLock: ttlMs must be a positive number',
+    );
+    expect(mockFindOneAndUpdate).not.toHaveBeenCalled();
+  });
+
+  test('lockedUntil is set to now + ttlMs', async () => {
+    const before = Date.now();
+    mockFindOneAndUpdate.mockResolvedValue({ holder: 'pod-1' });
+
+    await acquireLock({ name: 'job-e', ttlMs: 10_000, holder: 'pod-1' });
+
+    const after = Date.now();
+    const { lockedUntil } = mockFindOneAndUpdate.mock.calls[0][1].$set;
+    expect(lockedUntil.getTime()).toBeGreaterThanOrEqual(before + 10_000);
+    expect(lockedUntil.getTime()).toBeLessThanOrEqual(after + 10_000);
+  });
+});
+
+describe('distributedLock — releaseLock:', () => {
+  let releaseLock;
+  let mockDeleteOne;
+
+  beforeEach(async () => {
+    jest.resetModules();
+
+    mockDeleteOne = jest.fn().mockResolvedValue({});
+
+    const mockCronLock = {
+      findOneAndUpdate: jest.fn(),
+      deleteOne: mockDeleteOne,
+    };
+
+    jest.unstable_mockModule('mongoose', () => ({
+      default: {
+        Schema: class MockSchema {
+          constructor() {}
+          index() {}
+        },
+        models: {},
+        model: jest.fn(() => mockCronLock),
+      },
+    }));
+
+    ({ releaseLock } = await import('../distributedLock.js'));
+  });
+
+  afterEach(() => {
+    jest.restoreAllMocks();
+  });
+
+  test('calls deleteOne with name and holder', async () => {
+    await releaseLock({ name: 'job-a', holder: 'pod-1' });
+
+    expect(mockDeleteOne).toHaveBeenCalledWith({ _id: 'job-a', holder: 'pod-1' });
+  });
+
+  test('does not throw when deleteOne resolves', async () => {
+    await expect(releaseLock({ name: 'job-b', holder: 'pod-2' })).resolves.toBeUndefined();
+  });
+
+  test('propagates deleteOne errors to the caller', async () => {
+    const dbErr = new Error('network timeout');
+    mockDeleteOne.mockRejectedValue(dbErr);
+    await expect(releaseLock({ name: 'job-c', holder: 'pod-1' })).rejects.toThrow('network timeout');
+  });
+});

modules/billing/RUNBOOKS.md

Lines changed: 43 additions & 0 deletions
@@ -175,3 +175,46 @@ Operational runbooks for the billing module. Each runbook references real endpoi
 4. Monitor `billing.plans.stale` event frequency — if the stale cache is 24h+, alert the on-call to decide whether to take the plans endpoint down entirely or serve a static fallback.
 5. Once Stripe recovers: `POST /api/admin/billing/sync/:orgId` on any org that attempted a subscription change during the outage.
 6. Check dead-letter queue for events that exhausted retries during the outage window: `GET /api/admin/billing/dead-letters`.
+
+---
+
+## 6 — Cron lock stuck
+
+**Symptom:** All billing crons emit `lock held by another pod, skipping` for longer than the lock TTL, meaning no billing cron is running at all.
+
+**Cause:** A pod crashed mid-job without reaching the `finally` block that calls `releaseLock`. The TTL has not yet expired on the stale lock doc in `cron_locks`.
+
+**Wait first:** Lock TTLs are sized 2–3× typical exec time. Wait for the TTL to expire (max 15 min for `dunningSweep`). MongoDB's TTL monitor runs every 60 seconds, so deletion of the doc may lag up to 60 s after expiry; `acquireLock` also treats a doc whose `lockedUntil` is in the past as free, so the next scheduled run can take over without waiting for that deletion.
+
+**If urgent, drop the stale lock manually:**
+
+**Before dropping:** verify the holder and TTL window first to avoid kicking a running cron.
+
+```js
+db.cron_locks.findOne({ _id: "billing.weeklyReset" })
+// If lockedUntil is in the past → safe to drop.
+// If in the future → the lock is genuinely held; wait for TTL unless the holder pod is confirmed dead.
+```
+
+Then drop:
+
+```js
+// weeklyReset
+db.cron_locks.deleteOne({ _id: "billing.weeklyReset" })
+
+// dunningSweep
+db.cron_locks.deleteOne({ _id: "billing.dunningSweep" })
+
+// extrasExpiration
+db.cron_locks.deleteOne({ _id: "billing.extrasExpiration" })
+```
+
+Or via `kubectl exec` on the mongo pod:
+
+```bash
+kubectl exec -n pierreb-projects mongo-0 -- mongosh \
+  "mongodb://localhost:27017/<your-db>" \
+  --eval 'db.cron_locks.deleteOne({ _id: "billing.weeklyReset" })'
+```
+
+**Prevention:** Lock TTLs are intentionally conservative. If you see frequent stuck-lock incidents, investigate cron duration (slow query? tenant scale?) rather than lowering the TTL; a TTL that is too short defeats the mutual-exclusion guarantee.
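
The "verify before dropping" decision in the runbook can be written down as a small pure function. This is a sketch only: `classifyLock` is a hypothetical helper, not something this commit ships, and it assumes the lock document has the `lockedUntil` and `holder` fields defined by the `cron_locks` schema. It has no driver dependency, so it takes the document returned by `findOne` as a plain object.

```js
// Hypothetical helper: classify a cron_locks document before a manual drop.
// `doc` is the result of findOne (or null when no lock document exists).
function classifyLock(doc, now = new Date()) {
  if (!doc) {
    return { state: 'absent', safeToDrop: false, note: 'no lock doc; nothing to drop' };
  }
  const remainingMs = doc.lockedUntil.getTime() - now.getTime();
  if (remainingMs < 0) {
    // Same strict comparison as the acquire filter (lockedUntil < now).
    return { state: 'expired', safeToDrop: true, note: `expired ${-remainingMs} ms ago` };
  }
  return {
    state: 'held',
    safeToDrop: false,
    note: `held by ${doc.holder} for another ${remainingMs} ms; drop only if that pod is confirmed dead`,
  };
}

// Example with a fixed clock so the result does not depend on wall time.
const now = new Date('2026-01-01T00:10:00Z');
const doc = { _id: 'billing.weeklyReset', holder: 'pod-a:1234', lockedUntil: new Date('2026-01-01T00:05:00Z') };
console.log(classifyLock(doc, now).state); // expired
```

The holder string and timestamps in the example are made up; in practice the values come from the `findOne` output shown in the runbook.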

modules/billing/crons/README.md

Lines changed: 19 additions & 0 deletions
@@ -118,3 +118,22 @@ const orgs = allOrgs.filter(o => {
 ## Dependency: meterMode flag
 
 All scripts check `config.billing.meterMode` at startup. Downstream projects must set this flag to `true` in their project config to activate billing crons. The devkit default is `false` — all crons are no-ops until explicitly enabled.
+
+## Concurrency control
+
+All billing crons acquire a distributed lock (`lib/services/distributedLock.js`) before
+mutating state. The lock auto-expires after TTL (5–15 min depending on cron)
+so that pod crashes don't permanently block scheduling.
+
+Lock names and TTLs:
+
+| Lock name | TTL | Cron |
+|-----------|-----|------|
+| `billing.weeklyReset` | 10 min | `billing.weeklyReset.js` |
+| `billing.dunningSweep` | 15 min | `billing.dunningSweep.js` |
+| `billing.extrasExpiration` | 5 min | `billing.extrasExpiration.js` |
+
+If you see `lock held by another pod, skipping` in logs, that is expected when
+two pods race after a K8s `concurrencyPolicy` bypass (e.g. pod crash after
+jitter but before finalize). See the runbook entry `## 6 — Cron lock stuck` in
+`modules/billing/RUNBOOKS.md` for manual resolution.
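
The cron files themselves are not shown in this view, so the following is a hedged sketch of the wrapping pattern the commit message describes: skip on contention with an info log, release in `finally`, and wrap the release in its own `try/catch` so that an error from the work is not replaced by a release error. `runWithLock` and its injected `acquire`, `release`, `logger`, and `work` parameters are hypothetical names chosen for illustration; the real crons may be structured differently.

```js
// Hypothetical wrapper illustrating the pattern; not the code from this commit.
async function runWithLock({ name, ttlMs, holder, acquire, release, logger, work }) {
  const acquired = await acquire({ name, ttlMs, holder });
  if (!acquired) {
    // Skip-on-contention: another pod owns this run.
    logger.info('lock held by another pod, skipping', { name });
    return { skipped: true };
  }
  try {
    const result = await work();
    return { skipped: false, result };
  } finally {
    try {
      await release({ name, holder });
    } catch (err) {
      // Non-fatal: the lock still expires at its TTL, and swallowing here
      // keeps any error thrown by work() as the one the caller sees.
      logger.error('releaseLock failed', { name, error: err.message });
    }
  }
}

// Usage with stand-in dependencies (no database involved).
const demoLogger = { info: console.log, error: console.error };
runWithLock({
  name: 'billing.weeklyReset',
  ttlMs: 10 * 60 * 1000,
  holder: 'pod-demo',
  acquire: async () => true,
  release: async () => {},
  logger: demoLogger,
  work: async () => 'done',
}).then((outcome) => console.log(outcome.result)); // done
```

Injecting `acquire` and `release` keeps the sketch independent of Mongoose; in a real cron they would presumably be the `acquireLock` and `releaseLock` exports from `lib/services/distributedLock.js`.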
