Skip to content

Commit d2c6fae

Browse files
committed
perf(limiter): cache plans in memory instead of $lookup per workspace
1 parent 8f68626 commit d2c6fae

3 files changed

Lines changed: 114 additions & 50 deletions

File tree

workers/limiter/src/dbHelper.ts

Lines changed: 107 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,24 @@
11
import { Collection, Db, ObjectId } from 'mongodb';
2-
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
2+
import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
33
import { WorkspaceWithTariffPlan } from '../types';
44
import HawkCatcher from '@hawk.so/nodejs';
55
import { CriticalError, NonCriticalError } from '../../../lib/workerErrors';
66

7+
const WORKSPACE_PROJECTION = {
8+
_id: 1,
9+
name: 1,
10+
isBlocked: 1,
11+
blockedDate: 1,
12+
lastChargeDate: 1,
13+
billingPeriodEventsCount: 1,
14+
tariffPlanId: 1,
15+
} as const;
16+
17+
type WorkspaceForLimiter = Pick<
18+
WorkspaceDBScheme,
19+
'_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId'
20+
>;
21+
722
/**
823
* Class that implements methods used for interaction between limiter and db
924
*/
@@ -23,15 +38,43 @@ export class DbHelper {
2338
*/
2439
private workspacesCollection: Collection<WorkspaceDBScheme>;
2540

41+
/**
42+
* Collection with tariff plans
43+
*/
44+
private plansCollection: Collection<PlanDBScheme>;
45+
46+
/**
47+
* In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace
48+
*/
49+
private plans: PlanDBScheme[] = [];
50+
2651
/**
2752
* @param projects - projects collection
2853
* @param workspaces - workspaces collection
54+
* @param plans - plans collection
2955
* @param eventsDbConnection - connection to events DB
3056
*/
31-
constructor(projects: Collection<ProjectDBScheme>, workspaces: Collection<WorkspaceDBScheme>, eventsDbConnection: Db) {
57+
constructor(
58+
projects: Collection<ProjectDBScheme>,
59+
workspaces: Collection<WorkspaceDBScheme>,
60+
plans: Collection<PlanDBScheme>,
61+
eventsDbConnection: Db
62+
) {
3263
this.eventsDbConnection = eventsDbConnection;
3364
this.projectsCollection = projects;
3465
this.workspacesCollection = workspaces;
66+
this.plansCollection = plans;
67+
}
68+
69+
/**
70+
* Fetches tariff plans from database and keeps them cached
71+
*/
72+
public async fetchPlans(): Promise<void> {
73+
this.plans = await this.plansCollection.find({}).toArray();
74+
75+
if (this.plans.length === 0) {
76+
throw new CriticalError('Please add tariff plans to the database');
77+
}
3578
}
3679

3780
/**
@@ -148,71 +191,88 @@ export class DbHelper {
148191
return this.projectsCollection.find(query).toArray();
149192
}
150193

194+
/**
195+
* Returns plan from cache, refetches once on miss
196+
*
197+
* @param planId - id of the plan to find
198+
*/
199+
private async resolvePlan(planId: WorkspaceDBScheme['tariffPlanId']): Promise<PlanDBScheme | null> {
200+
let plan = this.findPlanById(planId);
201+
202+
if (plan) {
203+
return plan;
204+
}
205+
206+
await this.fetchPlans();
207+
plan = this.findPlanById(planId);
208+
209+
return plan ?? null;
210+
}
211+
212+
/**
213+
* @param planId - id of the plan to find
214+
*/
215+
private findPlanById(planId: WorkspaceDBScheme['tariffPlanId']): PlanDBScheme | undefined {
216+
return this.plans.find((plan) => plan._id.toString() === planId.toString());
217+
}
218+
151219
/**
152220
* Returns a single workspace with its tariff plan by id
153221
*
154222
* @param id - workspace id
155223
*/
156224
private async getOneWorkspaceWithTariffPlan(id: string): Promise<WorkspaceWithTariffPlan> {
157-
const pipeline = [
158-
{
159-
$match: {
160-
_id: new ObjectId(id),
161-
},
162-
},
163-
...this.tariffPlanLookupPipeline(),
164-
];
165-
166-
const workspace = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline).next();
225+
const workspace = await this.workspacesCollection
226+
.find({ _id: new ObjectId(id) })
227+
.project<WorkspaceForLimiter>(WORKSPACE_PROJECTION)
228+
.next();
167229

168230
if (workspace === null) {
169231
throw new NonCriticalError(`Workspace ${id} not found`, {
170232
workspaceId: id,
171233
});
172234
}
173235

174-
return workspace;
236+
const plan = await this.resolvePlan(workspace.tariffPlanId);
237+
238+
if (!plan) {
239+
throw new NonCriticalError(`Tariff plan ${workspace.tariffPlanId.toString()} not found for workspace ${id}`, {
240+
workspaceId: id,
241+
});
242+
}
243+
244+
return {
245+
...workspace,
246+
tariffPlan: plan,
247+
};
175248
}
176249

177250
/**
178251
* Yields all workspaces with their tariff plans one by one
179252
*/
180253
private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan> {
181-
const pipeline = this.tariffPlanLookupPipeline();
182-
const cursor = this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline);
254+
const cursor = this.workspacesCollection
255+
.find({})
256+
.project<WorkspaceForLimiter>(WORKSPACE_PROJECTION);
183257

184258
for await (const workspace of cursor) {
185-
yield workspace;
186-
}
187-
}
259+
const plan = await this.resolvePlan(workspace.tariffPlanId);
188260

