Skip to content

Commit ebb98c4

Browse files
github-actions[bot]Kuchizue11syЕгор Коновалов
authored
Update prod (#555)
* feat(metrics): expose worker metrics over HTTP (#547) * perf(notifier): project only notifications field when loading rules (#551) * perf(notifier): project only notifications field when loading rules * test(notifier): update findOne assertion for projection arg * perf(paymaster): batch subscription check over workspaces (#550) * perf(paymaster): batch subscription check over workspaces * test(paymaster): cover multi-batch dispatch in subscription check * refactor(paymaster): pass batch into flush and clear it explicitly * Imp(limiter): fetch workspaces with cursor (#554) * imp(): fetch workspaces one by one * imp(): add workspace projection * imp(): fix function overload * fix(): eslint and types * fix(): missing await --------- Co-authored-by: Егор Коновалов <egorkonovalov@NB060201N01P.local> --------- Co-authored-by: Kuchizu <70284260+Kuchizu@users.noreply.github.com> Co-authored-by: e11sy <130844513+e11sy@users.noreply.github.com> Co-authored-by: Егор Коновалов <egorkonovalov@NB060201N01P.local>
1 parent 3ed9a87 commit ebb98c4

4 files changed

Lines changed: 101 additions & 47 deletions

File tree

workers/limiter/src/dbHelper.ts

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Collection, Db, ObjectId } from 'mongodb';
22
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
33
import { WorkspaceWithTariffPlan } from '../types';
44
import HawkCatcher from '@hawk.so/nodejs';
5-
import { CriticalError } from '../../../lib/workerErrors';
5+
import { CriticalError, NonCriticalError } from '../../../lib/workerErrors';
66

77
/**
88
* Class that implements methods used for interaction between limiter and db
@@ -35,49 +35,24 @@ export class DbHelper {
3535
}
3636

3737
/**
38-
* Method that returns all workspaces with their tariff plans
38+
* Method that yields all workspaces with their tariff plans
3939
*/
40-
public async getWorkspacesWithTariffPlans():Promise<WorkspaceWithTariffPlan[]>;
40+
public getWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan>;
4141
/**
4242
* Method that returns workspace with its tariff plan by its id
4343
*
4444
* @param id - id of the workspace to fetch
4545
*/
46-
public async getWorkspacesWithTariffPlans(id: string):Promise<WorkspaceWithTariffPlan>;
46+
public getWorkspacesWithTariffPlans(id: string): Promise<WorkspaceWithTariffPlan>;
4747
/**
48-
* Returns workspace with its tariff plan by its id
49-
*
50-
* @param id - workspace id
48+
* @param id - id of the workspace to fetch
5149
*/
52-
public async getWorkspacesWithTariffPlans(id?: string):Promise<WorkspaceWithTariffPlan[] | WorkspaceWithTariffPlan> {
53-
/* eslint-disable-next-line */
54-
const queue: any[] = [
55-
{
56-
$lookup: {
57-
from: 'plans',
58-
localField: 'tariffPlanId',
59-
foreignField: '_id',
60-
as: 'tariffPlan',
61-
},
62-
},
63-
{
64-
$unwind: {
65-
path: '$tariffPlan',
66-
},
67-
},
68-
];
69-
50+
public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator<WorkspaceWithTariffPlan> | Promise<WorkspaceWithTariffPlan> {
7051
if (id !== undefined) {
71-
queue.unshift({
72-
$match: {
73-
_id: new ObjectId(id),
74-
},
75-
});
52+
return this.getOneWorkspaceWithTariffPlan(id);
7653
}
7754

78-
const workspacesArray = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(queue).toArray();
79-
80-
return (id !== undefined) ? workspacesArray[0] : workspacesArray;
55+
return this.yieldWorkspacesWithTariffPlans();
8156
}
8257

8358
/**
@@ -172,4 +147,72 @@ export class DbHelper {
172147

173148
return this.projectsCollection.find(query).toArray();
174149
}
150+
151+
/**
152+
* Returns a single workspace with its tariff plan by id
153+
*
154+
* @param id - workspace id
155+
*/
156+
private async getOneWorkspaceWithTariffPlan(id: string): Promise<WorkspaceWithTariffPlan> {
157+
const pipeline = [
158+
{
159+
$match: {
160+
_id: new ObjectId(id),
161+
},
162+
},
163+
...this.tariffPlanLookupPipeline(),
164+
];
165+
166+
const workspace = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline).next();
167+
168+
if (workspace === null) {
169+
throw new NonCriticalError(`Workspace ${id} not found`, {
170+
workspaceId: id,
171+
});
172+
}
173+
174+
return workspace;
175+
}
176+
177+
/**
178+
* Yields all workspaces with their tariff plans one by one
179+
*/
180+
private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan> {
181+
const pipeline = this.tariffPlanLookupPipeline();
182+
const cursor = this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline);
183+
184+
for await (const workspace of cursor) {
185+
yield workspace;
186+
}
187+
}
188+
189+
/* eslint-disable-next-line */
190+
private tariffPlanLookupPipeline(): any[] {
191+
return [
192+
{
193+
$lookup: {
194+
from: 'plans',
195+
localField: 'tariffPlanId',
196+
foreignField: '_id',
197+
as: 'tariffPlan',
198+
},
199+
},
200+
{
201+
$unwind: {
202+
path: '$tariffPlan',
203+
},
204+
},
205+
{
206+
$project: {
207+
_id: 1,
208+
name: 1,
209+
isBlocked: 1,
210+
blockedDate: 1,
211+
lastChargeDate: 1,
212+
billingPeriodEventsCount: 1,
213+
tariffPlan: 1,
214+
},
215+
},
216+
];
217+
}
175218
}

