Skip to content

Commit fcbe44b

Browse files
committed
refactor(usage): streamline usage invalidation event emissions
- Replaced direct event emissions with a dedicated emitUsageInvalidated function across BackupUseCase, FileUseCases, and FolderUseCases for better consistency and maintainability. - Updated the UsageInvalidatedEvent structure to use an interface, enhancing type safety. - Adjusted event handler to utilize a deduplication strategy for usage events, improving performance and reliability.
1 parent a9a4329 commit fcbe44b

10 files changed

Lines changed: 1460 additions & 1623 deletions

File tree

src/modules/backups/backup.usecase.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { type DevicePlatform } from './device.domain';
2222
import { type UpdateDeviceAndFolderDto } from './dto/update-device-and-folder.dto';
2323
import { SequelizeFolderRepository } from '../folder/folder.repository';
2424
import { EventEmitter2 } from '@nestjs/event-emitter';
25-
import { UsageInvalidatedEvent } from '../usage-queue/events/usage-invalidated.event';
25+
import { emitUsageInvalidated } from '../usage-queue/events/usage-invalidated.event';
2626

2727
@Injectable()
2828
export class BackupUseCase {
@@ -80,9 +80,11 @@ export class BackupUseCase {
8080
id: deviceId,
8181
});
8282

83-
this.eventEmitter.emit(
84-
'usage.backup.device_deleted',
85-
new UsageInvalidatedEvent(user.uuid, user.id, 'backup.device.delete'),
83+
emitUsageInvalidated(
84+
this.eventEmitter,
85+
user.uuid,
86+
user.id,
87+
'backup.device.delete',
8688
);
8789

8890
return result;
@@ -479,9 +481,11 @@ export class BackupUseCase {
479481
backupId,
480482
);
481483

482-
this.eventEmitter.emit(
483-
'usage.backup.deleted',
484-
new UsageInvalidatedEvent(user.uuid, user.id, 'backup.delete'),
484+
emitUsageInvalidated(
485+
this.eventEmitter,
486+
user.uuid,
487+
user.id,
488+
'backup.delete',
485489
);
486490

487491
return result;

src/modules/file/actions/create-file-version.action.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { SequelizeFileRepository } from '../file.repository';
66
import { SequelizeFileVersionRepository } from '../file-version.repository';
77
import { FileVersionStatus } from '../file-version.domain';
88
import { FeatureLimitService } from '../../feature-limit/feature-limit.service';
9-
import { UsageInvalidatedEvent } from '../../usage-queue/events/usage-invalidated.event';
9+
import { emitUsageInvalidated } from '../../usage-queue/events/usage-invalidated.event';
1010
import { Time } from '../../../lib/time';
1111

1212
@Injectable()
@@ -44,9 +44,11 @@ export class CreateFileVersionAction {
4444
}),
4545
]);
4646

47-
this.eventEmitter.emit(
48-
'usage.file.version_created',
49-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.version.create'),
47+
emitUsageInvalidated(
48+
this.eventEmitter,
49+
user.uuid,
50+
user.id,
51+
'file.version.create',
5052
);
5153
}
5254

src/modules/file/actions/delete-file-version.action.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { SequelizeFileVersionRepository } from '../file-version.repository';
99
import { SequelizeFileRepository } from '../file.repository';
1010
import { type User } from '../../user/user.domain';
1111
import { FileVersionStatus } from '../file-version.domain';
12-
import { UsageInvalidatedEvent } from '../../usage-queue/events/usage-invalidated.event';
12+
import { emitUsageInvalidated } from '../../usage-queue/events/usage-invalidated.event';
1313

1414
@Injectable()
1515
export class DeleteFileVersionAction {
@@ -49,9 +49,11 @@ export class DeleteFileVersionAction {
4949
FileVersionStatus.DELETED,
5050
);
5151

52-
this.eventEmitter.emit(
53-
'usage.file.version_deleted',
54-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.version.delete'),
52+
emitUsageInvalidated(
53+
this.eventEmitter,
54+
user.uuid,
55+
user.id,
56+
'file.version.delete',
5557
);
5658
}
5759
}

src/modules/file/file.usecase.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import { RedisService } from '../../externals/redis/redis.service';
5555
import { type Usage } from '../usage/usage.domain';
5656
import { CacheManagerService } from '../cache-manager/cache-manager.service';
5757
import { EventEmitter2 } from '@nestjs/event-emitter';
58-
import { UsageInvalidatedEvent } from '../usage-queue/events/usage-invalidated.event';
58+
import { emitUsageInvalidated } from '../usage-queue/events/usage-invalidated.event';
5959
import { PaymentRequiredException } from '../feature-limit/exceptions/payment-required.exception';
6060
import {
6161
DeleteFileVersionAction,
@@ -216,10 +216,7 @@ export class FileUseCases {
216216

217217
await this.fileRepository.deleteFilesByUser(user, [file]);
218218

219-
this.eventEmitter.emit(
220-
'usage.file.deleted',
221-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.delete'),
222-
);
219+
emitUsageInvalidated(this.eventEmitter, user.uuid, user.id, 'file.delete');
223220

224221
return { id, uuid };
225222
}
@@ -368,10 +365,7 @@ export class FileUseCases {
368365
creationTime: newFileDto.creationTime || newFileDto.date || new Date(),
369366
});
370367

