@@ -20,7 +20,7 @@ use tracing::{debug, info, warn};
2020use crate :: auth:: { AuthAction , BackendAuth , FrontendAuthenticator } ;
2121use crate :: backend:: client:: { ClientId , FrontConnectionGuard } ;
2222use crate :: backend:: pool:: { BackendNode , ConnectionPool , Connector , SessionCommand } ;
23- use crate :: config:: ClusterConfig ;
23+ use crate :: config:: { ClusterConfig , ClusterRuntime , ConfigManager } ;
2424use crate :: info:: { InfoContext , ProxyMode } ;
2525use crate :: metrics;
2626use crate :: protocol:: redis:: {
@@ -50,27 +50,29 @@ pub struct ClusterProxy {
5050 slots : Arc < watch:: Sender < SlotMap > > ,
5151 pool : Arc < ConnectionPool < RedisCommand > > ,
5252 fetch_trigger : mpsc:: UnboundedSender < ( ) > ,
53- backend_timeout : Duration ,
53+ runtime : Arc < ClusterRuntime > ,
54+ config_manager : Arc < ConfigManager > ,
5455 listen_port : u16 ,
5556 seed_nodes : usize ,
5657}
5758
5859impl ClusterProxy {
59- pub async fn new ( config : & ClusterConfig ) -> Result < Self > {
60+ pub async fn new (
61+ config : & ClusterConfig ,
62+ runtime : Arc < ClusterRuntime > ,
63+ config_manager : Arc < ConfigManager > ,
64+ ) -> Result < Self > {
6065 let cluster: Arc < str > = config. name . clone ( ) . into ( ) ;
6166 let hash_tag = config. hash_tag . as_ref ( ) . map ( |tag| tag. as_bytes ( ) . to_vec ( ) ) ;
6267 let read_from_slave = config. read_from_slave . unwrap_or ( false ) ;
6368
6469 let ( slot_tx, _slot_rx) = watch:: channel ( SlotMap :: new ( ) ) ;
6570 let ( trigger_tx, trigger_rx) = mpsc:: unbounded_channel ( ) ;
6671
67- let timeout_ms = config
68- . read_timeout
69- . or ( config. write_timeout )
70- . unwrap_or ( REQUEST_TIMEOUT_MS ) ;
7172 let backend_auth = config. backend_auth_config ( ) . map ( BackendAuth :: from) ;
7273 let connector = Arc :: new ( ClusterConnector :: new (
73- Duration :: from_millis ( timeout_ms) ,
74+ runtime. clone ( ) ,
75+ REQUEST_TIMEOUT_MS ,
7476 backend_auth. clone ( ) ,
7577 ) ) ;
7678 let pool = Arc :: new ( ConnectionPool :: new ( cluster. clone ( ) , connector. clone ( ) ) ) ;
@@ -90,7 +92,8 @@ impl ClusterProxy {
9092 slots : Arc :: new ( slot_tx) ,
9193 pool : pool. clone ( ) ,
9294 fetch_trigger : trigger_tx. clone ( ) ,
93- backend_timeout : Duration :: from_millis ( timeout_ms) ,
95+ runtime,
96+ config_manager,
9497 listen_port,
9598 seed_nodes : config. servers . len ( ) ,
9699 } ;
@@ -268,6 +271,19 @@ impl ClusterProxy {
268271 }
269272 }
270273 }
274+ if let Some ( response) = self . try_handle_config( & cmd) . await {
275+ let kind_label = cmd. kind_label( ) ;
276+ let success = !response. is_error( ) ;
277+ metrics:: front_command(
278+ self . cluster. as_ref( ) ,
279+ kind_label,
280+ success,
281+ ) ;
282+ let fut = async move { response } ;
283+ pending. push_back( Box :: pin( fut) ) ;
284+ inflight += 1 ;
285+ continue ;
286+ }
271287 if let Some ( response) = self . try_handle_info( & cmd) {
272288 metrics:: front_command(
273289 self . cluster. as_ref( ) ,
@@ -319,6 +335,10 @@ impl ClusterProxy {
319335 Ok ( ( ) )
320336 }
321337
338+ async fn try_handle_config ( & self , command : & RedisCommand ) -> Option < RespValue > {
339+ self . config_manager . handle_command ( command) . await
340+ }
341+
322342 fn try_handle_info ( & self , command : & RedisCommand ) -> Option < RespValue > {
323343 if !command. command_name ( ) . eq_ignore_ascii_case ( b"INFO" ) {
324344 return None ;
@@ -600,15 +620,16 @@ impl ClusterProxy {
600620 node : & BackendNode ,
601621 ) -> Result < Framed < TcpStream , RespCodec > > {
602622 let addr = node. as_str ( ) . to_string ( ) ;
603- let stream = timeout ( self . backend_timeout , TcpStream :: connect ( & addr) )
623+ let timeout_duration = self . runtime . request_timeout ( REQUEST_TIMEOUT_MS ) ;
624+ let stream = timeout ( timeout_duration, TcpStream :: connect ( & addr) )
604625 . await
605626 . with_context ( || format ! ( "connect to {} timed out" , addr) ) ??;
606627 stream
607628 . set_nodelay ( true )
608629 . with_context ( || format ! ( "failed to set TCP_NODELAY on {}" , addr) ) ?;
609630 let mut framed = Framed :: new ( stream, RespCodec :: default ( ) ) ;
610631 if let Some ( auth) = & self . backend_auth {
611- auth. apply_to_stream ( & mut framed, self . backend_timeout , & addr)
632+ auth. apply_to_stream ( & mut framed, timeout_duration , & addr)
612633 . await ?;
613634 }
614635 Ok ( framed)
@@ -819,31 +840,33 @@ fn resp_value_to_bytes(value: &RespValue) -> Option<Bytes> {
819840
820841#[ derive( Clone ) ]
821842struct ClusterConnector {
822- timeout : Duration ,
843+ runtime : Arc < ClusterRuntime > ,
844+ default_timeout_ms : u64 ,
823845 backend_auth : Option < BackendAuth > ,
824846 heartbeat_interval : Duration ,
825- slow_response_threshold : Duration ,
826847 reconnect_base_delay : Duration ,
827848 max_reconnect_attempts : usize ,
828849}
829850
830851impl ClusterConnector {
831- fn new ( timeout : Duration , backend_auth : Option < BackendAuth > ) -> Self {
832- let slow_response_threshold = timeout
833- . checked_mul ( 3 )
834- . unwrap_or_else ( || Duration :: from_secs ( 5 ) ) ;
852+ fn new (
853+ runtime : Arc < ClusterRuntime > ,
854+ default_timeout_ms : u64 ,
855+ backend_auth : Option < BackendAuth > ,
856+ ) -> Self {
835857 Self {
836- timeout,
858+ runtime,
859+ default_timeout_ms,
837860 backend_auth,
838861 heartbeat_interval : Duration :: from_secs ( 30 ) ,
839- slow_response_threshold,
840862 reconnect_base_delay : Duration :: from_millis ( 50 ) ,
841863 max_reconnect_attempts : 3 ,
842864 }
843865 }
844866
845867 async fn open_stream ( & self , address : & str ) -> Result < Framed < TcpStream , RespCodec > > {
846- let stream = timeout ( self . timeout , TcpStream :: connect ( address) )
868+ let timeout_duration = self . current_timeout ( ) ;
869+ let stream = timeout ( timeout_duration, TcpStream :: connect ( address) )
847870 . await
848871 . with_context ( || format ! ( "connection to {} timed out" , address) ) ??;
849872 stream
@@ -864,7 +887,7 @@ impl ClusterConnector {
864887 }
865888 let mut framed = Framed :: new ( stream, RespCodec :: default ( ) ) ;
866889 if let Some ( auth) = & self . backend_auth {
867- auth. apply_to_stream ( & mut framed, self . timeout , address)
890+ auth. apply_to_stream ( & mut framed, timeout_duration , address)
868891 . await ?;
869892 }
870893 Ok ( framed)
@@ -881,7 +904,8 @@ impl ClusterConnector {
881904 info ! ( blocking = ?blocking, "cluster connector executing blocking candidate {name}" ) ;
882905 }
883906 }
884- timeout ( self . timeout , framed. send ( command. to_resp ( ) ) )
907+ let timeout_duration = self . current_timeout ( ) ;
908+ timeout ( timeout_duration, framed. send ( command. to_resp ( ) ) )
885909 . await
886910 . context ( "timed out sending command" ) ??;
887911
@@ -891,7 +915,7 @@ impl ClusterConnector {
891915 Some ( Err ( err) ) => Err ( err. into ( ) ) ,
892916 None => Err ( anyhow ! ( "backend closed connection" ) ) ,
893917 } ,
894- BlockingKind :: None => match timeout ( self . timeout , framed. next ( ) ) . await {
918+ BlockingKind :: None => match timeout ( timeout_duration , framed. next ( ) ) . await {
895919 Ok ( Some ( Ok ( value) ) ) => Ok ( value) ,
896920 Ok ( Some ( Err ( err) ) ) => Err ( err. into ( ) ) ,
897921 Ok ( None ) => Err ( anyhow ! ( "backend closed connection" ) ) ,
@@ -945,11 +969,12 @@ impl ClusterConnector {
945969 use RespValue :: { Array , BulkString , SimpleString } ;
946970
947971 let ping = Array ( vec ! [ BulkString ( Bytes :: from_static( b"PING" ) ) ] ) ;
948- timeout ( self . timeout , framed. send ( ping) )
972+ let timeout_duration = self . current_timeout ( ) ;
973+ timeout ( timeout_duration, framed. send ( ping) )
949974 . await
950975 . context ( "timed out sending heartbeat" ) ??;
951976
952- match timeout ( self . timeout , framed. next ( ) ) . await {
977+ match timeout ( timeout_duration , framed. next ( ) ) . await {
953978 Ok ( Some ( Ok ( resp) ) ) => match resp {
954979 SimpleString ( ref data) | BulkString ( ref data)
955980 if data. eq_ignore_ascii_case ( b"PONG" ) =>
@@ -963,6 +988,16 @@ impl ClusterConnector {
963988 Err ( _) => Err ( anyhow ! ( "timed out waiting for heartbeat reply" ) ) ,
964989 }
965990 }
991+
992+ fn current_timeout ( & self ) -> Duration {
993+ self . runtime . request_timeout ( self . default_timeout_ms )
994+ }
995+
996+ fn slow_response_threshold ( & self ) -> Duration {
997+ self . current_timeout ( )
998+ . checked_mul ( 3 )
999+ . unwrap_or_else ( || Duration :: from_secs ( 5 ) )
1000+ }
9661001}
9671002
9681003#[ async_trait]
@@ -1005,10 +1040,11 @@ impl Connector<RedisCommand> for ClusterConnector {
10051040 }
10061041
10071042 if let Some ( ref mut framed) = connection {
1043+ let slow_threshold = self . slow_response_threshold( ) ;
10081044 let started = Instant :: now( ) ;
10091045 let result = self . execute( framed, cmd. request) . await ;
10101046 let elapsed = started. elapsed( ) ;
1011- let is_slow = elapsed > self . slow_response_threshold ;
1047+ let is_slow = elapsed > slow_threshold ;
10121048
10131049 let mut should_drop = false ;
10141050 match result {
0 commit comments