Skip to content

Commit a276b9c

Browse files
authored
Merge branch 'master' into feat/grouper-slow-diagnostics
2 parents f6dfe05 + 8f68626 commit a276b9c

6 files changed

Lines changed: 209 additions & 60 deletions

File tree

workers/limiter/src/dbHelper.ts

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Collection, Db, ObjectId } from 'mongodb';
22
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
33
import { WorkspaceWithTariffPlan } from '../types';
44
import HawkCatcher from '@hawk.so/nodejs';
5-
import { CriticalError } from '../../../lib/workerErrors';
5+
import { CriticalError, NonCriticalError } from '../../../lib/workerErrors';
66

77
/**
88
* Class that implements methods used for interaction between limiter and db
@@ -35,49 +35,24 @@ export class DbHelper {
3535
}
3636

3737
/**
38-
* Method that returns all workspaces with their tariff plans
38+
* Method that yields all workspaces with their tariff plans
3939
*/
40-
public async getWorkspacesWithTariffPlans():Promise<WorkspaceWithTariffPlan[]>;
40+
public getWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan>;
4141
/**
4242
* Method that returns workspace with its tariff plan by its id
4343
*
4444
* @param id - id of the workspace to fetch
4545
*/
46-
public async getWorkspacesWithTariffPlans(id: string):Promise<WorkspaceWithTariffPlan>;
46+
public getWorkspacesWithTariffPlans(id: string): Promise<WorkspaceWithTariffPlan>;
4747
/**
48-
* Returns workspace with its tariff plan by its id
49-
*
50-
* @param id - workspace id
48+
* @param id - id of the workspace to fetch
5149
*/
52-
public async getWorkspacesWithTariffPlans(id?: string):Promise<WorkspaceWithTariffPlan[] | WorkspaceWithTariffPlan> {
53-
/* eslint-disable-next-line */
54-
const queue: any[] = [
55-
{
56-
$lookup: {
57-
from: 'plans',
58-
localField: 'tariffPlanId',
59-
foreignField: '_id',
60-
as: 'tariffPlan',
61-
},
62-
},
63-
{
64-
$unwind: {
65-
path: '$tariffPlan',
66-
},
67-
},
68-
];
69-
50+
public getWorkspacesWithTariffPlans(id?: string): AsyncGenerator<WorkspaceWithTariffPlan> | Promise<WorkspaceWithTariffPlan> {
7051
if (id !== undefined) {
71-
queue.unshift({
72-
$match: {
73-
_id: new ObjectId(id),
74-
},
75-
});
52+
return this.getOneWorkspaceWithTariffPlan(id);
7653
}
7754

78-
const workspacesArray = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(queue).toArray();
79-
80-
return (id !== undefined) ? workspacesArray[0] : workspacesArray;
55+
return this.yieldWorkspacesWithTariffPlans();
8156
}
8257

8358
/**
@@ -172,4 +147,72 @@ export class DbHelper {
172147

173148
return this.projectsCollection.find(query).toArray();
174149
}
150+
151+
/**
152+
* Returns a single workspace with its tariff plan by id
153+
*
154+
* @param id - workspace id
155+
*/
156+
private async getOneWorkspaceWithTariffPlan(id: string): Promise<WorkspaceWithTariffPlan> {
157+
const pipeline = [
158+
{
159+
$match: {
160+
_id: new ObjectId(id),
161+
},
162+
},
163+
...this.tariffPlanLookupPipeline(),
164+
];
165+
166+
const workspace = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline).next();
167+
168+
if (workspace === null) {
169+
throw new NonCriticalError(`Workspace ${id} not found`, {
170+
workspaceId: id,
171+
});
172+
}
173+
174+
return workspace;
175+
}
176+
177+
/**
178+
* Yields all workspaces with their tariff plans one by one
179+
*/
180+
private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan> {
181+
const pipeline = this.tariffPlanLookupPipeline();
182+
const cursor = this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline);
183+
184+
for await (const workspace of cursor) {
185+
yield workspace;
186+
}
187+
}
188+
189+
/* eslint-disable-next-line */
190+
private tariffPlanLookupPipeline(): any[] {
191+
return [
192+
{
193+
$lookup: {
194+
from: 'plans',
195+
localField: 'tariffPlanId',
196+
foreignField: '_id',
197+
as: 'tariffPlan',
198+
},
199+
},
200+
{
201+
$unwind: {
202+
path: '$tariffPlan',
203+
},
204+
},
205+
{
206+
$project: {
207+
_id: 1,
208+
name: 1,
209+
isBlocked: 1,
210+
blockedDate: 1,
211+
lastChargeDate: 1,
212+
billingPeriodEventsCount: 1,
213+
tariffPlan: 1,
214+
},
215+
},
216+
];
217+
}
175218
}