371-
this.eventEmitter.emit(
372-
'usage.file.created',
373-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.create'),
374-
);
368+
emitUsageInvalidated(this.eventEmitter, user.uuid, user.id, 'file.create');
375369

376370
if (!hadFilesBeforeUpload) {
377371
const isUserFreeTier = tier?.label === PLAN_FREE_INDIVIDUAL_TIER_LABEL;
@@ -858,10 +852,7 @@ export class FileUseCases {
858852
),
859853
]);
860854

861-
this.eventEmitter.emit(
862-
'usage.file.trashed',
863-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.trash'),
864-
);
855+
emitUsageInvalidated(this.eventEmitter, user.uuid, user.id, 'file.trash');
865856
}
866857

867858
async getEncryptionKeyFromFile(
@@ -988,10 +979,7 @@ export class FileUseCases {
988979
}
989980
}
990981

991-
this.eventEmitter.emit(
992-
'usage.file.replaced',
993-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.replace'),
994-
);
982+
emitUsageInvalidated(this.eventEmitter, user.uuid, user.id, 'file.replace');
995983

996984
return {
997985
...file.toJSON(),
@@ -1037,9 +1025,11 @@ export class FileUseCases {
10371025
);
10381026

10391027
if (deleted > 0) {
1040-
this.eventEmitter.emit(
1041-
'usage.file.batch_deleted',
1042-
new UsageInvalidatedEvent(user.uuid, user.id, 'file.batch_delete'),
1028+
emitUsageInvalidated(
1029+
this.eventEmitter,
1030+
user.uuid,
1031+
user.id,
1032+
'file.batch_delete',
10431033
);
10441034
}
10451035

src/modules/folder/folder.usecase.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import { type FolderModel } from './folder.model';
3636
import { type MoveFolderDto } from './dto/move-folder.dto';
3737
import { FeatureLimitService } from '../feature-limit/feature-limit.service';
3838
import { EventEmitter2 } from '@nestjs/event-emitter';
39-
import { UsageInvalidatedEvent } from '../usage-queue/events/usage-invalidated.event';
39+
import { emitUsageInvalidated } from '../usage-queue/events/usage-invalidated.event';
4040

4141
const invalidName = /[\\/]|^\s*$/;
4242

@@ -529,10 +529,7 @@ export class FolderUseCases {
529529
),
530530
]);
531531

532-
this.eventEmitter.emit(
533-
'usage.folder.trashed',
534-
new UsageInvalidatedEvent(user.uuid, user.id, 'folder.trash'),
535-
);
532+
emitUsageInvalidated(this.eventEmitter, user.uuid, user.id, 'folder.trash');
536533
}
537534

