Skip to content

Commit 3713f05

Browse files
authored
fix(trogon-nats): remove unnecessary Box::pin in flush retry closure (#72)
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
1 parent 1d8a489 commit 3713f05

2 files changed

Lines changed: 35 additions & 38 deletions

File tree

rsworkspace/crates/trogon-nats/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ uuid = { workspace = true }
2121

2222
[dev-dependencies]
2323
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
24+
tracing-subscriber = { workspace = true }
2425
trogon-std = { workspace = true, features = ["test-support"] }
2526

2627
[features]

rsworkspace/crates/trogon-nats/src/messaging.rs

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -115,67 +115,61 @@ impl RetryPolicy {
115115
F: FnMut() -> Fut,
116116
Fut: std::future::Future<Output = Result<(), PublishOperationError>>,
117117
{
118-
let mut attempts = 0;
119-
let mut last_error: Option<PublishOperationError> = None;
118+
let mut last_error = match operation().await {
119+
Ok(()) => return Ok(()),
120+
Err(e) => e,
121+
};
122+
123+
for attempt in 1..=self.max_retries {
124+
let exp = (attempt - 1).min(31);
125+
let delay = self.initial_retry_delay * (1u32 << exp);
126+
tracing::debug!(
127+
error = %last_error,
128+
operation = operation_name,
129+
subject = %subject,
130+
attempt,
131+
max_retries = self.max_retries,
132+
delay_ms = delay.as_millis(),
133+
"Operation failed, retrying"
134+
);
135+
tokio::time::sleep(delay).await;
120136

121-
while attempts <= self.max_retries {
122-
attempts += 1;
123137
match operation().await {
124138
Ok(()) => {
125-
if attempts > 1 {
126-
tracing::info!(
127-
operation = operation_name,
128-
subject = %subject,
129-
attempts,
130-
"Operation succeeded after retries"
131-
);
132-
}
139+
tracing::info!(
140+
operation = operation_name,
141+
subject = %subject,
142+
attempts = attempt + 1,
143+
"Operation succeeded after retries"
144+
);
133145
return Ok(());
134146
}
135-
Err(e) => {
136-
last_error = Some(e);
137-
if attempts <= self.max_retries {
138-
let exp = (attempts - 1).min(31);
139-
let delay = self.initial_retry_delay * (1u32 << exp);
140-
tracing::debug!(
141-
error = %last_error.as_ref().unwrap(),
142-
operation = operation_name,
143-
subject = %subject,
144-
attempt = attempts,
145-
max_retries = self.max_retries,
146-
delay_ms = delay.as_millis(),
147-
"Operation failed, retrying"
148-
);
149-
tokio::time::sleep(delay).await;
150-
}
151-
}
147+
Err(e) => last_error = e,
152148
}
153149
}
154150

155-
let final_error = last_error.unwrap_or_else(|| {
156-
PublishOperationError("No error recorded in retry loop".to_string())
157-
});
151+
let attempts = self.max_retries + 1;
158152
if self.max_retries > 0 {
159153
tracing::warn!(
160-
error = %final_error,
154+
error = %last_error,
161155
operation = operation_name,
162156
subject = %subject,
163157
total_attempts = attempts,
164158
"Operation failed after all retry attempts"
165159
);
166160
Err(NatsError::PublishOperationExhausted {
167-
error: final_error,
161+
error: last_error,
168162
subject: subject.to_string(),
169163
attempts,
170164
})
171165
} else {
172166
tracing::warn!(
173-
error = %final_error,
167+
error = %last_error,
174168
operation = operation_name,
175169
subject = %subject,
176170
"Operation failed"
177171
);
178-
Err(NatsError::PublishOperation(final_error))
172+
Err(NatsError::PublishOperation(last_error))
179173
}
180174
}
181175
}
@@ -291,12 +285,12 @@ where
291285
.execute(
292286
|| {
293287
let client = client.clone();
294-
Box::pin(async move {
288+
async move {
295289
client
296290
.flush()
297291
.await
298292
.map_err(|e| PublishOperationError(e.to_string()))
299-
})
293+
}
300294
},
301295
"flush",
302296
subject,
@@ -705,6 +699,7 @@ mod tests {
705699
async fn test_retry_policy_execute_success_after_retries() {
706700
use std::sync::{Arc, Mutex};
707701

702+
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
708703
let policy = RetryPolicy::standard();
709704
let call_count = Arc::new(Mutex::new(0));
710705

@@ -739,6 +734,7 @@ mod tests {
739734
async fn test_retry_policy_execute_exhausted() {
740735
use std::sync::{Arc, Mutex};
741736

737+
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
742738
let policy = RetryPolicy::standard();
743739
let call_count = Arc::new(Mutex::new(0));
744740

0 commit comments

Comments
 (0)