Skip to content

Commit b6f39f4

Browse files
authored
Fail stale jobs and cull zombie pending txns (#106)
* Fail stale jobs and cull zombie pending txns Bound job lifetimes and remove stale pending transactions to prevent zombie retries and unbounded resource growth. Adds a 24h max age check for confirmation and send jobs (EIP-7702 and external bundler), including a job_age_seconds helper and logging, so long-lived retrying jobs are permanently failed. Implements peek_pending_transactions_older_than in the EOA store to fetch pending entries older than a cutoff (and clean up missing data). Adds EOA worker logic to cull stale pending transactions (24h cutoff, max 500 per cycle) by batch-failing them and enqueuing failure webhooks. Small logging/error messages added to surface these events. * Add StaleJob error and use inspect_err Replace manual if-let logging with combinators in EoaExecutorWorker: call cull_stale_pending_transactions().await.inspect_err(...).map_err(|e| e.handle()) to log errors and convert them via handle(). Introduce a new UserOpConfirmationError::StaleJob variant carrying user_op_hash, attempt_number, and age_seconds to represent confirmation jobs that aged out; return this variant when a job exceeds MAX_CONFIRMATION_JOB_AGE_SECONDS and update the error-to-message mapping to include a message for StaleJob. * Extract hydrate_pending_transactions helper Introduce hydrate_pending_transactions to centralize the logic for pipelined HGET of transaction user_request fields, JSON deserialization into PendingTransaction, and cleanup (ZREM) of orphaned zset entries. Replace duplicated hydration code in several peek_pending_transactions* methods with calls to the new helper, allocate the result Vec with capacity, and adjust pipeline query calls accordingly. This refactor reduces duplication and consolidates deserialization/error handling and cleanup in one place for easier maintenance.
1 parent 8d18867 commit b6f39f4

6 files changed

Lines changed: 239 additions & 71 deletions

File tree

executors/src/eip7702_executor/confirm.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ fn transaction_hash_retry_delay(attempts: u32) -> Duration {
3939
}
4040
}
4141

42+
/// Maximum age (in seconds) of a confirmation job before it is permanently failed.
43+
const MAX_CONFIRMATION_JOB_AGE_SECONDS: u64 = 24 * 60 * 60;
44+
45+
fn job_age_seconds<T: Clone>(job: &BorrowedJob<T>) -> u64 {
46+
let now = std::time::SystemTime::now()
47+
.duration_since(std::time::UNIX_EPOCH)
48+
.map(|d| d.as_secs())
49+
.unwrap_or(0);
50+
now.saturating_sub(job.job.created_at)
51+
}
52+
4253
// --- Job Payload ---
4354
#[derive(Serialize, Deserialize, Debug, Clone)]
4455
#[serde(rename_all = "camelCase")]
@@ -186,6 +197,25 @@ where
186197
let job_data = &job.job.data;
187198
let transaction_hash_delay = transaction_hash_retry_delay(job.attempts());
188199

200+
let age_seconds = job_age_seconds(job);
201+
if age_seconds > MAX_CONFIRMATION_JOB_AGE_SECONDS {
202+
tracing::error!(
203+
bundler_transaction_id = job_data.bundler_transaction_id,
204+
transaction_id = job_data.transaction_id,
205+
attempts = job.attempts(),
206+
age_seconds,
207+
max_age_seconds = MAX_CONFIRMATION_JOB_AGE_SECONDS,
208+
"EIP-7702 confirmation job exceeded max age, failing permanently"
209+
);
210+
return Err(Eip7702ConfirmationError::TransactionHashError {
211+
message: format!(
212+
"Job exceeded maximum retry age of {MAX_CONFIRMATION_JOB_AGE_SECONDS}s (current age: {age_seconds}s, attempts: {attempts}); failing to prevent zombie retries",
213+
attempts = job.attempts()
214+
),
215+
})
216+
.map_err_fail();
217+
}
218+
189219
// 1. Get Chain
190220
let chain = self
191221
.chain_service

