Skip to content

Commit 655ad24

Browse files
committed
feat: fixed bugs
1 parent 533406f commit 655ad24

2 files changed

Lines changed: 57 additions & 12 deletions

File tree

src/cluster/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::protocol::redis::{
3434
use crate::slowlog::Slowlog;
3535
use crate::utils::{crc16, trim_hash_tag};
3636

37-
const FETCH_INTERVAL: Duration = Duration::from_secs(10);
37+
const DEFAULT_FETCH_INTERVAL: Duration = Duration::from_secs(10);
3838
const REQUEST_TIMEOUT_MS: u64 = 1_000;
3939
const MAX_REDIRECTS: u8 = 5;
4040
const PIPELINE_LIMIT: usize = 32;
@@ -136,13 +136,18 @@ impl ClusterProxy {
136136

137137
// trigger an immediate topology fetch
138138
trigger_tx.send(()).ok();
139+
let fetch_interval = config
140+
.fetch_interval
141+
.and_then(|value| (value > 0).then(|| Duration::from_millis(value)))
142+
.unwrap_or(DEFAULT_FETCH_INTERVAL);
139143
tokio::spawn(fetch_topology(
140144
cluster,
141145
config.servers.clone(),
142146
connector,
143147
proxy.slots.clone(),
144148
trigger_rx,
145149
Some(cache_trackers),
150+
fetch_interval,
146151
));
147152

148153
Ok(proxy)
@@ -1394,8 +1399,9 @@ async fn fetch_topology(
13941399
slots: Arc<watch::Sender<SlotMap>>,
13951400
mut trigger: mpsc::UnboundedReceiver<()>,
13961401
tracker: Option<Arc<CacheTrackerSet>>,
1402+
fetch_interval: Duration,
13971403
) {
1398-
let mut ticker = tokio::time::interval(FETCH_INTERVAL);
1404+
let mut ticker = tokio::time::interval(fetch_interval);
13991405
loop {
14001406
tokio::select! {
14011407
_ = ticker.tick() => {},

src/standalone/mod.rs

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use md5::Digest;
1313
use socket2::{SockRef, TcpKeepalive};
1414
use tokio::net::TcpStream;
1515
use tokio::sync::mpsc;
16-
use tokio::time::{interval, sleep, timeout, MissedTickBehavior};
16+
use tokio::time::{sleep, timeout};
1717
use tokio_util::codec::{Framed, FramedParts};
1818
use tracing::{debug, info, warn};
1919

@@ -36,6 +36,12 @@ use crate::utils::trim_hash_tag;
3636
const DEFAULT_TIMEOUT_MS: u64 = 1_000;
3737
const VIRTUAL_NODE_FACTOR: usize = 40;
3838
const FRONT_TCP_KEEPALIVE: Duration = Duration::from_secs(60);
39+
const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
40+
const DEFAULT_HEARTBEAT_FAIL_LIMIT: u8 = 1;
41+
42+
fn duration_from_millis(value: Option<u64>) -> Option<Duration> {
43+
value.and_then(|millis| (millis > 0).then(|| Duration::from_millis(millis)))
44+
}
3945

4046
#[derive(Clone)]
4147
struct NodeEntry {
@@ -93,11 +99,21 @@ impl StandaloneProxy {
9399
if let Some(backend) = backend_override {
94100
backend
95101
} else {
102+
let heartbeat_interval_ok = duration_from_millis(config.ping_succ_interval)
103+
.or_else(|| duration_from_millis(config.ping_interval))
104+
.unwrap_or(DEFAULT_HEARTBEAT_INTERVAL);
105+
let heartbeat_interval_fail =
106+
duration_from_millis(config.ping_interval).unwrap_or(heartbeat_interval_ok);
107+
let heartbeat_fail_limit =
108+
config.ping_fail_limit.unwrap_or(DEFAULT_HEARTBEAT_FAIL_LIMIT);
96109
let connector = Arc::new(RedisConnector::new(
97110
runtime.clone(),
98111
DEFAULT_TIMEOUT_MS,
99112
backend_auth.clone(),
100113
config.backend_resp_version,
114+
heartbeat_interval_ok,
115+
heartbeat_interval_fail,
116+
heartbeat_fail_limit,
101117
));
102118
let pool = Arc::new(ConnectionPool::new(cluster.clone(), connector));
103119
Arc::new(PoolBackendExecutor::new(pool))
@@ -966,7 +982,9 @@ struct RedisConnector {
966982
default_timeout_ms: u64,
967983
reconnect_delay: Duration,
968984
max_reconnect_delay: Duration,
969-
heartbeat_interval: Duration,
985+
heartbeat_interval_ok: Duration,
986+
heartbeat_interval_fail: Duration,
987+
heartbeat_fail_limit: u8,
970988
backend_auth: Option<BackendAuth>,
971989
backend_resp_version: RespVersion,
972990
}
@@ -977,13 +995,18 @@ impl RedisConnector {
977995
default_timeout_ms: u64,
978996
backend_auth: Option<BackendAuth>,
979997
backend_resp_version: RespVersion,
998+
heartbeat_interval_ok: Duration,
999+
heartbeat_interval_fail: Duration,
1000+
heartbeat_fail_limit: u8,
9801001
) -> Self {
9811002
Self {
9821003
runtime,
9831004
default_timeout_ms,
9841005
reconnect_delay: Duration::from_millis(100),
9851006
max_reconnect_delay: Duration::from_secs(2),
986-
heartbeat_interval: Duration::from_secs(20),
1007+
heartbeat_interval_ok,
1008+
heartbeat_interval_fail,
1009+
heartbeat_fail_limit,
9871010
backend_auth,
9881011
backend_resp_version,
9891012
}
@@ -1001,8 +1024,8 @@ impl RedisConnector {
10011024
#[cfg(any(unix, windows))]
10021025
{
10031026
let keepalive = TcpKeepalive::new()
1004-
.with_time(self.heartbeat_interval)
1005-
.with_interval(self.heartbeat_interval);
1027+
.with_time(self.heartbeat_interval_ok)
1028+
.with_interval(self.heartbeat_interval_ok);
10061029
if let Err(err) = SockRef::from(&stream).set_tcp_keepalive(&keepalive) {
10071030
warn!(
10081031
backend = %connect_target,
@@ -1185,9 +1208,11 @@ impl Connector<RedisCommand> for RedisConnector {
11851208
) {
11861209
info!(cluster = %cluster, backend = %node.as_str(), "starting backend session");
11871210
let mut connection: Option<Framed<TcpStream, RespCodec>> = None;
1188-
let mut heartbeat = interval(self.heartbeat_interval);
1189-
heartbeat.set_missed_tick_behavior(MissedTickBehavior::Delay);
1211+
let mut heartbeat_interval = self.heartbeat_interval_ok;
1212+
let heartbeat = sleep(heartbeat_interval);
1213+
tokio::pin!(heartbeat);
11901214
let mut current_delay = self.reconnect_delay;
1215+
let mut heartbeat_failures: u8 = 0;
11911216

11921217
loop {
11931218
tokio::select! {
@@ -1265,6 +1290,7 @@ impl Connector<RedisCommand> for RedisConnector {
12651290

12661291
match result {
12671292
Ok(resp) => {
1293+
heartbeat_failures = 0;
12681294
let mut result_label = if matches!(resp, RespValue::Error(_)) {
12691295
"resp_error"
12701296
} else {
@@ -1305,13 +1331,14 @@ impl Connector<RedisCommand> for RedisConnector {
13051331
metrics::backend_error(&cluster, node.as_str(), kind);
13061332
let _ = respond_to.send(Err(err));
13071333
connection = None;
1334+
heartbeat_failures = 0;
13081335
current_delay = self.increase_delay(current_delay);
13091336
sleep(current_delay).await;
13101337
}
13111338
}
13121339
}
13131340
}
1314-
_ = heartbeat.tick(), if connection.is_some() => {
1341+
_ = &mut heartbeat, if connection.is_some() => {
13151342
if let Some(ref mut framed) = connection {
13161343
let start = Instant::now();
13171344
let result = self.heartbeat(framed).await;
@@ -1323,6 +1350,8 @@ impl Connector<RedisCommand> for RedisConnector {
13231350
);
13241351
match result {
13251352
Ok(()) => {
1353+
heartbeat_failures = 0;
1354+
heartbeat_interval = self.heartbeat_interval_ok;
13261355
metrics::backend_heartbeat(&cluster, node.as_str(), true);
13271356
current_delay = self.reconnect_delay;
13281357
}
@@ -1335,10 +1364,20 @@ impl Connector<RedisCommand> for RedisConnector {
13351364
error = %err,
13361365
"standalone backend heartbeat failed"
13371366
);
1338-
connection = None;
1339-
current_delay = self.increase_delay(current_delay);
1367+
heartbeat_interval = self.heartbeat_interval_fail;
1368+
if self.heartbeat_fail_limit > 0 {
1369+
heartbeat_failures = heartbeat_failures.saturating_add(1);
1370+
if heartbeat_failures >= self.heartbeat_fail_limit {
1371+
connection = None;
1372+
heartbeat_failures = 0;
1373+
current_delay = self.increase_delay(current_delay);
1374+
}
1375+
}
13401376
}
13411377
}
1378+
heartbeat
1379+
.as_mut()
1380+
.reset((Instant::now() + heartbeat_interval).into());
13421381
}
13431382
}
13441383
}

0 commit comments

Comments
 (0)