Skip to content

Commit 5e625ea

Browse files
committed
Fix sub-second queue delays in TWMQ
Sub-second delayed requeues in TWMQ were being truncated to 0s because queue scheduling used Duration::as_secs(). That meant values like 200ms were effectively treated as immediate retries, which could cause hot-looping at queue poll cadence. This change adds a small helper that rounds any non-zero sub-second delay up to 1s and uses it consistently in: - queue push scheduling - hook scheduling - multilane queue scheduling
1 parent e146fa8 commit 5e625ea

4 files changed

Lines changed: 20 additions & 9 deletions

File tree

executors/src/solana_executor/worker.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ impl DurableExecution for SolanaExecutorJobHandler {
221221
type JobData = SolanaExecutorJobData;
222222

223223
#[tracing::instrument(
224-
skip(self, job),
224+
skip(self, job),
225225
fields(
226226
transaction_id = job.job.id,
227227
stage = Self::stage_name()
@@ -648,7 +648,6 @@ impl SolanaExecutorJobHandler {
648648
let has_signatures = versioned_tx.signatures.iter().any(|sig| {
649649
sig.as_ref() != [0u8; 64]
650650
});
651-
652651
if has_signatures {
653652
error!(
654653
transaction_id = %transaction_id,

twmq/src/hooks.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::{SystemTime, UNIX_EPOCH};
22

3-
use crate::{DurableExecution, error::TwmqError, job::PushableJob};
3+
use crate::{DurableExecution, delay_to_queue_seconds, error::TwmqError, job::PushableJob};
44

55
// A minimal transaction context that hooks can use
66
pub struct TransactionContext<'a> {
@@ -51,7 +51,7 @@ impl<'a> TransactionContext<'a> {
5151
.sadd(job.queue.dedupe_set_name(), &job.options.id);
5252

5353
if let Some(delay_options) = job.options.delay {
54-
let process_at = now + delay_options.delay.as_secs();
54+
let process_at = now + delay_to_queue_seconds(delay_options.delay);
5555
self.pipeline
5656
.hset(
5757
job.queue.job_meta_hash_name(&job.options.id),

twmq/src/lib.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ pub use queue::IdempotencyMode;
2828
pub use redis;
2929
use tracing::Instrument;
3030

31+
pub(crate) fn delay_to_queue_seconds(delay: Duration) -> u64 {
32+
let delay_secs = delay.as_secs();
33+
34+
if delay.is_zero() {
35+
0
36+
} else if delay_secs == 0 {
37+
1
38+
} else {
39+
delay_secs
40+
}
41+
}
42+
3143
// Trait for error types to implement user cancellation
3244
pub trait UserCancellable {
3345
fn user_cancelled() -> Self;
@@ -297,7 +309,7 @@ impl<H: DurableExecution> Queue<H> {
297309
position: RequeuePosition::Last,
298310
});
299311

300-
let delay_secs = delay.delay.as_secs();
312+
let delay_secs = delay_to_queue_seconds(delay.delay);
301313
let position_string = delay.position.to_string();
302314

303315
let _result: (i32, String) = script
@@ -1049,7 +1061,7 @@ impl<H: DurableExecution> Queue<H> {
10491061

10501062
// Add to proper queue based on delay and position
10511063
if let Some(delay_duration) = delay {
1052-
let delay_until = now + delay_duration.as_secs();
1064+
let delay_until = now + delay_to_queue_seconds(delay_duration);
10531065
let pos_str = position.to_string();
10541066

10551067
pipeline

twmq/src/multilane.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use tracing::Instrument;
99

1010
use crate::{
1111
CancelResult, DurableExecution, FailHookData, NackHookData, QueueInternalErrorHookData,
12-
SuccessHookData, UserCancellable,
12+
SuccessHookData, UserCancellable, delay_to_queue_seconds,
1313
error::TwmqError,
1414
hooks::TransactionContext,
1515
job::{
@@ -219,7 +219,7 @@ impl<H: DurableExecution> MultilaneQueue<H> {
219219
position: RequeuePosition::Last,
220220
});
221221

222-
let delay_secs = delay.delay.as_secs();
222+
let delay_secs = delay_to_queue_seconds(delay.delay);
223223
let position_string = delay.position.to_string();
224224

225225
let _result: (i32, String) = script
@@ -1226,7 +1226,7 @@ impl<H: DurableExecution> MultilaneQueue<H> {
12261226
.duration_since(UNIX_EPOCH)
12271227
.unwrap()
12281228
.as_secs();
1229-
let delay_until = now + delay_duration.as_secs();
1229+
let delay_until = now + delay_to_queue_seconds(*delay_duration);
12301230
let pos_str = position.to_string();
12311231

12321232
hook_pipeline

0 commit comments

Comments
 (0)