Skip to content

Commit 3c6f915

Browse files
committed
Limit path depth and add cache cleanup
Prevent excessive memory usage and reduce GC pressure by limiting traversal path depth in data-filter (cap path depth at 20 levels). In grouper worker, add a periodic cache cleanup interval (every 5 minutes) started on worker start and cleared on finish to avoid unbounded cache growth. Free large references after delta computation by nulling event payloads to allow garbage collection. Also tighten memoization for findSimilarEvent (max reduced from 200 to 50 and ttl set to 600s) to further limit memory retained by caches.
1 parent c4cc417 commit 3c6f915

2 files changed

Lines changed: 37 additions & 2 deletions

File tree

workers/grouper/src/data-filter.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ function forAll(obj: Record<string, unknown>, callback: (path: string[], key: st
1818
if (!(typeof value === 'object' && !Array.isArray(value))) {
1919
callback(path, key, current);
2020
} else {
21-
visit(value, [...path, key]);
21+
/**
22+
* Limit path depth to prevent excessive memory allocations from deep nesting
23+
* This reduces GC pressure and memory usage for deeply nested objects
24+
*/
25+
const newPath = path.length < 20 ? [...path, key] : path;
26+
visit(value, newPath);
2227
}
2328
}
2429
};

workers/grouper/src/index.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ export default class GrouperWorker extends Worker {
7272
*/
7373
private redis = new RedisHelper();
7474

75+
/**
76+
* Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth
77+
*/
78+
private cacheCleanupInterval: NodeJS.Timeout | null = null;
79+
7580
/**
7681
* Start consuming messages
7782
*/
@@ -85,13 +90,30 @@ export default class GrouperWorker extends Worker {
8590

8691
await this.redis.initialize();
8792
console.log('redis initialized');
93+
94+
/**
95+
* Start periodic cache cleanup to prevent memory leaks from unbounded cache growth
96+
* Runs every 5 minutes to clear old cache entries
97+
*/
98+
this.cacheCleanupInterval = setInterval(() => {
99+
this.clearCache();
100+
}, 5 * 60 * 1000);
101+
88102
await super.start();
89103
}
90104

91105
/**
92106
* Finish everything
93107
*/
94108
public async finish(): Promise<void> {
109+
/**
110+
* Clear cache cleanup interval to prevent resource leaks
111+
*/
112+
if (this.cacheCleanupInterval) {
113+
clearInterval(this.cacheCleanupInterval);
114+
this.cacheCleanupInterval = null;
115+
}
116+
95117
await super.finish();
96118
this.prepareCache();
97119
await this.eventsDb.close();
@@ -237,6 +259,14 @@ export default class GrouperWorker extends Worker {
237259
} as RepetitionDBScheme;
238260

239261
repetitionId = await this.saveRepetition(task.projectId, newRepetition);
262+
263+
/**
264+
* Clear the large event payload references to allow garbage collection
265+
* This prevents memory leaks from retaining full event objects after delta is computed
266+
*/
267+
delta = null;
268+
existedEvent.payload = null;
269+
task.payload = null;
240270
}
241271

242272
/**
@@ -334,7 +364,7 @@ export default class GrouperWorker extends Worker {
334364
* @param projectId - where to find
335365
* @param title - title of the event to find similar one
336366
*/
337-
@memoize({ max: 200, ttl: MEMOIZATION_TTL, strategy: 'hash', skipCache: [undefined] })
367+
@memoize({ max: 50, ttl: 600, strategy: 'hash', skipCache: [undefined] })
338368
private async findSimilarEvent(projectId: string, title: string): Promise<GroupedEventDBScheme | undefined> {
339369
/**
340370
* If no match by Levenshtein, try matching by patterns

0 commit comments

Comments
 (0)