189-
/* eslint-disable-next-line */
190-
private tariffPlanLookupPipeline(): any[] {
191-
return [
192-
{
193-
$lookup: {
194-
from: 'plans',
195-
localField: 'tariffPlanId',
196-
foreignField: '_id',
197-
as: 'tariffPlan',
198-
},
199-
},
200-
{
201-
$unwind: {
202-
path: '$tariffPlan',
203-
},
204-
},
205-
{
206-
$project: {
207-
_id: 1,
208-
name: 1,
209-
isBlocked: 1,
210-
blockedDate: 1,
211-
lastChargeDate: 1,
212-
billingPeriodEventsCount: 1,
213-
tariffPlan: 1,
214-
},
215-
},
216-
];
261+
if (!plan) {
262+
HawkCatcher.send(
263+
new Error(`[Limiter] Tariff plan not found for workspace`),
264+
{
265+
workspaceId: workspace._id.toString(),
266+
tariffPlanId: workspace.tariffPlanId?.toString(),
267+
}
268+
);
269+
continue;
270+
}
271+
272+
yield {
273+
...workspace,
274+
tariffPlan: plan,
275+
};
276+
}
217277
}
218-
}
278+
}

workers/limiter/src/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker';
33
import * as pkg from '../package.json';
44
import * as path from 'path';
55
import * as dotenv from 'dotenv';
6-
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
6+
import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
77
import HawkCatcher from '@hawk.so/nodejs';
88
import { MS_IN_SEC } from '../../../lib/utils/consts';
99
import LimiterEvent, { BlockWorkspaceEvent, UnblockWorkspaceEvent } from '../types/eventTypes';
@@ -68,8 +68,11 @@ export default class LimiterWorker extends Worker {
6868

6969
const projectsCollection = accountDbConnection.collection<ProjectDBScheme>('projects');
7070
const workspacesCollection = accountDbConnection.collection<WorkspaceDBScheme>('workspaces');
71+
const plansCollection = accountDbConnection.collection<PlanDBScheme>('plans');
7172

72-
this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, eventsDbConnection);
73+
this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, plansCollection, eventsDbConnection);
74+
75+
await this.dbHelper.fetchPlans();
7376

7477
await this.redis.initialize();
7578

workers/limiter/tests/dbHelper.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ describe('DbHelper', () => {
130130
await planCollection.deleteMany({});
131131
await planCollection.insertMany(Object.values(mockedPlans));
132132

133-
dbHelper = new DbHelper(projectCollection, workspaceCollection, db);
133+
dbHelper = new DbHelper(projectCollection, workspaceCollection, planCollection, db);
134+
await dbHelper.fetchPlans();
134135
}, 30000); // 30 seconds timeout for MongoDB connection and setup
135136

136137
beforeEach(async () => {

0 commit comments

Comments
 (0)