workers/limiter/src/index.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,16 @@ export default class LimiterWorker extends Worker {
189189
private async handleRegularWorkspacesCheck(): Promise<void> {
190190
let message = '';
191191

192-
const workspaces = await this.dbHelper.getWorkspacesWithTariffPlans();
192+
const workspaces = this.dbHelper.getWorkspacesWithTariffPlans();
193193

194194
const updatedWorkspaces: WorkspaceWithTariffPlan[] = [];
195195

196-
await Promise.all(workspaces.map(async (workspace) => {
196+
for await (const workspace of workspaces) {
197197
/**
198198
* If workspace is already blocked - do nothing
199199
*/
200200
if (workspace.isBlocked) {
201-
return;
201+
continue;
202202
}
203203

204204
const workspaceProjects = await this.dbHelper.getProjects(workspace._id.toString());
@@ -211,7 +211,7 @@ export default class LimiterWorker extends Worker {
211211
* If there are no projects to update - move on to next workspace
212212
*/
213213
if (projectsToUpdate.length === 0) {
214-
return;
214+
continue;
215215
}
216216

217217
/**
@@ -223,12 +223,12 @@ export default class LimiterWorker extends Worker {
223223
updatedWorkspace.isBlocked = true;
224224
updatedWorkspace.blockedDate = new Date();
225225

226-
this.redis.appendBannedProjects(projectIds);
226+
await this.redis.appendBannedProjects(projectIds);
227227
message += this.formSingleWorkspaceMessage(updatedWorkspace, projectsToUpdate, 'blocked');
228228
}
229-
}));
229+
}
230230

231-
this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces);
231+
await this.dbHelper.updateWorkspacesEventsCountAndIsBlocked(updatedWorkspaces);
232232

233233
this.sendRegularReport(message);
234234
}

workers/limiter/tests/dbHelper.test.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,22 @@ describe('DbHelper', () => {
159159
/**
160160
* Act
161161
*/
162-
const result = await dbHelper.getWorkspacesWithTariffPlans();
162+
const cursor = dbHelper.getWorkspacesWithTariffPlans();
163+
164+
const workspaces = [];
165+
166+
for await (const workspace of cursor) {
167+
workspaces.push(workspace);
168+
}
163169

164170
/**
165171
* Assert
166172
*/
167-
expect(result).toHaveLength(2);
168-
expect(result[0].tariffPlan).toBeDefined();
169-
expect(result[1].tariffPlan).toBeDefined();
170-
expect(result[0].tariffPlan.eventsLimit).toBe(10);
171-
expect(result[1].tariffPlan.eventsLimit).toBe(10000);
173+
expect(workspaces).toHaveLength(2);
174+
expect(workspaces[0].tariffPlan).toBeDefined();
175+
expect(workspaces[1].tariffPlan).toBeDefined();
176+
expect(workspaces[0].tariffPlan.eventsLimit).toBe(10);
177+
expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000);
172178
});
173179

174180
test('Should return single workspace with its tariff plan by id', async () => {

workers/limiter/types/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,9 @@ import { PlanDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
33
/**
44
* Workspace with its tariff plan
55
*/
6-
export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme};
6+
export type WorkspaceWithTariffPlan = Pick<
7+
WorkspaceDBScheme,
8+
'_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount'
9+
> & {
10+
tariffPlan: PlanDBScheme;
11+
};

workers/paymaster/src/index.ts

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,33 @@ const DAYS_LEFT_ALERT = [3, 2, 1];
3434
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
3535
const DAYS_AFTER_BLOCK_TO_REMIND = [1, 2, 3, 5, 7, 30];
3636

37+
/**
38+
* Bounds concurrent updateOne / addTask calls per subscription check tick.
39+
*/
40+
const WORKSPACE_PROCESSING_CONCURRENCY = 25;
41+
42+
const WORKSPACE_CURSOR_BATCH_SIZE = 200;
43+
44+
/**
45+
* Keep in sync with fields read by `processWorkspaceSubscriptionCheck` and its helpers.
46+
*/
47+
const WORKSPACE_SUBSCRIPTION_PROJECTION = {
48+
_id: 1,
49+
name: 1,
50+
tariffPlanId: 1,
51+
lastChargeDate: 1,
52+
paidUntil: 1,
53+
isDebug: 1,
54+
isBlocked: 1,
55+
blockedDate: 1,
56+
subscriptionId: 1,
57+
} as const;
58+
59+
const PLAN_PROJECTION = {
60+
_id: 1,
61+
monthlyCharge: 1,
62+
} as const;
63+
3764
/**
3865
* Worker to check workspaces subscription status and ban workspaces without actual subscription
3966
*/
@@ -151,7 +178,10 @@ export default class PaymasterWorker extends Worker {
151178
throw new Error('Plans collection is not initialized');
152179
}
153180

154-
this.plans = await this.plansCollection.find({}).toArray();
181+
this.plans = await this.plansCollection
182+
.find({})
183+
.project<PlanDBScheme>(PLAN_PROJECTION)
184+
.toArray();
155185

156186
if (this.plans.length === 0) {
157187
throw new Error('Please add tariff plans to the database');
@@ -195,28 +225,45 @@ export default class PaymasterWorker extends Worker {
195225
* Called periodically, enumerate through workspaces and check if today is a payday for workspace subscription
196226
*/
197227
private async handleWorkspaceSubscriptionCheckEvent(): Promise<void> {
198-
const workspaces = await this.workspaces.find({}).toArray();
228+
const cursor = this.workspaces
229+
.find({})
230+
.project<WorkspaceDBScheme>(WORKSPACE_SUBSCRIPTION_PROJECTION)
231+
.batchSize(WORKSPACE_CURSOR_BATCH_SIZE);
232+
233+
let batch: WorkspaceDBScheme[] = [];
199234

200-
await Promise.all(workspaces
201-
.filter(workspace => {
235+
const flush = async (currentBatch: WorkspaceDBScheme[]): Promise<void> => {
236+
if (currentBatch.length === 0) {
237+
return;
238+
}
239+
240+
await Promise.all(currentBatch.map((workspace) => this.processWorkspaceSubscriptionCheck(workspace)));
241+
};
242+
243+
try {
244+
for await (const workspace of cursor) {
202245
/**
203246
* Skip workspace without lastChargeDate
204247
*/
205248
if (!workspace.lastChargeDate) {
206-
const error = new Error('[Paymaster] Workspace without lastChargeDate detected');
207-
208-
HawkCatcher.send(error, {
249+
HawkCatcher.send(new Error('[Paymaster] Workspace without lastChargeDate detected'), {
209250
workspaceId: workspace._id.toString(),
210251
});
252+
continue;
253+
}
254+
255+
batch.push(workspace);
211256

212-
return false;
257+
if (batch.length >= WORKSPACE_PROCESSING_CONCURRENCY) {
258+
await flush(batch);
259+
batch = [];
213260
}
261+
}
214262

215-
return true;
216-
})
217-
.map(
218-
(workspace) => this.processWorkspaceSubscriptionCheck(workspace)
219-
));
263+
await flush(batch);
264+
} finally {
265+
await cursor.close();
266+
}
220267
}
221268

222269
/**

workers/paymaster/tests/index.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,54 @@ describe('PaymasterWorker', () => {
794794
MockDate.reset();
795795
});
796796

797+
test('Should process every workspace when there are several batches', async () => {
798+
/**
799+
* 50 > WORKSPACE_PROCESSING_CONCURRENCY (25), so the subscription check
800+
* has to flush more than one batch.
801+
*/
802+
const WORKSPACES_COUNT = 50;
803+
const currentDate = new Date('2005-12-22');
804+
const plan = createPlanMock({
805+
monthlyCharge: 0,
806+
isDefault: true,
807+
});
808+
809+
const workspaces = Array.from({ length: WORKSPACES_COUNT }, () =>
810+
createWorkspaceMock({
811+
plan,
812+
subscriptionId: null,
813+
lastChargeDate: new Date('2005-11-22'),
814+
isBlocked: false,
815+
billingPeriodEventsCount: 0,
816+
})
817+
);
818+
819+
await tariffCollection.insertOne(plan);
820+
await workspacesCollection.insertMany(workspaces);
821+
822+
MockDate.set(currentDate);
823+
824+
const worker = new PaymasterWorker();
825+
const processSpy = jest
826+
.spyOn(worker as any, 'processWorkspaceSubscriptionCheck')
827+
.mockResolvedValue([null, false]);
828+
829+
await worker.start();
830+
await worker.handle(WORKSPACE_SUBSCRIPTION_CHECK);
831+
await worker.finish();
832+
833+
expect(processSpy).toHaveBeenCalledTimes(WORKSPACES_COUNT);
834+
835+
const calledIds = processSpy.mock.calls
836+
.map((call) => (call[0] as WorkspaceDBScheme)._id.toString())
837+
.sort();
838+
const expectedIds = workspaces.map((w) => w._id.toString()).sort();
839+
840+
expect(calledIds).toEqual(expectedIds);
841+
842+
MockDate.reset();
843+
});
844+
797845
afterAll(async () => {
798846
await connection.close();
799847
MockDate.reset();

0 commit comments

Comments
 (0)