Skip to content

Commit 8d52fb9

Browse files
committed
cmon GitHub
1 parent b7b8007 commit 8d52fb9

1 file changed

Lines changed: 115 additions & 59 deletions

File tree

cache/index.js

Lines changed: 115 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,17 @@ class ClusterCache {
9494
return null
9595
}
9696

97-
const wrappedValue = await this.clusterCache.get(key, undefined)
98-
if (wrappedValue !== undefined) {
99-
this.stats.hits++
100-
this.keyAccessTimes.set(key, Date.now()) // Update access time for LRU
101-
// Unwrap the value if it's wrapped with metadata
102-
return wrappedValue.data !== undefined ? wrappedValue.data : wrappedValue
97+
// Only use cluster cache in PM2 mode to avoid IPC timeouts
98+
if (this.isPM2) {
99+
const wrappedValue = await this.clusterCache.get(key, undefined)
100+
if (wrappedValue !== undefined) {
101+
this.stats.hits++
102+
this.keyAccessTimes.set(key, Date.now()) // Update access time for LRU
103+
// Unwrap the value if it's wrapped with metadata
104+
return wrappedValue.data !== undefined ? wrappedValue.data : wrappedValue
105+
}
103106
}
107+
104108
// Check local cache (single lookup instead of has + get)
105109
const localValue = this.localCache.get(key)
106110
if (localValue !== undefined) {
@@ -200,8 +204,10 @@ class ClusterCache {
200204
size: valueSize
201205
}
202206

203-
// Set in cluster cache immediately (most critical operation)
204-
await this.clusterCache.set(key, wrappedValue, effectiveTTL)
207+
// Set in cluster cache only in PM2 mode to avoid IPC timeouts
208+
if (this.isPM2) {
209+
await this.clusterCache.set(key, wrappedValue, effectiveTTL)
210+
}
205211

206212
// Update local state (reuse precalculated values)
207213
this.stats.sets++
@@ -262,7 +268,11 @@ class ClusterCache {
262268
const workerId = process.env.pm_id || process.pid
263269

264270
try {
265-
await this.clusterCache.delete(key)
271+
// Only delete from cluster cache in PM2 mode to avoid IPC timeouts
272+
if (this.isPM2) {
273+
await this.clusterCache.delete(key)
274+
}
275+
266276
this.allKeys.delete(key)
267277
this.keyAccessTimes.delete(key) // Clean up access time tracking
268278
this.keyExpirations.delete(key) // Clean up expiration tracking
@@ -392,10 +402,15 @@ class ClusterCache {
392402
* @private
393403
*/
394404
async _getClusterKeyCount() {
405+
// In non-PM2 mode, use local count directly to avoid IPC timeouts
406+
if (!this.isPM2) {
407+
return this.allKeys.size
408+
}
409+
395410
try {
396411
const keysMap = await this.clusterCache.keys()
397412
const uniqueKeys = new Set()
398-
413+
399414
for (const instanceKeys of Object.values(keysMap)) {
400415
if (Array.isArray(instanceKeys)) {
401416
instanceKeys.forEach(key => {
@@ -406,7 +421,7 @@ class ClusterCache {
406421
})
407422
}
408423
}
409-
424+
410425
return uniqueKeys.size
411426
} catch (err) {
412427
// Fallback to local count on error
@@ -459,39 +474,45 @@ class ClusterCache {
459474
*/
460475
async invalidate(pattern, invalidatedKeys = new Set()) {
461476
let count = 0
462-
477+
463478
try {
464-
const keysMap = await this.clusterCache.keys()
465-
const allKeys = new Set()
466-
467-
for (const instanceKeys of Object.values(keysMap)) {
468-
if (Array.isArray(instanceKeys)) {
469-
instanceKeys.forEach(key => allKeys.add(key))
479+
let allKeys = new Set()
480+
481+
// In PM2 mode, get keys from cluster cache; otherwise use local keys
482+
if (this.isPM2) {
483+
const keysMap = await this.clusterCache.keys()
484+
for (const instanceKeys of Object.values(keysMap)) {
485+
if (Array.isArray(instanceKeys)) {
486+
instanceKeys.forEach(key => allKeys.add(key))
487+
}
470488
}
489+
} else {
490+
// In non-PM2 mode, use local keys to avoid IPC timeouts
491+
allKeys = new Set(this.allKeys)
471492
}
472-
493+
473494
const regex = pattern instanceof RegExp ? pattern : new RegExp(pattern)
474-
495+
475496
const deletePromises = []
476497
const matchedKeys = []
477498
for (const key of allKeys) {
478499
if (invalidatedKeys.has(key)) {
479500
continue
480501
}
481-
502+
482503
if (regex.test(key)) {
483504
deletePromises.push(this.delete(key, true))
484505
matchedKeys.push(key)
485506
invalidatedKeys.add(key)
486507
count++
487508
}
488509
}
489-
510+
490511
await Promise.all(deletePromises)
491512
} catch (err) {
492513
console.error('Cache invalidate error:', err)
493514
}
494-
515+
495516
return count
496517
}
497518

@@ -513,30 +534,36 @@ class ClusterCache {
513534
try {
514535
// Wait for all workers to sync
515536
await this.waitForSync()
516-
537+
517538
const aggregatedStats = await this._aggregateStats()
518-
519-
const keysMap = await this.clusterCache.keys()
520-
const uniqueKeys = new Set()
521-
522-
for (const instanceKeys of Object.values(keysMap)) {
523-
if (Array.isArray(instanceKeys)) {
524-
instanceKeys.forEach(key => {
525-
// Exclude internal keys from cache length
526-
if (!key.startsWith('_stats_worker_') && key !== '_clear_signal') {
527-
uniqueKeys.add(key)
528-
}
529-
})
539+
540+
let cacheLength = this.allKeys.size
541+
542+
// In PM2 mode, get actual cluster key count; otherwise use local count
543+
if (this.isPM2) {
544+
const keysMap = await this.clusterCache.keys()
545+
const uniqueKeys = new Set()
546+
547+
for (const instanceKeys of Object.values(keysMap)) {
548+
if (Array.isArray(instanceKeys)) {
549+
instanceKeys.forEach(key => {
550+
// Exclude internal keys from cache length
551+
if (!key.startsWith('_stats_worker_') && key !== '_clear_signal') {
552+
uniqueKeys.add(key)
553+
}
554+
})
555+
}
530556
}
557+
cacheLength = uniqueKeys.size
531558
}
532-
559+
533560
const uptime = Date.now() - this.life
534561
const hitRate = aggregatedStats.hits + aggregatedStats.misses > 0
535562
? (aggregatedStats.hits / (aggregatedStats.hits + aggregatedStats.misses) * 100).toFixed(2)
536563
: '0.00'
537-
564+
538565
return {
539-
length: uniqueKeys.size,
566+
length: cacheLength,
540567
maxLength: this.maxLength,
541568
totalBytes: aggregatedStats.totalBytes,
542569
maxBytes: this.maxBytes,
@@ -576,29 +603,43 @@ class ClusterCache {
576603
*/
577604
async getDetails() {
578605
try {
579-
const keysMap = await this.clusterCache.keys()
580-
const allKeys = new Set()
581-
582-
for (const instanceKeys of Object.values(keysMap)) {
583-
if (Array.isArray(instanceKeys)) {
584-
instanceKeys.forEach(key => {
585-
if (!key.startsWith('_stats_worker_') && !key.startsWith('_clear_signal')) {
586-
allKeys.add(key)
587-
}
588-
})
606+
let allKeys = new Set()
607+
608+
// In PM2 mode, get keys from cluster cache; otherwise use local keys
609+
if (this.isPM2) {
610+
const keysMap = await this.clusterCache.keys()
611+
for (const instanceKeys of Object.values(keysMap)) {
612+
if (Array.isArray(instanceKeys)) {
613+
instanceKeys.forEach(key => {
614+
if (!key.startsWith('_stats_worker_') && !key.startsWith('_clear_signal')) {
615+
allKeys.add(key)
616+
}
617+
})
618+
}
589619
}
620+
} else {
621+
// In non-PM2 mode, use local keys to avoid IPC timeouts
622+
allKeys = new Set(this.allKeys)
590623
}
591-
624+
592625
const details = []
593626
let position = 0
594627
for (const key of allKeys) {
595-
const wrappedValue = await this.clusterCache.get(key, undefined)
628+
let wrappedValue
629+
630+
// In PM2 mode, get from cluster cache; otherwise get from local cache
631+
if (this.isPM2) {
632+
wrappedValue = await this.clusterCache.get(key, undefined)
633+
} else {
634+
wrappedValue = this.localCache.get(key)
635+
}
636+
596637
// Handle both wrapped and unwrapped values
597638
const actualValue = wrappedValue?.data !== undefined ? wrappedValue.data : wrappedValue
598639
const size = wrappedValue?.size || this._calculateSize(actualValue)
599640
const cachedAt = wrappedValue?.cachedAt || Date.now()
600641
const age = Date.now() - cachedAt
601-
642+
602643
details.push({
603644
position,
604645
key,
@@ -607,7 +648,7 @@ class ClusterCache {
607648
})
608649
position++
609650
}
610-
651+
611652
return details
612653
} catch (err) {
613654
console.error('Cache getDetails error:', err)
@@ -620,26 +661,31 @@ class ClusterCache {
620661
* @private
621662
*/
622663
async _checkClearSignal() {
664+
// Only check for clear signal in PM2 cluster mode to avoid IPC timeouts
665+
if (!this.isPM2) {
666+
return
667+
}
668+
623669
try {
624670
const signal = await this.clusterCache.get('_clear_signal', undefined)
625671
if (signal && signal.generation > this.clearGeneration) {
626672
// Another worker initiated a clear - reset our local state
627673
this.clearGeneration = signal.generation
628-
674+
629675
this.allKeys.clear()
630676
this.keyAccessTimes.clear()
631677
this.keySizes.clear()
632678
this.totalBytes = 0
633679
this.localCache.clear()
634-
680+
635681
this.stats = {
636682
hits: 0,
637683
misses: 0,
638684
evictions: 0,
639685
sets: 0,
640686
invalidations: 0
641687
}
642-
688+
643689
// Delete our worker stats key immediately
644690
const workerId = process.env.pm_id || process.pid
645691
const statsKey = `_stats_worker_${workerId}`
@@ -655,6 +701,11 @@ class ClusterCache {
655701
* @private
656702
*/
657703
async _syncStats() {
704+
// Only sync stats in PM2 cluster mode to avoid IPC timeouts
705+
if (!this.isPM2) {
706+
return
707+
}
708+
658709
try {
659710
const workerId = process.env.pm_id || process.pid
660711
const statsKey = `_stats_worker_${workerId}`
@@ -675,6 +726,11 @@ class ClusterCache {
675726
* @returns {Promise<Object>} Aggregated stats
676727
*/
677728
async _aggregateStats() {
729+
// In non-PM2 mode, return local stats directly to avoid IPC timeouts
730+
if (!this.isPM2) {
731+
return { ...this.stats, totalBytes: this.totalBytes }
732+
}
733+
678734
try {
679735
const keysMap = await this.clusterCache.keys()
680736
const aggregated = {
@@ -685,7 +741,7 @@ class ClusterCache {
685741
totalBytes: 0
686742
}
687743
const processedWorkers = new Set()
688-
744+
689745
for (const instanceKeys of Object.values(keysMap)) {
690746
if (Array.isArray(instanceKeys)) {
691747
for (const key of instanceKeys) {
@@ -694,7 +750,7 @@ class ClusterCache {
694750
if (processedWorkers.has(workerId)) {
695751
continue
696752
}
697-
753+
698754
try {
699755
const workerStats = await this.clusterCache.get(key, undefined)
700756
if (workerStats && typeof workerStats === 'object') {
@@ -712,7 +768,7 @@ class ClusterCache {
712768
}
713769
}
714770
}
715-
771+
716772
return aggregated
717773
} catch (err) {
718774
return { ...this.stats, totalBytes: this.totalBytes }

0 commit comments

Comments
 (0)