-
-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathdistributedLock.js
More file actions
86 lines (79 loc) · 3.13 KB
/
distributedLock.js
File metadata and controls
86 lines (79 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* Distributed lock primitive for multi-pod cron jobs.
*
* Uses a MongoDB TTL collection (`cron_locks`) to ensure mutual exclusion
* across replicas. A lock document expires automatically after `ttlMs`
* milliseconds — pod crashes therefore never permanently block scheduling.
*
* Usage:
* const holder = `${process.env.HOSTNAME ?? 'unknown'}:${randomUUID()}`
* const acquired = await acquireLock({ name: 'billing.weeklyReset', ttlMs: 10 * 60 * 1000, holder })
* if (!acquired) return // another pod holds the lock
* try {
* // ... work
* } finally {
* await releaseLock({ name: 'billing.weeklyReset', holder })
* }
*/
import mongoose from 'mongoose';
// Builds the definition for a mandatory schema path of the given type.
const requiredField = (type) => ({ type, required: true });

// One document per lock; the lock name itself is stored as the primary key.
const lockFields = {
  _id: requiredField(String),
  lockedAt: requiredField(Date),
  lockedUntil: requiredField(Date),
  holder: requiredField(String),
};

// Fixed collection name, and no `__v` version key on lock documents.
const lockSchemaOptions = { collection: 'cron_locks', versionKey: false };

const LockSchema = new mongoose.Schema(lockFields, lockSchemaOptions);

// TTL index on `lockedUntil`: with `expireAfterSeconds: 0` MongoDB removes each
// document once the timestamp stored in that field has passed, so expired
// locks do not pile up in the collection.
LockSchema.index({ lockedUntil: 1 }, { expireAfterSeconds: 0 });

// Use the model already registered under this name if there is one; otherwise
// compile it from the schema.
export const CronLock = mongoose.models.CronLock ?? mongoose.model('CronLock', LockSchema);
/**
 * @function acquireLock
 * @description Attempt to acquire a named lock. Resolves to true if this call
 * acquired the lock, false if a live (non-expired) lock document already
 * exists for `name`.
 *
 * Implementation: a single `findOneAndUpdate` with `upsert: true`, filtered on
 * `_id === name` AND `lockedUntil < now`:
 *   - no document for `name`        -> the upsert inserts one (acquired);
 *   - document exists but expired   -> it is overwritten in place (acquired);
 *   - document exists and is live   -> the filter matches nothing, the upsert
 *     attempts a second insert with the same `_id`, and MongoDB rejects it
 *     with a duplicate-key error (E11000), which is reported as false.
 *
 * The lock is not re-entrant: because the filter does not look at `holder`, a
 * holder that already owns a live lock also gets false.
 *
 * `name` and `holder` are validated before any query is issued. Mongoose
 * treats an `undefined` value in a filter as if the key were absent, so an
 * unchecked missing `name` would match (and overwrite) any expired lock, and
 * a missing `holder` would be dropped from `$set` and then compare equal to
 * the `undefined` argument, wrongly reporting success.
 *
 * @param {object} opts
 * @param {string} opts.name - Unique lock name (e.g. 'billing.weeklyReset')
 * @param {number} opts.ttlMs - Lock duration in milliseconds
 * @param {string} opts.holder - Unique identifier for the calling pod/process
 * @returns {Promise<boolean>}
 * @throws {Error} If `ttlMs` is not a finite number greater than zero.
 * @throws {TypeError} If `name` or `holder` is not a non-empty string.
 */
export async function acquireLock({ name, ttlMs, holder }) {
  if (!Number.isFinite(ttlMs) || ttlMs <= 0) {
    throw new Error(`acquireLock: ttlMs must be a positive number, received ${ttlMs}`);
  }
  if (typeof name !== 'string' || name.length === 0) {
    throw new TypeError('acquireLock: name must be a non-empty string');
  }
  if (typeof holder !== 'string' || holder.length === 0) {
    throw new TypeError('acquireLock: holder must be a non-empty string');
  }

  const now = new Date();
  const lockedUntil = new Date(now.getTime() + ttlMs);

  try {
    const lockDoc = await CronLock.findOneAndUpdate(
      // Only match a document for this name whose lease has already run out.
      { _id: name, lockedUntil: { $lt: now } },
      { $set: { lockedAt: now, lockedUntil, holder } },
      { upsert: true, returnDocument: 'after' },
    );
    // Confirm the stored document really carries our holder id.
    return lockDoc?.holder === holder;
  } catch (err) {
    // E11000: a live lock document with this _id already exists.
    if (err?.code === 11000) return false;
    throw err;
  }
}
/**
 * @function releaseLock
 * @description Release a lock only if the caller is the current holder.
 * No-op if the lock is held by a different holder (prevents accidental release
 * after a TTL expiry + re-acquire by another pod) or if no lock document
 * exists for `name`.
 *
 * Both arguments are validated before the delete is issued. Mongoose treats
 * an `undefined` value in a filter as if the key were absent, so an unchecked
 * missing `holder` would delete the lock regardless of who owns it, and a
 * missing `name` would delete some lock owned by `holder` under any name.
 *
 * @param {object} opts
 * @param {string} opts.name - Lock name to release
 * @param {string} opts.holder - Must match the holder that acquired the lock
 * @returns {Promise<void>}
 * @throws {TypeError} If `name` or `holder` is not a non-empty string.
 */
export async function releaseLock({ name, holder }) {
  if (typeof name !== 'string' || name.length === 0) {
    throw new TypeError('releaseLock: name must be a non-empty string');
  }
  if (typeof holder !== 'string' || holder.length === 0) {
    throw new TypeError('releaseLock: holder must be a non-empty string');
  }
  // Matching on holder as well as _id makes this a no-op for non-owners.
  await CronLock.deleteOne({ _id: name, holder });
}
// Default export: a single object bundling the same three bindings that this
// module also exports by name (the model and the two lock helpers).
export default { CronLock, acquireLock, releaseLock };