Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.

Commit f769156

Browse files
authored
refactor: rename fields, methods, and vars related to RetryBackoff (#310)
1 parent d8bfe57 commit f769156

4 files changed

Lines changed: 34 additions & 35 deletions

File tree

src/api.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -821,7 +821,7 @@ impl<'a> RequestBuilder<'a> {
821821
async fn send(self) -> Result<UnaryResponse, ApiError> {
822822
let request = self.request;
823823

824-
let mut retry_backoffs: Option<RetryBackoff> = self
824+
let mut retry_backoff: Option<RetryBackoff> = self
825825
.retry_enabled
826826
.then(|| self.client.retry_builder.build());
827827

@@ -873,13 +873,13 @@ impl<'a> RequestBuilder<'a> {
873873
};
874874

875875
if err.is_retryable()
876-
&& let Some(backoff) = retry_backoffs.as_mut().and_then(|b| b.next())
876+
&& let Some(backoff) = retry_backoff.as_mut().and_then(|b| b.next())
877877
{
878878
let backoff = retry_after.map_or(backoff, |ra| ra.max(backoff));
879879
debug!(
880880
%err,
881881
?backoff,
882-
num_retries_remaining = retry_backoffs.as_ref().map(|b| b.remaining()).unwrap_or(0),
882+
num_retries_remaining = retry_backoff.as_ref().map(|b| b.remaining()).unwrap_or(0),
883883
"retrying request"
884884
);
885885
tokio::time::sleep(backoff).await;
@@ -888,7 +888,7 @@ impl<'a> RequestBuilder<'a> {
888888
%err,
889889
is_retryable = err.is_retryable(),
890890
retry_enabled = self.retry_enabled,
891-
retries_exhausted = retry_backoffs.as_ref().is_none_or(|b| b.is_exhausted()),
891+
retries_exhausted = retry_backoff.as_ref().is_none_or(|b| b.is_exhausted()),
892892
"not retrying request"
893893
);
894894
return Err(err);

src/retry.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,53 +45,53 @@ impl RetryBackoffBuilder {
4545
RetryBackoff {
4646
min_base_delay: self.min_base_delay,
4747
max_base_delay: self.max_base_delay,
48-
max_attempts: self.max_retries,
49-
cur_attempt: 0,
48+
max_retries: self.max_retries,
49+
cur_retry: 0,
5050
}
5151
}
5252
}
5353

5454
pub struct RetryBackoff {
5555
min_base_delay: Duration,
5656
max_base_delay: Duration,
57-
max_attempts: u32,
58-
cur_attempt: u32,
57+
max_retries: u32,
58+
cur_retry: u32,
5959
}
6060

6161
impl RetryBackoff {
6262
pub fn remaining(&self) -> u32 {
63-
self.max_attempts.saturating_sub(self.cur_attempt)
63+
self.max_retries.saturating_sub(self.cur_retry)
6464
}
6565

6666
pub fn is_exhausted(&self) -> bool {
67-
self.cur_attempt >= self.max_attempts
67+
self.cur_retry >= self.max_retries
6868
}
6969

7070
pub fn reset(&mut self) {
71-
self.cur_attempt = 0;
71+
self.cur_retry = 0;
7272
}
7373

74-
pub fn attempts_used(&self) -> u32 {
75-
self.cur_attempt
74+
pub fn used(&self) -> u32 {
75+
self.cur_retry
7676
}
7777
}
7878

7979
impl Iterator for RetryBackoff {
8080
type Item = Duration;
8181

8282
fn next(&mut self) -> Option<Self::Item> {
83-
if self.cur_attempt == self.max_attempts {
83+
if self.cur_retry == self.max_retries {
8484
return None;
8585
}
8686
let base_delay = (self
8787
.min_base_delay
88-
.saturating_mul(2u32.saturating_pow(self.cur_attempt)))
88+
.saturating_mul(2u32.saturating_pow(self.cur_retry)))
8989
.min(self.max_base_delay);
9090
let jitter =
9191
Duration::try_from_secs_f64(base_delay.as_secs_f64() * rng().random_range(0.0..=1.0))
9292
.unwrap_or(Duration::MAX);
9393
let delay = base_delay + jitter;
94-
self.cur_attempt += 1;
94+
self.cur_retry += 1;
9595
Some(delay)
9696
}
9797
}
@@ -131,31 +131,31 @@ mod tests {
131131
fn backoff_with_reset() {
132132
let mut backoff = RetryBackoffBuilder::default().with_max_retries(3).build();
133133

134-
assert_eq!(backoff.attempts_used(), 0);
134+
assert_eq!(backoff.used(), 0);
135135
assert_eq!(backoff.remaining(), 3);
136136
assert!(!backoff.is_exhausted());
137137

138138
assert!(backoff.next().is_some());
139-
assert_eq!(backoff.attempts_used(), 1);
139+
assert_eq!(backoff.used(), 1);
140140
assert_eq!(backoff.remaining(), 2);
141141
assert!(!backoff.is_exhausted());
142142

143143
backoff.reset();
144144

145-
assert_eq!(backoff.attempts_used(), 0);
145+
assert_eq!(backoff.used(), 0);
146146
assert_eq!(backoff.remaining(), 3);
147147
assert!(!backoff.is_exhausted());
148148

149149
assert!(backoff.next().is_some());
150-
assert_eq!(backoff.attempts_used(), 1);
150+
assert_eq!(backoff.used(), 1);
151151
assert_eq!(backoff.remaining(), 2);
152152

153153
assert!(backoff.next().is_some());
154-
assert_eq!(backoff.attempts_used(), 2);
154+
assert_eq!(backoff.used(), 2);
155155
assert_eq!(backoff.remaining(), 1);
156156

157157
assert!(backoff.next().is_some());
158-
assert_eq!(backoff.attempts_used(), 3);
158+
assert_eq!(backoff.used(), 3);
159159
assert_eq!(backoff.remaining(), 0);
160160
assert!(backoff.is_exhausted());
161161

src/session/append.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ async fn run_session_with_retry(
409409
stashed_submission: None,
410410
};
411411
let mut prev_total_acked_records = 0;
412-
let mut retry_backoffs = retry_builder.build();
412+
let mut retry_backoff = retry_builder.build();
413413

414414
loop {
415415
let result = run_session(&client, &stream, &mut state, buffer_size).await;
@@ -421,7 +421,7 @@ async fn run_session_with_retry(
421421
Err(err) => {
422422
if prev_total_acked_records < state.total_acked_records {
423423
prev_total_acked_records = state.total_acked_records;
424-
retry_backoffs.reset();
424+
retry_backoff.reset();
425425
}
426426

427427
let retry_policy_compliant = retry_policy_compliant(
@@ -431,20 +431,20 @@ async fn run_session_with_retry(
431431

432432
if retry_policy_compliant
433433
&& err.is_retryable()
434-
&& let Some(backoff) = retry_backoffs.next()
434+
&& let Some(backoff) = retry_backoff.next()
435435
{
436436
debug!(
437437
%err,
438438
?backoff,
439-
num_retries_remaining = retry_backoffs.remaining(),
439+
num_retries_remaining = retry_backoff.remaining(),
440440
"retrying append session"
441441
);
442442
tokio::time::sleep(backoff).await;
443443
} else {
444444
debug!(
445445
%err,
446446
retry_policy_compliant,
447-
retries_exhausted = retry_backoffs.is_exhausted(),
447+
retries_exhausted = retry_backoff.is_exhausted(),
448448
"not retrying append session"
449449
);
450450

src/session/read.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ pub async fn read_session(
4747
mut end: ReadEnd,
4848
ignore_command_records: bool,
4949
) -> Result<Streaming<ReadBatch>, ReadSessionError> {
50-
let retry_builder = retry_builder(&client.config.retry);
51-
let mut retry_backoffs = retry_builder.build();
50+
let mut retry_backoff = retry_builder(&client.config.retry).build();
5251
let baseline_wait = end.wait;
5352
let mut last_tail_at: Option<Instant> = None;
5453

@@ -64,11 +63,11 @@ pub async fn read_session(
6463
.await
6564
{
6665
Ok(batches) => {
67-
retry_backoffs.reset();
66+
retry_backoff.reset();
6867
break batches;
6968
}
7069
Err(err) => {
71-
if can_retry(&err, &mut retry_backoffs).await {
70+
if can_retry(&err, &mut retry_backoff).await {
7271
continue;
7372
}
7473
return Err(err);
@@ -91,7 +90,7 @@ pub async fn read_session(
9190
).await {
9291
Ok(b) => batches = Some(b),
9392
Err(err) => {
94-
if can_retry(&err, &mut retry_backoffs).await {
93+
if can_retry(&err, &mut retry_backoff).await {
9594
continue;
9695
}
9796
yield Err(err);
@@ -107,8 +106,8 @@ pub async fn read_session(
107106
.await
108107
{
109108
Some(Ok(batch)) => {
110-
if retry_backoffs.attempts_used() > 0 {
111-
retry_backoffs.reset();
109+
if retry_backoff.used() > 0 {
110+
retry_backoff.reset();
112111
}
113112

114113
if batch.tail.is_some() {
@@ -136,7 +135,7 @@ pub async fn read_session(
136135
}
137136
Some(Err(err)) => {
138137
batches = None;
139-
if can_retry(&err, &mut retry_backoffs).await {
138+
if can_retry(&err, &mut retry_backoff).await {
140139
continue;
141140
}
142141
yield Err(err);

0 commit comments

Comments
 (0)