workers/limiter/src/index.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,16 @@ export default class LimiterWorker extends Worker {
189189
private async handleRegularWorkspacesCheck(): Promise<void> {
190190
let message = '';
191191

192-
const workspaces = await this.dbHelper.getWorkspacesWithTariffPlans();
192+
const workspaces = this.dbHelper.getWorkspacesWithTariffPlans();
193193

194194
const updatedWorkspaces: WorkspaceWithTariffPlan[] = [];
195195

196-
await Promise.all(workspaces.map(async (workspace) => {
196+
for await (const workspace of workspaces) {
197197
/**
198198
* If workspace is already blocked - do nothing
199199
*/
200200
if (workspace.isBlocked) {
201-
return;
201+
continue;
202202
}
203203

204204
const workspaceProjects = await this.dbHelper.getProjects(workspace._id.toString());
@@ -211,7 +211,7 @@ export default class LimiterWorker extends Worker {
211211
* If there are no projects to update - move on to next workspace
212212
*/
213213
if (projectsToUpdate.length === 0) {
214-
return;
214+
continue;
215215
}
216216

217217
/**
@@ -223,12 +223,12 @@ export default class LimiterWorker extends Worker {
223223
updatedWorkspace.isBlocked = true;
224224
updatedWorkspace.blockedDate = new Date();
225225

226-
this.redis.appendBannedProjects(projectIds);
226+
await this.redis.appendBannedProjects(projectIds);
227227
message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked');
228228
}
229-
}));
229+
}
230230

231-
this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces);
231+
await this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces);
232232

233233
this.sendRegularReport(message);
234234
}

workers/limiter/tests/dbHelper.test.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,22 @@ describe('DbHelper', () => {
159159
/**
160160
* Act
161161
*/
162-
const result = await dbHelper.getWorkspacesWithTariffPlans();
162+
const cursor = dbHelper.getWorkspacesWithTariffPlans();
163+
164+
const workspaces = [];
165+
166+
for await (const workspace of cursor) {
167+
workspaces.push(workspace);
168+
}
163169

164170
/**
165171
* Assert
166172
*/
167-
expect(result).toHaveLength(2);
168-
expect(result[0].tariffPlan).toBeDefined();
169-
expect(result[1].tariffPlan).toBeDefined();
170-
expect(result[0].tariffPlan.eventsLimit).toBe(10);
171-
expect(result[1].tariffPlan.eventsLimit).toBe(10000);
173+
expect(workspaces).toHaveLength(2);
174+
expect(workspaces[0].tariffPlan).toBeDefined();
175+
expect(workspaces[1].tariffPlan).toBeDefined();
176+
expect(workspaces[0].tariffPlan.eventsLimit).toBe(10);
177+
expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000);
172178
});
173179

174180
test('Should return single workspace with its tariff plan by id', async () => {

workers/limiter/types/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,9 @@ import { PlanDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
33
/**
44
* Workspace with its tariff plan
55
*/
6-
export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme};
6+
export type WorkspaceWithTariffPlan = Pick<
7+
WorkspaceDBScheme,
8+
'_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount'
9+
> & {
10+
tariffPlan: PlanDBScheme;
11+
};

0 commit comments

Comments
 (0)