executors/src/eip7702_executor/send.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,27 @@ where
182182
) -> JobResult<Self::Output, Self::ErrorData> {
183183
let job_data = &job.job.data;
184184

185+
let now_secs = std::time::SystemTime::now()
186+
.duration_since(std::time::UNIX_EPOCH)
187+
.map(|d| d.as_secs())
188+
.unwrap_or(0);
189+
let age_seconds = now_secs.saturating_sub(job.job.created_at);
190+
if age_seconds > 24 * 60 * 60 {
191+
tracing::error!(
192+
transaction_id = job_data.transaction_id,
193+
attempts = job.attempts(),
194+
age_seconds,
195+
"EIP-7702 send job exceeded max age, failing permanently"
196+
);
197+
return Err(Eip7702SendError::InternalError {
198+
message: format!(
199+
"Send job exceeded maximum retry age of 24h (current age: {age_seconds}s, attempts: {attempts}); failing to prevent zombie retries",
200+
attempts = job.attempts()
201+
),
202+
})
203+
.map_err_fail();
204+
}
205+
185206
// 1. Get Chain
186207
let chain = self
187208
.chain_service

executors/src/eoa/store/mod.rs

Lines changed: 62 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -549,10 +549,11 @@ impl EoaExecutorStore {
549549
self.peek_pending_transactions_paginated(0, limit).await
550550
}
551551

552-
/// Peek at pending transactions with pagination support
553-
pub async fn peek_pending_transactions_paginated(
552+
/// Peek at pending transactions whose `queued_at` (unix ms) is strictly older than
553+
/// `older_than_unix_ms`.
554+
pub async fn peek_pending_transactions_older_than(
554555
&self,
555-
offset: u64,
556+
older_than_unix_ms: u64,
556557
limit: u64,
557558
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
558559
if limit == 0 {
@@ -562,27 +563,48 @@ impl EoaExecutorStore {
562563
let pending_key = self.pending_transactions_zset_name();
563564
let mut conn = self.redis.clone();
564565

565-
// Use ZRANGE to peek without removing, with offset support
566-
let start = offset as isize;
567-
let stop = (offset + limit - 1) as isize;
568-
566+
let max_exclusive = format!("({older_than_unix_ms}");
569567
let transaction_ids: Vec<PendingTransactionStringWithQueuedAt> =
570-
conn.zrange_withscores(&pending_key, start, stop).await?;
568+
twmq::redis::cmd("ZRANGEBYSCORE")
569+
.arg(&pending_key)
570+
.arg(0)
571+
.arg(&max_exclusive)
572+
.arg("WITHSCORES")
573+
.arg("LIMIT")
574+
.arg(0)
575+
.arg(limit as isize)
576+
.query_async(&mut conn)
577+
.await?;
578+
579+
self.hydrate_pending_transactions(&mut conn, transaction_ids)
580+
.await
581+
}
571582

583+
/// Given a list of (transaction_id, queued_at) tuples, fetch each transaction's
584+
/// `user_request` via a pipelined HGET, deserialize into `PendingTransaction`,
585+
/// and ZREM any entries whose transaction data has gone missing.
586+
///
587+
/// Shared by the various `peek_pending_transactions*` methods to centralize
588+
/// hydration, deserialization error handling, and cleanup of orphaned zset
589+
/// entries.
590+
async fn hydrate_pending_transactions(
591+
&self,
592+
conn: &mut ConnectionManager,
593+
transaction_ids: Vec<PendingTransactionStringWithQueuedAt>,
594+
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
572595
if transaction_ids.is_empty() {
573596
return Ok(Vec::new());
574597
}
575598

576599
let mut pipe = twmq::redis::pipe();
577-
578600
for (transaction_id, _) in &transaction_ids {
579601
let tx_data_key = self.transaction_data_key_name(transaction_id);
580602
pipe.hget(&tx_data_key, "user_request");
581603
}
604+
let user_requests: Vec<Option<String>> = pipe.query_async(conn).await?;
582605

583-
let user_requests: Vec<Option<String>> = pipe.query_async(&mut conn).await?;
584-
585-
let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
606+
let mut pending_transactions: Vec<PendingTransaction> =
607+
Vec::with_capacity(transaction_ids.len());
586608
let mut deletion_pipe = twmq::redis::pipe();
587609

588610
for ((transaction_id, queued_at), user_request) in
@@ -608,29 +630,36 @@ impl EoaExecutorStore {
608630
}
609631

610632
if !deletion_pipe.is_empty() {
611-
deletion_pipe.query_async::<()>(&mut conn).await?;
633+
deletion_pipe.query_async::<()>(conn).await?;
612634
}
613635

614-
// let user_requests: Vec<EoaTransactionRequest> = user_requests
615-
// .into_iter()
616-
// .map(|user_request_json| serde_json::from_str(&user_request_json))
617-
// .collect::<Result<Vec<EoaTransactionRequest>, serde_json::Error>>()?;
618-
619-
// let pending_transactions: Vec<PendingTransaction> = transaction_ids
620-
// .into_iter()
621-
// .zip(user_requests)
622-
// .map(
623-
// |((transaction_id, queued_at), user_request)| PendingTransaction {
624-
// transaction_id,
625-
// queued_at,
626-
// user_request,
627-
// },
628-
// )
629-
// .collect();
630-
631636
Ok(pending_transactions)
632637
}
633638

639+
/// Peek at pending transactions with pagination support
640+
pub async fn peek_pending_transactions_paginated(
641+
&self,
642+
offset: u64,
643+
limit: u64,
644+
) -> Result<Vec<PendingTransaction>, TransactionStoreError> {
645+
if limit == 0 {
646+
return Ok(Vec::new());
647+
}
648+
649+
let pending_key = self.pending_transactions_zset_name();
650+
let mut conn = self.redis.clone();
651+
652+
// Use ZRANGE to peek without removing, with offset support
653+
let start = offset as isize;
654+
let stop = (offset + limit - 1) as isize;
655+
656+
let transaction_ids: Vec<PendingTransactionStringWithQueuedAt> =
657+
conn.zrange_withscores(&pending_key, start, stop).await?;
658+
659+
self.hydrate_pending_transactions(&mut conn, transaction_ids)
660+
.await
661+
}
662+
634663
/// Peek at pending transactions and get optimistic nonce in a single operation
635664
/// This is optimized for the send flow to reduce Redis round-trips
636665
pub async fn peek_pending_transactions_with_optimistic_nonce(
@@ -661,47 +690,9 @@ impl EoaExecutorStore {
661690

662691
let optimistic = optimistic_nonce.ok_or_else(|| self.nonce_sync_required_error())?;
663692

664-
if transaction_ids.is_empty() {
665-
return Ok((Vec::new(), optimistic));
666-
}
667-
668-
// Second pipeline: Get transaction data
669-
let mut pipe = twmq::redis::pipe();
670-
for (transaction_id, _) in &transaction_ids {
671-
let tx_data_key = self.transaction_data_key_name(transaction_id);
672-
pipe.hget(&tx_data_key, "user_request");
673-
}
674-
675-
let user_requests: Vec<Option<String>> = pipe.query_async(&mut conn).await?;
676-
677-
let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
678-
let mut deletion_pipe = twmq::redis::pipe();
679-
680-
for ((transaction_id, queued_at), user_request) in
681-
transaction_ids.into_iter().zip(user_requests)
682-
{
683-
match user_request {
684-
Some(user_request) => {
685-
let user_request_parsed = serde_json::from_str(&user_request)?;
686-
pending_transactions.push(PendingTransaction {
687-
transaction_id,
688-
queued_at,
689-
user_request: user_request_parsed,
690-
});
691-
}
692-
None => {
693-
tracing::warn!(
694-
"Transaction {} data was missing, deleting transaction from redis",
695-
transaction_id
696-
);
697-
deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id);
698-
}
699-
}
700-
}
701-
702-
if !deletion_pipe.is_empty() {
703-
deletion_pipe.query_async::<()>(&mut conn).await?;
704-
}
693+
let pending_transactions = self
694+
.hydrate_pending_transactions(&mut conn, transaction_ids)
695+
.await?;
705696

706697
Ok((pending_transactions, optimistic))
707698
}

executors/src/eoa/worker/mod.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ const MAX_RECYCLED_THRESHOLD: u64 = 50; // Circuit breaker from spec
3838
const TARGET_TRANSACTIONS_PER_EOA: u64 = 10; // Fleet management from spec
3939
const MIN_TRANSACTIONS_PER_EOA: u64 = 1; // Fleet management from spec
4040

41+
const MAX_PENDING_TRANSACTION_AGE_MS: u64 = 24 * 60 * 60 * 1000;
42+
const MAX_STALE_PENDING_PER_CYCLE: u64 = 500;
43+
4144
// ========== JOB DATA ==========
4245
#[derive(Serialize, Deserialize, Debug, Clone)]
4346
#[serde(rename_all = "camelCase")]
@@ -308,6 +311,18 @@ impl<C: Chain> EoaExecutorWorker<C> {
308311
async fn execute_main_workflow(
309312
&self,
310313
) -> JobResult<EoaExecutorWorkerResult, EoaExecutorWorkerError> {
314+
self.cull_stale_pending_transactions()
315+
.await
316+
.inspect_err(|e| {
317+
tracing::error!(
318+
error = ?e,
319+
eoa = ?self.eoa,
320+
chain_id = self.chain_id,
321+
"Error culling stale pending transactions"
322+
);
323+
})
324+
.map_err(|e| e.handle())?;
325+
311326
// 1. CRASH RECOVERY
312327
let start_time = current_timestamp_ms();
313328
let recovered = self
@@ -402,6 +417,55 @@ impl<C: Chain> EoaExecutorWorker<C> {
402417
})
403418
}
404419

420+
// ========== STALE TRANSACTION CULLING ==========
421+
#[tracing::instrument(skip_all)]
422+
async fn cull_stale_pending_transactions(&self) -> Result<(), EoaExecutorWorkerError> {
423+
let now_ms = current_timestamp_ms();
424+
let cutoff = now_ms.saturating_sub(MAX_PENDING_TRANSACTION_AGE_MS);
425+
426+
let stale = self
427+
.store
428+
.peek_pending_transactions_older_than(cutoff, MAX_STALE_PENDING_PER_CYCLE)
429+
.await
430+
.map_err(EoaExecutorWorkerError::from)?;
431+
432+
if stale.is_empty() {
433+
return Ok(());
434+
}
435+
436+
tracing::warn!(
437+
eoa = ?self.eoa,
438+
chain_id = self.chain_id,
439+
count = stale.len(),
440+
cutoff_unix_ms = cutoff,
441+
max_age_ms = MAX_PENDING_TRANSACTION_AGE_MS,
442+
"Culling stale pending transactions older than max age"
443+
);
444+
445+
let failures: Vec<(&crate::eoa::store::PendingTransaction, EoaExecutorWorkerError)> = stale
446+
.iter()
447+
.map(|p| {
448+
let age_ms = now_ms.saturating_sub(p.queued_at);
449+
(
450+
p,
451+
EoaExecutorWorkerError::InternalError {
452+
message: format!(
453+
"Transaction exceeded maximum pending age of {}ms (was pending for {}ms); failing to prevent zombie retries",
454+
MAX_PENDING_TRANSACTION_AGE_MS, age_ms
455+
),
456+
},
457+
)
458+
})
459+
.collect();
460+
461+
self.store
462+
.fail_pending_transactions_batch(failures, self.webhook_queue.clone())
463+
.await
464+
.map_err(EoaExecutorWorkerError::from)?;
465+
466+
Ok(())
467+
}
468+
405469
// ========== CRASH RECOVERY ==========
406470
#[tracing::instrument(skip_all)]
407471
async fn recover_borrowed_state(&self) -> Result<u32, EoaExecutorWorkerError> {

0 commit comments

Comments
 (0)