538535
async getFoldersByParentId(
Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,18 @@
1-
export class UsageInvalidatedEvent {
2-
constructor(
3-
public readonly userUuid: string,
4-
public readonly userId: number,
5-
public readonly source: string,
6-
) {}
1+
import { type EventEmitter2 } from '@nestjs/event-emitter';
2+
3+
export interface UsageInvalidatedEvent {
4+
userUuid: string;
5+
userId: number;
6+
source: string;
7+
}
8+
9+
export const USAGE_INVALIDATED_EVENT = 'usage.invalidated';
10+
11+
export function emitUsageInvalidated(
12+
eventEmitter: EventEmitter2,
13+
userUuid: string,
14+
userId: number,
15+
source: string,
16+
): void {
17+
eventEmitter.emit(USAGE_INVALIDATED_EVENT, { userUuid, userId, source });
718
}

src/modules/usage-queue/handlers/usage-event.handler.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ import { Injectable, Logger } from '@nestjs/common';
22
import { OnEvent } from '@nestjs/event-emitter';
33
import { InjectQueue } from '@nestjs/bullmq';
44
import { type Queue } from 'bullmq';
5-
import { type UsageInvalidatedEvent } from '../events/usage-invalidated.event';
5+
import {
6+
type UsageInvalidatedEvent,
7+
USAGE_INVALIDATED_EVENT,
8+
} from '../events/usage-invalidated.event';
69
import { USAGE_QUEUE_NAME } from '../usage-queue.constants';
710

11+
const DEDUPLICATION_WINDOW_MS = 3000;
12+
813
@Injectable()
914
export class UsageEventHandler {
1015
private readonly logger = new Logger(UsageEventHandler.name);
@@ -13,7 +18,7 @@ export class UsageEventHandler {
1318
@InjectQueue(USAGE_QUEUE_NAME) private readonly usageQueue: Queue,
1419
) {}
1520

16-
@OnEvent('usage.**')
21+
@OnEvent(USAGE_INVALIDATED_EVENT)
1722
async handleUsageInvalidated(event: UsageInvalidatedEvent) {
1823
try {
1924
await this.usageQueue.add(
@@ -24,8 +29,12 @@ export class UsageEventHandler {
2429
source: event.source,
2530
},
2631
{
27-
jobId: event.userUuid,
28-
delay: 3000,
32+
deduplication: {
33+
id: event.userUuid,
34+
ttl: DEDUPLICATION_WINDOW_MS,
35+
extend: true,
36+
},
37+
delay: DEDUPLICATION_WINDOW_MS,
2938
},
3039
);
3140
} catch (error) {

src/modules/user/user.usecase.spec.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ import { SharingInvite } from '../sharing/sharing.domain';
8787
import { aes } from '@internxt/lib';
8888
import { WorkspacesUsecases } from '../workspaces/workspaces.usecase';
8989
import { PaymentsService } from '../../externals/payments/payments.service';
90+
import { EventEmitter2 } from '@nestjs/event-emitter';
9091
import * as jwtLibrary from '../../lib/jwt';
9192
import { JsonWebTokenError } from 'jsonwebtoken';
9293
import { type LegacyRecoverAccountDto } from './dto/legacy-recover-account.dto';
@@ -141,6 +142,7 @@ describe('User use cases', () => {
141142
let referralsRepository: SequelizeReferralRepository;
142143
let userReferralsRepository: SequelizeUserReferralsRepository;
143144
let notificationService: NotificationService;
145+
let eventEmitter: EventEmitter2;
144146

145147
const user = User.build({
146148
id: 1,
@@ -246,6 +248,7 @@ describe('User use cases', () => {
246248
);
247249
notificationService =
248250
moduleRef.get<NotificationService>(NotificationService);
251+
eventEmitter = moduleRef.get<EventEmitter2>(EventEmitter2);
249252
});
250253

251254
describe('Resetting a user', () => {
@@ -2266,17 +2269,14 @@ describe('User use cases', () => {
22662269
expect(result).toEqual(cachedUsage);
22672270
});
22682271

2269-
it('When cache does not have user usage data, then it should get data from database and cache it', async () => {
2272+
it('When cache does not have user usage data, then it should compute from database and emit event for cache fill', async () => {
22702273
const driveUsage = 2048;
22712274
const backupUsage = 1024;
22722275
const totalUsage = driveUsage + backupUsage;
22732276
jest.spyOn(cacheManagerService, 'getUserUsage').mockResolvedValue(null);
22742277
jest
22752278
.spyOn(fileUseCases, 'getUserUsedStorage')
22762279
.mockResolvedValue(driveUsage);
2277-
jest
2278-
.spyOn(cacheManagerService, 'setUserUsage')
2279-
.mockResolvedValue(undefined);
22802280
jest
22812281
.spyOn(backupUseCases, 'sumExistentBackupSizes')
22822282
.mockResolvedValue(backupUsage);
@@ -2288,10 +2288,13 @@ describe('User use cases', () => {
22882288
expect(backupUseCases.sumExistentBackupSizes).toHaveBeenCalledWith(
22892289
user.id,
22902290
);
2291-
expect(cacheManagerService.setUserUsage).toHaveBeenCalledWith(
2292-
user.uuid,
2293-
driveUsage,
2294-
backupUsage,
2291+
expect(eventEmitter.emit).toHaveBeenCalledWith(
2292+
'usage.cache_miss',
2293+
expect.objectContaining({
2294+
userUuid: user.uuid,
2295+
userId: user.id,
2296+
source: 'cache_miss',
2297+
}),
22952298
);
22962299
expect(result).toEqual({
22972300
drive: driveUsage,

src/modules/user/user.usecase.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import {
1212
forwardRef,
1313
} from '@nestjs/common';
1414
import { ConfigService } from '@nestjs/config';
15+
import { EventEmitter2 } from '@nestjs/event-emitter';
16+
import { emitUsageInvalidated } from '../usage-queue/events/usage-invalidated.event';
1517
import { v4, validate } from 'uuid';
1618
import { generateMnemonic } from 'bip39';
1719
import * as speakeasy from 'speakeasy';
@@ -167,6 +169,7 @@ export class UserUseCases {
167169
private readonly backupUseCases: BackupUseCase,
168170
private readonly cacheManager: CacheManagerService,
169171
private readonly asymmetricEncryptionService: AsymmetricEncryptionService,
172+
private readonly eventEmitter: EventEmitter2,
170173
) {}
171174

172175
async getCachedAvatar(user: User): Promise<string | null> {
@@ -1890,7 +1893,9 @@ export class UserUseCases {
18901893
this.backupUseCases.sumExistentBackupSizes(user.id),
18911894
]);
18921895

1893-
await this.cacheManager.setUserUsage(user.uuid, driveUsage, backupUsage);
1896+
// Let the queue processor be the sole cache writer to avoid race
1897+
// conditions where this read-path write overwrites a fresher value.
1898+
emitUsageInvalidated(this.eventEmitter, user.uuid, user.id, 'cache_miss');
18941899

18951900
return {
18961901
drive: driveUsage,

0 commit comments

Comments
 (0)