11import { Collection , Db , ObjectId } from 'mongodb' ;
2- import { ProjectDBScheme , WorkspaceDBScheme } from '@hawk.so/types' ;
2+ import { PlanDBScheme , ProjectDBScheme , WorkspaceDBScheme } from '@hawk.so/types' ;
33import { WorkspaceWithTariffPlan } from '../types' ;
44import HawkCatcher from '@hawk.so/nodejs' ;
55import { CriticalError , NonCriticalError } from '../../../lib/workerErrors' ;
66
7+ const WORKSPACE_PROJECTION = {
8+ _id : 1 ,
9+ name : 1 ,
10+ isBlocked : 1 ,
11+ blockedDate : 1 ,
12+ lastChargeDate : 1 ,
13+ billingPeriodEventsCount : 1 ,
14+ tariffPlanId : 1 ,
15+ } as const ;
16+
17+ type WorkspaceForLimiter = Pick <
18+ WorkspaceDBScheme ,
19+ '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId'
20+ > ;
21+
722/**
823 * Class that implements methods used for interaction between limiter and db
924 */
@@ -23,15 +38,49 @@ export class DbHelper {
2338 */
2439 private workspacesCollection : Collection < WorkspaceDBScheme > ;
2540
41+ /**
42+ * Collection with tariff plans
43+ */
44+ private plansCollection : Collection < PlanDBScheme > ;
45+
46+ /**
47+ * In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace
48+ */
49+ private plans : PlanDBScheme [ ] = [ ] ;
50+
51+ /**
52+ * Plan ids that were still missing after a cache refresh — don't trigger more refreshes for them
53+ */
54+ private knownMissingPlanIds : Set < string > = new Set ( ) ;
55+
2656 /**
2757 * @param projects - projects collection
2858 * @param workspaces - workspaces collection
59+ * @param plans - plans collection
2960 * @param eventsDbConnection - connection to events DB
3061 */
31- constructor ( projects : Collection < ProjectDBScheme > , workspaces : Collection < WorkspaceDBScheme > , eventsDbConnection : Db ) {
62+ constructor (
63+ projects : Collection < ProjectDBScheme > ,
64+ workspaces : Collection < WorkspaceDBScheme > ,
65+ plans : Collection < PlanDBScheme > ,
66+ eventsDbConnection : Db
67+ ) {
3268 this . eventsDbConnection = eventsDbConnection ;
3369 this . projectsCollection = projects ;
3470 this . workspacesCollection = workspaces ;
71+ this . plansCollection = plans ;
72+ }
73+
74+ /**
75+ * Fetches tariff plans from database and keeps them cached
76+ */
77+ public async fetchPlans ( ) : Promise < void > {
78+ this . plans = await this . plansCollection . find ( { } ) . toArray ( ) ;
79+ this . knownMissingPlanIds . clear ( ) ;
80+
81+ if ( this . plans . length === 0 ) {
82+ throw new CriticalError ( 'Please add tariff plans to the database' ) ;
83+ }
3584 }
3685
3786 /**
@@ -148,71 +197,98 @@ export class DbHelper {
148197 return this . projectsCollection . find ( query ) . toArray ( ) ;
149198 }
150199
200+ /**
201+ * Returns plan from cache, refetches once on miss
202+ *
203+ * @param planId - id of the plan to find
204+ */
205+ private async resolvePlan ( planId : WorkspaceDBScheme [ 'tariffPlanId' ] ) : Promise < PlanDBScheme | null > {
206+ let plan = this . findPlanById ( planId ) ;
207+
208+ if ( plan ) {
209+ return plan ;
210+ }
211+
212+ const planIdStr = planId . toString ( ) ;
213+
214+ if ( this . knownMissingPlanIds . has ( planIdStr ) ) {
215+ return null ;
216+ }
217+
218+ await this . fetchPlans ( ) ;
219+ plan = this . findPlanById ( planId ) ;
220+
221+ if ( ! plan ) {
222+ this . knownMissingPlanIds . add ( planIdStr ) ;
223+ }
224+
225+ return plan ?? null ;
226+ }
227+
228+ /**
229+ * @param planId - id of the plan to find
230+ */
231+ private findPlanById ( planId : WorkspaceDBScheme [ 'tariffPlanId' ] ) : PlanDBScheme | undefined {
232+ return this . plans . find ( ( plan ) => plan . _id . toString ( ) === planId . toString ( ) ) ;
233+ }
234+
151235 /**
152236 * Returns a single workspace with its tariff plan by id
153237 *
154238 * @param id - workspace id
155239 */
156240 private async getOneWorkspaceWithTariffPlan ( id : string ) : Promise < WorkspaceWithTariffPlan > {
157- const pipeline = [
158- {
159- $match : {
160- _id : new ObjectId ( id ) ,
161- } ,
162- } ,
163- ...this . tariffPlanLookupPipeline ( ) ,
164- ] ;
165-
166- const workspace = await this . workspacesCollection . aggregate < WorkspaceWithTariffPlan > ( pipeline ) . next ( ) ;
241+ const workspace = await this . workspacesCollection
242+ . find ( { _id : new ObjectId ( id ) } )
243+ . project < WorkspaceForLimiter > ( WORKSPACE_PROJECTION )
244+ . next ( ) ;
167245
168246 if ( workspace === null ) {
169247 throw new NonCriticalError ( `Workspace ${ id } not found` , {
170248 workspaceId : id ,
171249 } ) ;
172250 }
173251
174- return workspace ;
252+ const plan = await this . resolvePlan ( workspace . tariffPlanId ) ;
253+
254+ if ( ! plan ) {
255+ throw new NonCriticalError ( `Tariff plan ${ workspace . tariffPlanId . toString ( ) } not found for workspace ${ id } ` , {
256+ workspaceId : id ,
257+ } ) ;
258+ }
259+
260+ return {
261+ ...workspace ,
262+ tariffPlan : plan ,
263+ } ;
175264 }
176265
177266 /**
178267 * Yields all workspaces with their tariff plans one by one
179268 */
180269 private async * yieldWorkspacesWithTariffPlans ( ) : AsyncGenerator < WorkspaceWithTariffPlan > {
181- const pipeline = this . tariffPlanLookupPipeline ( ) ;
182- const cursor = this . workspacesCollection . aggregate < WorkspaceWithTariffPlan > ( pipeline ) ;
270+ const cursor = this . workspacesCollection
271+ . find ( { } )
272+ . project < WorkspaceForLimiter > ( WORKSPACE_PROJECTION ) ;
183273
184274 for await ( const workspace of cursor ) {
185- yield workspace ;
186- }
187- }
275+ const plan = await this . resolvePlan ( workspace . tariffPlanId ) ;
188276
189- /* eslint-disable-next-line */
190- private tariffPlanLookupPipeline ( ) : any [ ] {
191- return [
192- {
193- $lookup : {
194- from : 'plans' ,
195- localField : 'tariffPlanId' ,
196- foreignField : '_id' ,
197- as : 'tariffPlan' ,
198- } ,
199- } ,
200- {
201- $unwind : {
202- path : '$tariffPlan' ,
203- } ,
204- } ,
205- {
206- $project : {
207- _id : 1 ,
208- name : 1 ,
209- isBlocked : 1 ,
210- blockedDate : 1 ,
211- lastChargeDate : 1 ,
212- billingPeriodEventsCount : 1 ,
213- tariffPlan : 1 ,
214- } ,
215- } ,
216- ] ;
277+ if ( ! plan ) {
278+ HawkCatcher . send (
279+ new Error ( `[Limiter] Tariff plan not found for workspace` ) ,
280+ {
281+ workspaceId : workspace . _id . toString ( ) ,
282+ tariffPlanId : workspace . tariffPlanId ?. toString ( ) ,
283+ }
284+ ) ;
285+ continue ;
286+ }
287+
288+ yield {
289+ ...workspace ,
290+ tariffPlan : plan ,
291+ } ;
292+ }
217293 }
218- }
294+ }
0 commit comments