@@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string';
2626import { hasValue } from '../../../lib/utils/hasValue' ;
2727/* eslint-disable-next-line no-unused-vars */
2828import { memoize } from '../../../lib/memoize' ;
29+ import TimeMs from '../../../lib/utils/time' ;
2930
3031/**
3132 * eslint does not count decorators as a variable usage
@@ -270,6 +271,186 @@ export default class GrouperWorker extends Worker {
270271 } ) ;
271272 }
272273 }
274+
275+ await this . incrementRateLimitCounter ( task . projectId ) ;
276+ await this . recordProjectMetrics ( task . projectId , 'events-stored' ) ;
277+ }
278+
279+ /**
280+ * Build RedisTimeSeries key for project metrics.
281+ *
282+ * @param projectId - id of the project
283+ * @param metricType - metric type identifier
284+ * @param granularity - time granularity
285+ */
286+ private getTimeSeriesKey (
287+ projectId : string ,
288+ metricType : string ,
289+ granularity : 'minutely' | 'hourly' | 'daily'
290+ ) : string {
291+ return `ts:project-${ metricType } :${ projectId } :${ granularity } ` ;
292+ }
293+
294+ /**
295+ * Record project metrics to Redis TimeSeries.
296+ *
297+ * @param projectId - id of the project
298+ * @param metricType - metric type identifier
299+ */
300+ private async recordProjectMetrics ( projectId : string , metricType : string ) : Promise < void > {
301+ const minutelyKey = this . getTimeSeriesKey ( projectId , metricType , 'minutely' ) ;
302+ const hourlyKey = this . getTimeSeriesKey ( projectId , metricType , 'hourly' ) ;
303+ const dailyKey = this . getTimeSeriesKey ( projectId , metricType , 'daily' ) ;
304+
305+ const labels : Record < string , string > = {
306+ type : 'error' ,
307+ status : metricType ,
308+ project : projectId ,
309+ } ;
310+
311+ const series = [
312+ { key : minutelyKey , label : 'minutely' , retentionMs : TimeMs . DAY } ,
313+ { key : hourlyKey , label : 'hourly' , retentionMs : TimeMs . WEEK } ,
314+ { key : dailyKey , label : 'daily' , retentionMs : 90 * TimeMs . DAY } ,
315+ ] ;
316+
317+ for ( const { key, label, retentionMs } of series ) {
318+ try {
319+ await this . redis . safeTsAdd ( key , 1 , labels , retentionMs ) ;
320+ } catch ( error ) {
321+ this . logger . error ( `Failed to add ${ label } TS for ${ metricType } ` , error ) ;
322+ }
323+ }
324+ }
325+
326+ /**
327+ * Increment rate limit counters for the project.
328+ *
329+ * @param projectId - id of the project
330+ */
331+ private async incrementRateLimitCounter ( projectId : string ) : Promise < void > {
332+ try {
333+ const settings = await this . getProjectRateLimitSettings ( projectId ) ;
334+
335+ if ( ! settings ) {
336+ return ;
337+ }
338+
339+ await this . redis . incrementRateLimitCounterForCurrentEvent (
340+ projectId ,
341+ settings . eventsPeriod ,
342+ settings . eventsLimit
343+ ) ;
344+ } catch ( error ) {
345+ this . logger . error ( `Failed to increment rate limit counter for project ${ projectId } ` , error ) ;
346+ }
347+ }
348+
349+ /**
350+ * Fetch and normalize rate limit settings
351+ * Rate limit settings could appear in tarifPlan, workspace and project.
352+ * All rateLimits have different priority.
353+ *
354+ * @param projectId - id of the project
355+ */
356+ @memoize ( { max : 200 , ttl : MEMOIZATION_TTL , strategy : 'concat' , skipCache : [ null ] } )
357+ private async getProjectRateLimitSettings ( projectId : string ) : Promise < { eventsLimit : number ; eventsPeriod : number } | null > {
358+ if ( ! projectId || ! mongodb . ObjectID . isValid ( projectId ) ) {
359+ return null ;
360+ }
361+
362+ const accountsDb = this . accountsDb . getConnection ( ) ;
363+
364+ /**
365+ * Fetch project from the db
366+ */
367+ const project = await accountsDb
368+ . collection ( 'projects' )
369+ . findOne (
370+ { _id : new mongodb . ObjectId ( projectId ) } ,
371+ { projection : { rateLimitSettings : 1 , workspaceId : 1 } }
372+ ) ;
373+
374+ if ( ! project ) {
375+ return null ;
376+ }
377+
378+ const projectRateLimitSettings = project . rateLimitSettings as { N : number , T : number } ;
379+ const workspaceId = new mongodb . ObjectID ( project . workspaceId ) ;
380+
381+ let planRateLimitSettings : { N : number , T : number } ;
382+ let workspaceRateLimitSettings : { N : number , T : number } ;
383+
384+ /**
385+ * Fetch workspace from the db
386+ */
387+ if ( workspaceId ) {
388+ const workspace = await accountsDb
389+ . collection ( 'workspaces' )
390+ . findOne (
391+ { _id : workspaceId } ,
392+ { projection : { rateLimitSettings : 1 , tariffPlanId : 1 } }
393+ ) ;
394+
395+ workspaceRateLimitSettings = workspace ?. rateLimitSettings as { N : number , T : number } ;
396+
397+ const planId = new mongodb . ObjectId ( workspace ?. tariffPlanId ) ;
398+
399+ /**
400+ * Tarif plan from the db
401+ */
402+ if ( planId ) {
403+ const plan = await accountsDb
404+ . collection ( 'plans' )
405+ . findOne (
406+ { _id : planId } ,
407+ { projection : { rateLimitSettings : 1 } }
408+ ) ;
409+
410+ planRateLimitSettings = plan ?. rateLimitSettings ;
411+ }
412+ }
413+
414+ return this . normalizeRateLimitSettings (
415+ planRateLimitSettings ,
416+ workspaceRateLimitSettings ,
417+ projectRateLimitSettings
418+ ) ;
419+ }
420+
421+ /**
422+ * Normalize rate limit settings shape from database.
423+ *
424+ * @param rateLimitLayers - raw settings documents in priority order
425+ */
426+ private normalizeRateLimitSettings (
427+ ...rateLimitLayers : { N : number , T : number } [ ]
428+ ) : { eventsLimit : number ; eventsPeriod : number } | null {
429+ let eventsLimit = 0 ;
430+ let eventsPeriod = 0 ;
431+
432+ for ( const layer of rateLimitLayers ) {
433+ if ( ! layer ) {
434+ continue ;
435+ }
436+
437+ const limit = layer . N as number ;
438+ const period = layer . T as number ;
439+
440+ if ( limit !== undefined && limit > 0 ) {
441+ eventsLimit = limit ;
442+ }
443+
444+ if ( period !== undefined && period > 0 ) {
445+ eventsPeriod = period ;
446+ }
447+ }
448+
449+ if ( eventsLimit <= 0 || eventsPeriod <= 0 ) {
450+ return null ;
451+ }
452+
453+ return { eventsLimit, eventsPeriod } ;
273454 }
274455
275456 /**
0 commit comments