Skip to content

Commit 2f0bad0

Browse files
authored
perf(paymaster): batch subscription check over workspaces (#550)
* perf(paymaster): batch subscription check over workspaces * test(paymaster): cover multi-batch dispatch in subscription check * refactor(paymaster): pass batch into flush and clear it explicitly
1 parent 5885e77 commit 2f0bad0

2 files changed

Lines changed: 108 additions & 13 deletions

File tree

workers/paymaster/src/index.ts

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,33 @@ const DAYS_LEFT_ALERT = [3, 2, 1];
3434
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
3535
const DAYS_AFTER_BLOCK_TO_REMIND = [1, 2, 3, 5, 7, 30];
3636

37+
/**
38+
* Bounds concurrent updateOne / addTask calls per subscription check tick.
39+
*/
40+
const WORKSPACE_PROCESSING_CONCURRENCY = 25;
41+
42+
const WORKSPACE_CURSOR_BATCH_SIZE = 200;
43+
44+
/**
45+
* Keep in sync with fields read by `processWorkspaceSubscriptionCheck` and its helpers.
46+
*/
47+
const WORKSPACE_SUBSCRIPTION_PROJECTION = {
48+
_id: 1,
49+
name: 1,
50+
tariffPlanId: 1,
51+
lastChargeDate: 1,
52+
paidUntil: 1,
53+
isDebug: 1,
54+
isBlocked: 1,
55+
blockedDate: 1,
56+
subscriptionId: 1,
57+
} as const;
58+
59+
const PLAN_PROJECTION = {
60+
_id: 1,
61+
monthlyCharge: 1,
62+
} as const;
63+
3764
/**
3865
* Worker to check workspaces subscription status and ban workspaces without actual subscription
3966
*/
@@ -151,7 +178,10 @@ export default class PaymasterWorker extends Worker {
151178
throw new Error('Plans collection is not initialized');
152179
}
153180

154-
this.plans = await this.plansCollection.find({}).toArray();
181+
this.plans = await this.plansCollection
182+
.find({})
183+
.project<PlanDBScheme>(PLAN_PROJECTION)
184+
.toArray();
155185

156186
if (this.plans.length === 0) {
157187
throw new Error('Please add tariff plans to the database');
@@ -195,28 +225,45 @@ export default class PaymasterWorker extends Worker {
195225
* Called periodically, enumerate through workspaces and check if today is a payday for workspace subscription
196226
*/
197227
private async handleWorkspaceSubscriptionCheckEvent(): Promise<void> {
198-
const workspaces = await this.workspaces.find({}).toArray();
228+
const cursor = this.workspaces
229+
.find({})
230+
.project<WorkspaceDBScheme>(WORKSPACE_SUBSCRIPTION_PROJECTION)
231+
.batchSize(WORKSPACE_CURSOR_BATCH_SIZE);
232+
233+
let batch: WorkspaceDBScheme[] = [];
199234

200-
await Promise.all(workspaces
201-
.filter(workspace => {
235+
const flush = async (currentBatch: WorkspaceDBScheme[]): Promise<void> => {
236+
if (currentBatch.length === 0) {
237+
return;
238+
}
239+
240+
await Promise.all(currentBatch.map((workspace) => this.processWorkspaceSubscriptionCheck(workspace)));
241+
};
242+
243+
try {
244+
for await (const workspace of cursor) {
202245
/**
203246
* Skip workspace without lastChargeDate
204247
*/
205248
if (!workspace.lastChargeDate) {
206-
const error = new Error('[Paymaster] Workspace without lastChargeDate detected');
207-
208-
HawkCatcher.send(error, {
249+
HawkCatcher.send(new Error('[Paymaster] Workspace without lastChargeDate detected'), {
209250
workspaceId: workspace._id.toString(),
210251
});
252+
continue;
253+
}
254+
255+
batch.push(workspace);
211256

212-
return false;
257+
if (batch.length >= WORKSPACE_PROCESSING_CONCURRENCY) {
258+
await flush(batch);
259+
batch = [];
213260
}
261+
}
214262

215-
return true;
216-
})
217-
.map(
218-
(workspace) => this.processWorkspaceSubscriptionCheck(workspace)
219-
));
263+
await flush(batch);
264+
} finally {
265+
await cursor.close();
266+
}
220267
}
221268

222269
/**

workers/paymaster/tests/index.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,54 @@ describe('PaymasterWorker', () => {
794794
MockDate.reset();
795795
});
796796

797+
test('Should process every workspace when there are several batches', async () => {
798+
/**
799+
* 50 > WORKSPACE_PROCESSING_CONCURRENCY (25), so the subscription check
800+
* has to flush more than one batch.
801+
*/
802+
const WORKSPACES_COUNT = 50;
803+
const currentDate = new Date('2005-12-22');
804+
const plan = createPlanMock({
805+
monthlyCharge: 0,
806+
isDefault: true,
807+
});
808+
809+
const workspaces = Array.from({ length: WORKSPACES_COUNT }, () =>
810+
createWorkspaceMock({
811+
plan,
812+
subscriptionId: null,
813+
lastChargeDate: new Date('2005-11-22'),
814+
isBlocked: false,
815+
billingPeriodEventsCount: 0,
816+
})
817+
);
818+
819+
await tariffCollection.insertOne(plan);
820+
await workspacesCollection.insertMany(workspaces);
821+
822+
MockDate.set(currentDate);
823+
824+
const worker = new PaymasterWorker();
825+
const processSpy = jest
826+
.spyOn(worker as any, 'processWorkspaceSubscriptionCheck')
827+
.mockResolvedValue([null, false]);
828+
829+
await worker.start();
830+
await worker.handle(WORKSPACE_SUBSCRIPTION_CHECK);
831+
await worker.finish();
832+
833+
expect(processSpy).toHaveBeenCalledTimes(WORKSPACES_COUNT);
834+
835+
const calledIds = processSpy.mock.calls
836+
.map((call) => (call[0] as WorkspaceDBScheme)._id.toString())
837+
.sort();
838+
const expectedIds = workspaces.map((w) => w._id.toString()).sort();
839+
840+
expect(calledIds).toEqual(expectedIds);
841+
842+
MockDate.reset();
843+
});
844+
797845
afterAll(async () => {
798846
await connection.close();
799847
MockDate.reset();

0 commit comments

Comments
 (0)