@@ -16,32 +16,44 @@ const { instance: redisClient } = require('./client');
1616const { config } = require ( '../../../Config' ) ;
1717const { calculateInterval } = require ( './gcra' ) ;
1818
19- // Map of bucket name -> WorkerTokenBucket instance
19+ // Map of `${resourceClass}:${resourceId}:${measure}` -> WorkerTokenBucket instance
2020const tokenBuckets = new Map ( ) ;
2121
2222/**
23- * Per-bucket token bucket for a single worker
23+ * Per-resourceClass+resourceID+measure token bucket for a single worker
2424 */
2525class WorkerTokenBucket {
26- constructor ( bucketName , limitConfig , log ) {
27- this . bucketName = bucketName ;
26+ constructor ( resourceClass , resourceId , measure , limitConfig , log ) {
27+ this . resourceClass = resourceClass ;
28+ this . resourceId = resourceId ;
29+ this . measure = measure ;
2830 this . limitConfig = limitConfig ;
2931 this . log = log ;
3032
31- // Token buffer configuration
32- this . bufferSize = config . rateLimiting ?. tokenBucketBufferSize || 50 ; // Max tokens to hold
33- this . refillThreshold = config . rateLimiting ?. tokenBucketRefillThreshold || 20 ; // Trigger refill when below this
33+ this . bufferSize = config . rateLimiting ?. tokenBucketBufferSize ; // Max tokens to hold
34+ this . refillThreshold = config . rateLimiting ?. tokenBucketRefillThreshold ; // Trigger refill when below this
3435 this . tokens = this . bufferSize ; // Start with full buffer for fail-open at startup
35-
36- // Refill state
37- this . refillInProgress = false ;
36+ this . interval = calculateInterval ( this . limitConfig . limit , config . rateLimiting . nodes ) ;
3837 this . lastRefillTime = Date . now ( ) ;
3938 this . refillCount = 0 ;
40- this . requestCounter = 0 ;
41- this . lastRequestTime = 0 ;
39+ this . refillInProgress = false ;
40+ }
4241
43- // Track consumption rate for adaptive refill
44- this . requestTimestamps = new Set ( ) ;
42+ hasCapacity ( ) {
43+ return this . tokens > 0 ;
44+ }
45+
46+ updateLimit ( updatedConfig ) {
47+ if ( this . limitConfig . limit !== updatedConfig . limit ||
48+ this . limitConfig . burstCapacity !== updatedConfig . burstCapacity
49+ ) {
50+ const oldConfig = this . limitConfig ;
51+ this . limitConfig = updatedConfig ;
52+ this . interval = calculateInterval ( updatedConfig . limit , config . rateLimiting . nodes ) ;
53+ return { updated : true , oldConfig } ;
54+ }
55+
56+ return { updated : false } ;
4557 }
4658
4759 /**
@@ -50,11 +62,8 @@ class WorkerTokenBucket {
5062 * @returns {boolean } True if request allowed, false if throttled
5163 */
5264 tryConsume ( ) {
53- // Record request for rate calculation
54- this . recordRequest ( ) ;
55-
5665 if ( this . tokens > 0 ) {
57- this . tokens -- ;
66+ this . tokens -= 1 ;
5867 return true ; // ALLOWED
5968 }
6069
@@ -78,16 +87,6 @@ class WorkerTokenBucket {
7887 return ;
7988 }
8089
81- // Trigger async refill
82- await this . refill ( ) ;
83- }
84-
85- /**
86- * Request tokens from Redis using GCRA enforcement
87- *
88- * @returns {Promise<void> }
89- */
90- async refill ( ) {
9190 this . refillInProgress = true ;
9291 const startTime = Date . now ( ) ;
9392
@@ -100,29 +99,28 @@ class WorkerTokenBucket {
10099 }
101100
102101 // Calculate GCRA parameters
103- const nodes = config . rateLimiting . nodes || 1 ;
104- const workers = config . clusters || 1 ;
105- const interval = calculateInterval ( this . limitConfig . limit , nodes , workers ) ;
106- const burstCapacitySeconds =
107- config . rateLimiting . bucket ?. defaultConfig ?. requestsPerSecond ?. burstCapacity || 1 ;
108- const burstCapacity = burstCapacitySeconds * 1000 ;
109-
110102 let granted = requested ;
111103 if ( redisClient . isReady ( ) ) {
112104 // Request tokens from Redis (atomic GCRA enforcement)
113105 granted = await util . promisify ( redisClient . grantTokens . bind ( redisClient ) ) (
114- this . bucketName ,
106+ this . resourceClass ,
107+ this . resourceId ,
108+ this . measure ,
115109 requested ,
116- interval ,
117- burstCapacity ,
110+ this . interval ,
111+ this . limitConfig . burstCapacity ,
118112 ) ;
119113 } else {
120114 // Connection to redis has failed in some way.
121115 // Client will be reconnecting in the background.
122116 // We grant the requested amount of tokens anyway to avoid degrading service availability.
123117 this . log . warn (
124118 'rate limit redis client not connected. granting tokens anyway to avoid service degradation' ,
125- { bucketName : this . bucketName } ,
119+ {
120+ resourceClass : this . resourceClass ,
121+ resourceId : this . resourceId ,
122+ measure : this . measure ,
123+ } ,
126124 ) ;
127125 }
128126
@@ -134,7 +132,9 @@ class WorkerTokenBucket {
134132 const duration = this . lastRefillTime - startTime ;
135133
136134 this . log . debug ( 'Token refill completed' , {
137- bucketName : this . bucketName ,
135+ resourceClass : this . resourceClass ,
136+ resourceId : this . resourceId ,
137+ measure : this . measure ,
138138 requested,
139139 granted,
140140 newBalance : this . tokens ,
@@ -145,118 +145,67 @@ class WorkerTokenBucket {
145145 // Warn if refill took too long or granted too few
146146 if ( duration > 100 ) {
147147 this . log . warn ( 'Slow token refill detected' , {
148- bucketName : this . bucketName ,
148+ resourceClass : this . resourceClass ,
149+ resourceId : this . resourceId ,
150+ measure : this . measure ,
149151 durationMs : duration ,
150152 } ) ;
151153 }
152154
153155 if ( granted === 0 && requested > 0 ) {
154156 this . log . trace ( 'Token refill denied - quota exhausted' , {
155- bucketName : this . bucketName ,
157+ resourceClass : this . resourceClass ,
158+ resourceId : this . resourceId ,
159+ measure : this . measure ,
156160 requested,
157161 } ) ;
158162 }
159163
160164 } catch ( err ) {
161165 this . log . error ( 'Token refill failed' , {
162- bucketName : this . bucketName ,
166+ resourceClass : this . resourceClass ,
167+ resourceId : this . resourceId ,
168+ measure : this . measure ,
163169 error : err . message ,
164170 stack : err . stack ,
165171 } ) ;
166172 } finally {
167173 this . refillInProgress = false ;
168174 }
169175 }
170-
171- /**
172- * Record request timestamp for rate calculation
173- */
174- recordRequest ( ) {
175- const now = Date . now ( ) ;
176- if ( now == this . lastRequestTime ) {
177- this . requestCounter ++ ;
178- } else {
179- this . lastRequestTime = now ;
180- this . requestCounter = 0 ;
181- }
182-
183- this . requestTimestamps . add ( now * 1000 + this . requestCounter ) ;
184-
185- // Keep only last 1 second of timestamps
186- const cutoff = ( now - 1000 ) * 1000 ;
187- for ( const timestamp of this . requestTimestamps . values ( ) ) {
188- if ( timestamp < cutoff ) {
189- this . requestTimestamps . delete ( timestamp ) ;
190- }
191- }
192- }
193-
194- /**
195- * Get current request rate (requests per second)
196- *
197- * @returns {number }
198- */
199- getCurrentRate ( ) {
200- const now = Date . now ( ) ;
201- const cutoff = ( now - 1000 ) * 1000 ;
202-
203- let count = 0 ;
204- for ( const timestamp of this . requestTimestamps . values ( ) ) {
205- if ( timestamp >= cutoff ) {
206- count ++ ;
207- }
208- }
209- return count ;
210- }
211-
212- /**
213- * Get token bucket stats for monitoring
214- *
215- * @returns {object }
216- */
217- getStats ( ) {
218- return {
219- tokens : this . tokens ,
220- bufferSize : this . bufferSize ,
221- refillThreshold : this . refillThreshold ,
222- refillInProgress : this . refillInProgress ,
223- lastRefillTime : this . lastRefillTime ,
224- refillCount : this . refillCount ,
225- currentRate : this . getCurrentRate ( ) ,
226- } ;
227- }
228176}
229177
230178/**
231179 * Get or create token bucket for a bucket
232180 *
233- * @param {string } bucketName - Bucket name
181+ * @param {string } resourceClass - "bucket" or "account"
182+ * @param {string } resourceId - bucket name or account canonicalId
183+ * @param {string } measure - measure id e.g. "rps"
234184 * @param {object } limitConfig - Rate limit configuration
235185 * @param {object } log - Logger instance
236186 * @returns {WorkerTokenBucket }
237187 */
238- function getTokenBucket ( bucketName , limitConfig , log ) {
239- let bucket = tokenBuckets . get ( bucketName ) ;
240-
188+ function getTokenBucket ( resourceClass , resourceId , measure , limitConfig , log ) {
189+ const cacheKey = ` ${ resourceClass } : ${ resourceId } : ${ measure } ` ;
190+ let bucket = tokenBuckets . get ( cacheKey ) ;
241191 if ( ! bucket ) {
242- bucket = new WorkerTokenBucket ( bucketName , limitConfig , log ) ;
243- tokenBuckets . set ( bucketName , bucket ) ;
192+ bucket = new WorkerTokenBucket ( resourceClass , resourceId , measure , limitConfig , log ) ;
193+ tokenBuckets . set ( cacheKey , bucket ) ;
244194
245195 log . debug ( 'Created token bucket' , {
246- bucketName ,
196+ cacheKey ,
247197 bufferSize : bucket . bufferSize ,
248198 refillThreshold : bucket . refillThreshold ,
249199 } ) ;
250- } else if ( bucket . limitConfig . limit !== limitConfig . limit ) {
251- // Update limit config when it changes dynamically
252- const oldLimit = bucket . limitConfig . limit ;
253- bucket . limitConfig = limitConfig ;
254-
255- log . info ( 'Updated token bucket limit config' , {
256- bucketName,
257- oldLimit,
258- newLimit : limitConfig . limit ,
259- } ) ;
200+ } else {
201+ const { updated, oldConfig } = bucket . updateLimit ( limitConfig ) ;
202+ if ( updated ) {
203+ log . info ( 'Updated token bucket limit config' , {
204+ cacheKey,
205+ old : oldConfig ,
206+ new : limitConfig ,
207+ } ) ;
208+ }
260209 }
261210
262211 return bucket ;
@@ -282,15 +231,15 @@ function cleanupTokenBuckets(maxIdleMs = 60000) {
282231 const now = Date . now ( ) ;
283232 const toRemove = [ ] ;
284233
285- for ( const [ bucketName , bucket ] of tokenBuckets . entries ( ) ) {
234+ for ( const [ key , bucket ] of tokenBuckets . entries ( ) ) {
286235 const idleTime = now - bucket . lastRefillTime ;
287236 if ( idleTime > maxIdleMs && bucket . tokens === 0 ) {
288- toRemove . push ( bucketName ) ;
237+ toRemove . push ( key ) ;
289238 }
290239 }
291240
292- for ( const bucketName of toRemove ) {
293- tokenBuckets . delete ( bucketName ) ;
241+ for ( const key of toRemove ) {
242+ tokenBuckets . delete ( key ) ;
294243 }
295244
296245 return toRemove . length ;
@@ -299,11 +248,13 @@ function cleanupTokenBuckets(maxIdleMs = 60000) {
299248/**
300249 * Remove a specific token bucket (used when rate limit config is deleted)
301250 *
302- * @param {string } bucketName - Bucket name
251+ * @param {string } resourceClass - "bucket" or "account"
252+ * @param {string } resourceId - bucket name or account canonicalId
253+ * @param {string } measure - measure id e.g. "rps"
303254 * @returns {boolean } True if bucket was found and removed
304255 */
305- function removeTokenBucket ( bucketName ) {
306- return tokenBuckets . delete ( bucketName ) ;
256+ function removeTokenBucket ( resourceClass , resourceId , measure ) {
257+ return tokenBuckets . delete ( ` ${ resourceClass } : ${ resourceId } : ${ measure } ` ) ;
307258}
308259
309260module . exports = {
0 commit comments