@@ -175,13 +175,17 @@ impl RouteCache {
175175 #[ tracing:: instrument( skip_all) ]
176176 async fn insert ( & self , hostname : String , path : String , result : RouteConfig ) {
177177 self . cache . upsert_async ( ( hostname, path) , result) . await ;
178+
179+ metrics:: ROUTE_CACHE_SIZE . set ( self . cache . len ( ) as i64 ) ;
178180 }
179181
180182 #[ tracing:: instrument( skip_all) ]
181183 async fn purge ( & self , hostname : & str , path : & str ) {
182184 self . cache
183185 . remove_async ( & ( hostname. to_owned ( ) , path. to_owned ( ) ) )
184186 . await ;
187+
188+ metrics:: ROUTE_CACHE_SIZE . set ( self . cache . len ( ) as i64 ) ;
185189 }
186190}
187191
@@ -253,8 +257,8 @@ pub struct ProxyState {
253257 routing_fn : RoutingFn ,
254258 middleware_fn : MiddlewareFn ,
255259 route_cache : RouteCache ,
256- rate_limiters : SccHashMap < Uuid , RateLimiter > ,
257- in_flight_counters : SccHashMap < Uuid , InFlightCounter > ,
260+ rate_limiters : SccHashMap < std :: net :: IpAddr , RateLimiter > ,
261+ in_flight_counters : SccHashMap < std :: net :: IpAddr , InFlightCounter > ,
258262 port_type : PortType ,
259263}
260264
@@ -447,79 +451,119 @@ impl ProxyState {
447451 }
448452
449453 #[ tracing:: instrument( skip_all) ]
450- async fn check_rate_limit ( & self , actor_id : & Option < Uuid > ) -> GlobalResult < bool > {
451- match actor_id {
452- Some ( id) => {
453- let middleware_config = self . get_middleware_config ( id) . await ?;
454-
455- let entry = self
456- . rate_limiters
457- . entry_async ( * id)
458- . instrument ( tracing:: info_span!( "entry_async" ) )
459- . await ;
460- if let scc:: hash_map:: Entry :: Occupied ( mut entry) = entry {
461- // Key exists, get and mutate existing RateLimiter
462- let write_guard = entry. get_mut ( ) ;
463- Ok ( write_guard. try_acquire ( ) )
464- } else {
465- // Key doesn't exist, insert a new RateLimiter
466- let mut limiter = RateLimiter :: new (
467- middleware_config. rate_limit . requests ,
468- middleware_config. rate_limit . period ,
469- ) ;
470- let result = limiter. try_acquire ( ) ;
471- entry. insert_entry ( limiter) ;
472- Ok ( result)
473- }
474- }
454+ async fn check_rate_limit (
455+ & self ,
456+ ip_addr : std:: net:: IpAddr ,
457+ actor_id : & Option < Uuid > ,
458+ ) -> GlobalResult < bool > {
459+ // Use default middleware config, or actor-specific config if available
460+ let middleware_config = match actor_id {
461+ Some ( id) => self . get_middleware_config ( id) . await ?,
475462 None => {
476- // No actor ID means no rate limiting
477- Ok ( true )
463+ // Default middleware config
464+ MiddlewareConfig {
465+ rate_limit : RateLimitConfig {
466+ requests : 100 , // 100 requests
467+ period : 60 , // per 60 seconds
468+ } ,
469+ max_in_flight : MaxInFlightConfig {
470+ amount : 20 , // 20 concurrent requests
471+ } ,
472+ retry : RetryConfig {
473+ max_attempts : 3 , // 3 retry attempts
474+ initial_interval : 100 , // 100ms initial interval
475+ } ,
476+ timeout : TimeoutConfig {
477+ request_timeout : 30 , // 30 seconds for requests
478+ } ,
479+ }
478480 }
481+ } ;
482+
483+ let entry = self
484+ . rate_limiters
485+ . entry_async ( ip_addr)
486+ . instrument ( tracing:: info_span!( "entry_async" ) )
487+ . await ;
488+ if let scc:: hash_map:: Entry :: Occupied ( mut entry) = entry {
489+ // Key exists, get and mutate existing RateLimiter
490+ let write_guard = entry. get_mut ( ) ;
491+ Ok ( write_guard. try_acquire ( ) )
492+ } else {
493+ // Key doesn't exist, insert a new RateLimiter
494+ let mut limiter = RateLimiter :: new (
495+ middleware_config. rate_limit . requests ,
496+ middleware_config. rate_limit . period ,
497+ ) ;
498+ let result = limiter. try_acquire ( ) ;
499+ entry. insert_entry ( limiter) ;
500+
501+ metrics:: RATE_LIMITERS_COUNT . set ( self . rate_limiters . len ( ) as i64 ) ;
502+
503+ Ok ( result)
479504 }
480505 }
481506
482507 #[ tracing:: instrument( skip_all) ]
483- async fn acquire_in_flight ( & self , actor_id : & Option < Uuid > ) -> GlobalResult < bool > {
484- match actor_id {
485- Some ( id) => {
486- let middleware_config = self . get_middleware_config ( id) . await ?;
487-
488- let entry = self
489- . in_flight_counters
490- . entry_async ( * id)
491- . instrument ( tracing:: info_span!( "entry_async" ) )
492- . await ;
493- if let scc:: hash_map:: Entry :: Occupied ( mut entry) = entry {
494- // Key exists, get and mutate existing InFlightCounter
495- let write_guard = entry. get_mut ( ) ;
496- Ok ( write_guard. try_acquire ( ) )
497- } else {
498- // Key doesn't exist, insert a new InFlightCounter
499- let mut counter = InFlightCounter :: new ( middleware_config. max_in_flight . amount ) ;
500- let result = counter. try_acquire ( ) ;
501- entry. insert_entry ( counter) ;
502- Ok ( result)
503- }
504- }
508+ async fn acquire_in_flight (
509+ & self ,
510+ ip_addr : std:: net:: IpAddr ,
511+ actor_id : & Option < Uuid > ,
512+ ) -> GlobalResult < bool > {
513+ // Use default middleware config, or actor-specific config if available
514+ let middleware_config = match actor_id {
515+ Some ( id) => self . get_middleware_config ( id) . await ?,
505516 None => {
506- // No actor ID means no in-flight limiting
507- Ok ( true )
517+ // Default middleware config
518+ MiddlewareConfig {
519+ rate_limit : RateLimitConfig {
520+ requests : 100 , // 100 requests
521+ period : 60 , // per 60 seconds
522+ } ,
523+ max_in_flight : MaxInFlightConfig {
524+ amount : 20 , // 20 concurrent requests
525+ } ,
526+ retry : RetryConfig {
527+ max_attempts : 3 , // 3 retry attempts
528+ initial_interval : 100 , // 100ms initial interval
529+ } ,
530+ timeout : TimeoutConfig {
531+ request_timeout : 30 , // 30 seconds for requests
532+ } ,
533+ }
508534 }
535+ } ;
536+
537+ let entry = self
538+ . in_flight_counters
539+ . entry_async ( ip_addr)
540+ . instrument ( tracing:: info_span!( "entry_async" ) )
541+ . await ;
542+ if let scc:: hash_map:: Entry :: Occupied ( mut entry) = entry {
543+ // Key exists, get and mutate existing InFlightCounter
544+ let write_guard = entry. get_mut ( ) ;
545+ Ok ( write_guard. try_acquire ( ) )
546+ } else {
547+ // Key doesn't exist, insert a new InFlightCounter
548+ let mut counter = InFlightCounter :: new ( middleware_config. max_in_flight . amount ) ;
549+ let result = counter. try_acquire ( ) ;
550+ entry. insert_entry ( counter) ;
551+
552+ metrics:: IN_FLIGHT_COUNTERS_COUNT . set ( self . in_flight_counters . len ( ) as i64 ) ;
553+
554+ Ok ( result)
509555 }
510556 }
511557
512558 #[ tracing:: instrument( skip_all) ]
513- async fn release_in_flight ( & self , actor_id : & Option < Uuid > ) {
514- if let Some ( id) = actor_id {
515- if let Some ( mut counter) = self
516- . in_flight_counters
517- . get_async ( id)
518- . instrument ( tracing:: info_span!( "get_async" ) )
519- . await
520- {
521- counter. release ( ) ;
522- }
559+ async fn release_in_flight ( & self , ip_addr : std:: net:: IpAddr ) {
560+ if let Some ( mut counter) = self
561+ . in_flight_counters
562+ . get_async ( & ip_addr)
563+ . instrument ( tracing:: info_span!( "get_async" ) )
564+ . await
565+ {
566+ counter. release ( ) ;
523567 }
524568 }
525569}
@@ -607,8 +651,11 @@ impl ProxyService {
607651 let actor_id_str = actor_id. map_or_else ( || "none" . to_string ( ) , |id| id. to_string ( ) ) ;
608652 let server_id_str = server_id. map_or_else ( || "none" . to_string ( ) , |id| id. to_string ( ) ) ;
609653
654+ // Extract IP address from remote_addr
655+ let client_ip = self . remote_addr . ip ( ) ;
656+
610657 // Apply rate limiting
611- if !self . state . check_rate_limit ( & actor_id) . await ? {
658+ if !self . state . check_rate_limit ( client_ip , & actor_id) . await ? {
612659 metrics:: ACTOR_REQUEST_ERRORS
613660 . with_label_values ( & [ & actor_id_str, & server_id_str, "429" ] )
614661 . inc ( ) ;
@@ -619,7 +666,7 @@ impl ProxyService {
619666 }
620667
621668 // Check in-flight limit
622- if !self . state . acquire_in_flight ( & actor_id) . await ? {
669+ if !self . state . acquire_in_flight ( client_ip , & actor_id) . await ? {
623670 metrics:: ACTOR_REQUEST_ERRORS
624671 . with_label_values ( & [ & actor_id_str, & server_id_str, "429" ] )
625672 . inc ( ) ;
@@ -646,10 +693,9 @@ impl ProxyService {
646693
647694 // Prepare to release in-flight counter when done
648695 let state_clone = self . state . clone ( ) ;
649- let actor_id_clone = actor_id;
650696 crate :: defer! {
651697 tokio:: spawn( async move {
652- state_clone. release_in_flight( & actor_id_clone ) . await ;
698+ state_clone. release_in_flight( client_ip ) . await ;
653699 } . instrument( tracing:: info_span!( "release_in_flight_task" ) ) ) ;
654700 }
655701
@@ -1625,7 +1671,6 @@ impl ProxyService {
16251671 . observe ( duration. as_secs_f64 ( ) ) ;
16261672
16271673 // Decrement pending metric at the end
1628- info ! ( "Decrementing pending metric" ) ;
16291674 metrics:: ACTOR_REQUEST_PENDING
16301675 . with_label_values ( & [ & actor_id_str_clone, & server_id_str_clone, & method, & path] )
16311676 . dec ( ) ;
0 commit comments