@@ -32,7 +32,7 @@ export type ResourceLease = {
3232export type ResourceLockManager = {
3333 acquire : (
3434 key : string ,
35- options ?: ResourceLockAcquireOptions
35+ options ?: ResourceLockAcquireOptions ,
3636 ) => Promise < ResourceLease > ;
3737} ;
3838
@@ -115,6 +115,21 @@ const readJsonFile = async <T>(filePath: string): Promise<T | null> => {
115115 }
116116} ;
117117
118+ const writeJsonFileAtomic = async < T > (
119+ filePath : string ,
120+ value : T ,
121+ ) : Promise < void > => {
122+ const tempPath = `${ filePath } .${ process . pid } .${ crypto . randomUUID ( ) } .tmp` ;
123+
124+ try {
125+ await fs . writeFile ( tempPath , JSON . stringify ( value ) , 'utf8' ) ;
126+ await fs . rename ( tempPath , filePath ) ;
127+ } catch ( error ) {
128+ await removeFileIfPresent ( tempPath ) ;
129+ throw error ;
130+ }
131+ } ;
132+
118133const removeFileIfPresent = async ( filePath : string ) : Promise < void > => {
119134 try {
120135 await fs . rm ( filePath , { force : true } ) ;
@@ -143,7 +158,7 @@ const isMetadataStale = (
143158 metadata : ResourceLockMetadata ,
144159 now : number ,
145160 staleLockTimeoutMs : number ,
146- isProcessActive : ( pid : number ) => boolean
161+ isProcessActive : ( pid : number ) => boolean ,
147162) : boolean => {
148163 if ( ! isProcessActive ( metadata . pid ) ) {
149164 return true ;
@@ -154,14 +169,14 @@ const isMetadataStale = (
154169
155170const isQueuedTicketStale = (
156171 metadata : ResourceLockMetadata ,
157- isProcessActive : ( pid : number ) => boolean
172+ isProcessActive : ( pid : number ) => boolean ,
158173) : boolean => {
159174 return ! isProcessActive ( metadata . pid ) ;
160175} ;
161176
162177const waitForPollInterval = (
163178 ms : number ,
164- signal ?: AbortSignal
179+ signal ?: AbortSignal ,
165180) : Promise < void > => {
166181 if ( ! signal ) {
167182 return wait ( ms ) ;
@@ -187,7 +202,7 @@ const waitForPollInterval = (
187202} ;
188203
189204const readQueueTickets = async (
190- queueDir : string
205+ queueDir : string ,
191206) : Promise < ResourceLockMetadata [ ] > => {
192207 const ticketEntries = await fs . readdir ( queueDir , { withFileTypes : true } ) ;
193208 const tickets = await Promise . all (
@@ -196,15 +211,15 @@ const readQueueTickets = async (
196211 . map ( async ( entry ) => ( {
197212 name : entry . name ,
198213 metadata : await readJsonFile < ResourceLockMetadata > (
199- path . join ( queueDir , entry . name )
214+ path . join ( queueDir , entry . name ) ,
200215 ) ,
201- } ) )
216+ } ) ) ,
202217 ) ;
203218
204219 return tickets
205220 . filter (
206221 ( entry ) : entry is { name : string ; metadata : ResourceLockMetadata } =>
207- entry . metadata !== null
222+ entry . metadata !== null ,
208223 )
209224 . sort ( ( left , right ) => left . name . localeCompare ( right . name ) )
210225 . map ( ( entry ) => entry . metadata ) ;
@@ -229,10 +244,10 @@ const cleanupQueue = async (options: {
229244 logger . debug (
230245 'removing stale queued ticket %s for key %s' ,
231246 ticket . ticketId ,
232- ticket . key
247+ ticket . key ,
233248 ) ;
234249 await removeFileIfPresent (
235- path . join ( paths . queueDir , `${ ticket . ticketId } .json` )
250+ path . join ( paths . queueDir , `${ ticket . ticketId } .json` ) ,
236251 ) ;
237252 continue ;
238253 }
@@ -265,15 +280,15 @@ const maybeClearStaleOwner = async (options: {
265280 logger . debug (
266281 'removing stale owner ticket %s for key %s' ,
267282 owner . ticketId ,
268- owner . key
283+ owner . key ,
269284 ) ;
270285 await removeFileIfPresent ( ownerFilePath ) ;
271286 return null ;
272287} ;
273288
274289const claimOwnership = async (
275290 ownerFilePath : string ,
276- metadata : ResourceLockMetadata
291+ metadata : ResourceLockMetadata ,
277292) : Promise < boolean > => {
278293 try {
279294 await fs . writeFile ( ownerFilePath , JSON . stringify ( metadata ) , {
@@ -291,7 +306,7 @@ const claimOwnership = async (
291306} ;
292307
293308export const createResourceLockManager = (
294- options : ResourceLockManagerOptions = { }
309+ options : ResourceLockManagerOptions = { } ,
295310) : ResourceLockManager => {
296311 const rootDir = options . rootDir ?? DEFAULT_ROOT_DIR ;
297312 const pollIntervalMs = options . pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS ;
@@ -323,6 +338,7 @@ export const createResourceLockManager = (
323338 scopedLogger . debug ( 'queued ticket %s for key %s' , ticketId , key ) ;
324339
325340 let heartbeatTimer : NodeJS . Timeout | null = null ;
341+ let heartbeatInFlight = false ;
326342 let released = false ;
327343 let didNotifyWait = false ;
328344 const waitStartedAt = Date . now ( ) ;
@@ -339,7 +355,7 @@ export const createResourceLockManager = (
339355 }
340356
341357 const owner = await readJsonFile < ResourceLockMetadata > (
342- paths . ownerFilePath
358+ paths . ownerFilePath ,
343359 ) ;
344360 if ( owner ?. ticketId === ticketId ) {
345361 await removeFileIfPresent ( paths . ownerFilePath ) ;
@@ -351,30 +367,36 @@ export const createResourceLockManager = (
351367
352368 const startHeartbeat = ( ) => {
353369 heartbeatTimer = setInterval ( async ( ) => {
354- const nextHeartbeatAt = Date . now ( ) ;
355- const owner = await readJsonFile < ResourceLockMetadata > (
356- paths . ownerFilePath
357- ) ;
358-
359- if ( released || owner ?. ticketId !== ticketId ) {
370+ if ( heartbeatInFlight ) {
360371 return ;
361372 }
362373
363- const nextMetadata : ResourceLockMetadata = {
364- ...owner ,
365- heartbeatAt : nextHeartbeatAt ,
366- } ;
374+ heartbeatInFlight = true ;
367375
368- if ( released ) {
369- return ;
370- }
376+ try {
377+ const nextHeartbeatAt = Date . now ( ) ;
378+ const owner = await readJsonFile < ResourceLockMetadata > (
379+ paths . ownerFilePath ,
380+ ) ;
371381
372- await fs . writeFile (
373- paths . ownerFilePath ,
374- JSON . stringify ( nextMetadata ) ,
375- 'utf8'
376- ) ;
377- scopedLogger . debug ( 'refreshed heartbeat for ticket %s' , ticketId ) ;
382+ if ( released || owner ?. ticketId !== ticketId ) {
383+ return ;
384+ }
385+
386+ const nextMetadata : ResourceLockMetadata = {
387+ ...owner ,
388+ heartbeatAt : nextHeartbeatAt ,
389+ } ;
390+
391+ if ( released ) {
392+ return ;
393+ }
394+
395+ await writeJsonFileAtomic ( paths . ownerFilePath , nextMetadata ) ;
396+ scopedLogger . debug ( 'refreshed heartbeat for ticket %s' , ticketId ) ;
397+ } finally {
398+ heartbeatInFlight = false ;
399+ }
378400 } , heartbeatIntervalMs ) ;
379401 heartbeatTimer . unref ?.( ) ;
380402 } ;
@@ -391,12 +413,12 @@ export const createResourceLockManager = (
391413 isProcessActive,
392414 } ) ;
393415 const ownIndex = activeTickets . findIndex (
394- ( entry ) => entry . ticketId === ticketId
416+ ( entry ) => entry . ticketId === ticketId ,
395417 ) ;
396418
397419 if ( ownIndex === - 1 ) {
398420 throw new Error (
399- `Queued ticket ${ ticketId } disappeared before acquisition.`
421+ `Queued ticket ${ ticketId } disappeared before acquisition.` ,
400422 ) ;
401423 }
402424
@@ -420,7 +442,7 @@ export const createResourceLockManager = (
420442 scopedLogger . debug (
421443 'acquired lock for key %s with ticket %s' ,
422444 key ,
423- ticketId
445+ ticketId ,
424446 ) ;
425447 return { release } ;
426448 }
@@ -436,7 +458,7 @@ export const createResourceLockManager = (
436458 'waiting for key %s with ticket %s at queue position %d' ,
437459 key ,
438460 ticketId ,
439- ownIndex + 1
461+ ownIndex + 1 ,
440462 ) ;
441463
442464 await waitForPollInterval ( pollIntervalMs , acquireOptions . signal ) ;
0 commit comments