diff --git a/plugins/examples/channels/index.ts b/plugins/examples/channels/index.ts new file mode 100644 index 000000000..8e483a238 --- /dev/null +++ b/plugins/examples/channels/index.ts @@ -0,0 +1 @@ +export { handler } from '@openzeppelin/relayer-plugin-channels'; diff --git a/src/domain/transaction/stellar/status.rs b/src/domain/transaction/stellar/status.rs index 46ad22781..d6d1379b5 100644 --- a/src/domain/transaction/stellar/status.rs +++ b/src/domain/transaction/stellar/status.rs @@ -112,6 +112,23 @@ where "status check encountered error" ); + // CAS conflict means another writer already mutated this tx. + // Reload the latest state and return Ok so the status handler + // sees a non-final status and schedules the next poll cycle via + // HandlerError::Retry — no work is lost, just deferred. + if error.is_concurrent_update_conflict() { + info!( + tx_id = %tx.id, + relayer_id = %tx.relayer_id, + "concurrent transaction update detected during status handling, reloading latest state" + ); + return self + .transaction_repository() + .get_by_id(tx.id.clone()) + .await + .map_err(TransactionError::from); + } + // Handle different error types appropriately match error { TransactionError::ValidationError(ref msg) => { @@ -1746,6 +1763,80 @@ mod tests { .unwrap() .contains("stuck in Sent status for too long")); } + #[tokio::test] + async fn handle_status_concurrent_update_conflict_reloads_latest_state() { + // When status_core returns ConcurrentUpdateConflict, the handler + // should reload the latest state via get_by_id and return Ok. + let relayer = create_test_relayer(); + let mut mocks = default_test_mocks(); + + let mut tx = create_test_transaction(&relayer.id); + tx.id = "tx-cas-conflict".to_string(); + tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339(); + let tx_hash_bytes = [11u8; 32]; + if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data { + stellar_data.hash = Some(hex::encode(tx_hash_bytes)); + } + tx.status = TransactionStatus::Submitted; + + let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes); + + // Provider returns SUCCESS — triggers a partial_update for confirmation + mocks + .provider + .expect_get_transaction() + .with(eq(expected_stellar_hash)) + .times(1) + .returning(move |_| { + Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }) + }); + + // partial_update fails with ConcurrentUpdateConflict + mocks + .tx_repo + .expect_partial_update() + .times(1) + .returning(|_id, _update| { + Err(RepositoryError::ConcurrentUpdateConflict( + "CAS mismatch".to_string(), + )) + }); + + // After conflict, handler reloads via get_by_id + let reloaded_tx = { + let mut t = create_test_transaction(&relayer.id); + t.id = "tx-cas-conflict".to_string(); + // Simulate another writer already confirmed it + t.status = TransactionStatus::Confirmed; + t + }; + let reloaded_clone = reloaded_tx.clone(); + mocks + .tx_repo + .expect_get_by_id() + .with(eq("tx-cas-conflict".to_string())) + .times(1) + .returning(move |_| Ok(reloaded_clone.clone())); + + // No notifications or job enqueuing should happen on CAS path + mocks + .job_producer + .expect_produce_send_notification_job() + .never(); + mocks + .job_producer + .expect_produce_transaction_request_job() + .never(); + + let handler = make_stellar_tx_handler(relayer.clone(), mocks); + let result = handler.handle_transaction_status_impl(tx, None).await; + + assert!(result.is_ok(), "CAS conflict should return Ok after reload"); + let returned_tx = result.unwrap(); + assert_eq!(returned_tx.id, "tx-cas-conflict"); + // The reloaded tx reflects what the other writer persisted + assert_eq!(returned_tx.status, TransactionStatus::Confirmed); + } } mod handle_pending_state_tests { diff --git a/src/domain/transaction/stellar/submit.rs b/src/domain/transaction/stellar/submit.rs index 88af382ca..630f8e400 100644 --- a/src/domain/transaction/stellar/submit.rs +++ b/src/domain/transaction/stellar/submit.rs @@ -125,7 +125,6 @@ where "transaction already submitted (DUPLICATE status)" ); } - let tx_hash_hex = response.hash.clone(); let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone()); @@ -161,16 +160,24 @@ where } "TRY_AGAIN_LATER" => { // Network is temporarily congested — the transaction is valid but the - // node's queue is full. Update sent_at so the status checker's backoff - // gate measures time since this attempt, then return Ok to keep the - // transaction alive. The status checker will handle retries: + // node's queue is full. Atomically update sent_at and increment + // try_again_later_retries so the status checker's backoff gate measures + // time since this attempt. Return Ok to keep the transaction alive. + // The status checker will handle retries: // - Submitted txs: resubmitted with exponential backoff // - Sent txs: re-enqueued via handle_sent_state - let mut meta = tx.metadata.clone().unwrap_or_default(); - meta.try_again_later_retries = meta.try_again_later_retries.saturating_add(1); + let updated_tx = self + .transaction_repository() + .record_stellar_try_again_later_retry(tx.id.clone(), Utc::now().to_rfc3339()) + .await?; + + let retries = updated_tx + .metadata + .as_ref() + .map_or(0, |m| m.try_again_later_retries); // Only push on first encounter (dedup: won't fire on retry 2, 3, etc.) - if meta.try_again_later_retries == 1 { + if retries == 1 { crate::metrics::STELLAR_TRY_AGAIN_LATER .with_label_values(&[&tx.relayer_id, &tx.status.to_string()]) .inc(); @@ -180,18 +187,9 @@ where tx_id = %tx.id, relayer_id = %tx.relayer_id, status = ?tx.status, - try_again_later_retries = meta.try_again_later_retries, + try_again_later_retries = retries, "TRY_AGAIN_LATER — status checker will retry" ); - let update_req = TransactionUpdateRequest { - sent_at: Some(Utc::now().to_rfc3339()), - metadata: Some(meta), - ..Default::default() - }; - let updated_tx = self - .transaction_repository() - .partial_update(tx.id.clone(), update_req) - .await?; Ok(updated_tx) } "ERROR" => { @@ -235,14 +233,13 @@ where result_code = decoded_result_code.as_deref().unwrap_or("Unknown"), "ERROR with insufficient fee — status checker will retry" ); - let update_req = TransactionUpdateRequest { - sent_at: Some(Utc::now().to_rfc3339()), - metadata: Some(meta), - ..Default::default() - }; + // Atomically sets `sent_at` and increments Stellar insufficient-fee retries. let updated_tx = self .transaction_repository() - .partial_update(tx.id.clone(), update_req) + .record_stellar_insufficient_fee_retry( + tx.id.clone(), + Utc::now().to_rfc3339(), + ) .await?; return Ok(updated_tx); } @@ -292,6 +289,23 @@ where "transaction submission failed" ); + // CAS conflict in the submission path only occurs after the RPC + // already accepted the transaction (PENDING status update raced). + // The on-chain state is valid; reload the latest DB state and return + // Ok — the status checker will reconcile on its next poll. + if error.is_concurrent_update_conflict() { + info!( + tx_id = %tx_id, + relayer_id = %relayer_id, + "concurrent transaction update detected during submission, reloading latest state" + ); + return self + .transaction_repository() + .get_by_id(tx_id) + .await + .map_err(TransactionError::from); + } + if is_bad_sequence_error(&error_reason) { // For bad sequence errors, sync sequence from chain first if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() { @@ -974,22 +988,20 @@ mod tests { Box::pin(async move { Ok(r) }) }); - // partial_update is called to refresh sent_at and persist metadata — status should NOT change mocks .tx_repo - .expect_partial_update() - .withf(|_, upd| { - upd.sent_at.is_some() - && upd.status.is_none() - && upd - .metadata - .as_ref() - .is_some_and(|m| m.try_again_later_retries == 1) - }) - .returning(|id, _upd| { + .expect_record_stellar_try_again_later_retry() + .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty()) + .returning(|id, _| { let mut tx = create_test_transaction("relayer-1"); tx.id = id; tx.status = TransactionStatus::Sent; + tx.metadata = Some(TransactionMetadata { + consecutive_failures: 0, + total_failures: 0, + insufficient_fee_retries: 0, + try_again_later_retries: 1, + }); Ok::<_, RepositoryError>(tx) }); @@ -1028,14 +1040,20 @@ mod tests { }); submit_mocks .tx_repo - .expect_partial_update() - .withf(|_, upd| upd.sent_at.is_some() && upd.status.is_none()) + .expect_record_stellar_try_again_later_retry() + .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty()) .times(1) - .returning(|id, upd| { + .returning(|id, sent_at| { let mut tx = create_test_transaction("relayer-1"); tx.id = id; tx.status = TransactionStatus::Sent; - tx.sent_at = upd.sent_at.clone(); + tx.sent_at = Some(sent_at); + tx.metadata = Some(TransactionMetadata { + consecutive_failures: 0, + total_failures: 0, + insufficient_fee_retries: 0, + try_again_later_retries: 1, + }); Ok::<_, RepositoryError>(tx) }); @@ -1102,18 +1120,20 @@ mod tests { Box::pin(async move { Ok(r) }) }); - // partial_update is called to refresh sent_at so the status checker's - // backoff gate measures time since this attempt, not the original submission. mocks .tx_repo - .expect_partial_update() - .withf(|_, upd| { - upd.sent_at.is_some() && upd.status.is_none() // status should not change - }) - .returning(|id, _upd| { + .expect_record_stellar_try_again_later_retry() + .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty()) + .returning(|id, _| { let mut tx = create_test_transaction("relayer-1"); tx.id = id; tx.status = TransactionStatus::Submitted; + tx.metadata = Some(TransactionMetadata { + consecutive_failures: 0, + total_failures: 0, + insufficient_fee_retries: 0, + try_again_later_retries: 1, + }); Ok::<_, RepositoryError>(tx) }); @@ -1217,25 +1237,24 @@ mod tests { Box::pin(async move { Ok(r) }) }); - // partial_update is called to refresh sent_at — status should NOT change + // insufficient-fee retry updates sent_at and retry metadata atomically mocks .tx_repo - .expect_partial_update() - .withf(|_, upd| { - upd.sent_at.is_some() - && upd.status.is_none() - && upd - .metadata - .as_ref() - .is_some_and(|metadata| metadata.insufficient_fee_retries == 1) - }) - .returning(|id, upd| { + .expect_record_stellar_insufficient_fee_retry() + .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty()) + .returning(|id, _| { let mut tx = create_test_transaction("relayer-1"); tx.id = id; tx.status = TransactionStatus::Sent; - tx.metadata = upd.metadata; + tx.metadata = Some(TransactionMetadata { + consecutive_failures: 0, + total_failures: 0, + insufficient_fee_retries: 1, + try_again_later_retries: 0, + }); Ok::<_, RepositoryError>(tx) - }); + }) + .times(1); let handler = make_stellar_tx_handler(relayer.clone(), mocks); let mut tx = create_test_transaction(&relayer.id); @@ -1400,5 +1419,76 @@ mod tests { let failed_tx = res.unwrap(); assert_eq!(failed_tx.status, TransactionStatus::Failed); } + + #[tokio::test] + async fn submit_transaction_concurrent_update_conflict_reloads_latest_state() { + // When partial_update fails with ConcurrentUpdateConflict during submission, + // the handler should reload the latest state via get_by_id and return Ok. + let relayer = create_test_relayer(); + let mut mocks = default_test_mocks(); + + // Provider returns PENDING — submission to RPC succeeded + let response = create_send_tx_response( + "PENDING", + "0101010101010101010101010101010101010101010101010101010101010101", + ); + mocks + .provider + .expect_send_transaction_with_status() + .returning(move |_| { + let r = response.clone(); + Box::pin(async move { Ok(r) }) + }); + + // partial_update (Submitted) fails with CAS conflict + mocks + .tx_repo + .expect_partial_update() + .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted)) + .times(1) + .returning(|_, _| { + Err(RepositoryError::ConcurrentUpdateConflict( + "CAS mismatch".to_string(), + )) + }); + + // After conflict, handler reloads via get_by_id + let reloaded_tx = { + let mut t = create_test_transaction(&relayer.id); + t.status = TransactionStatus::Submitted; + t + }; + let reloaded_clone = reloaded_tx.clone(); + mocks + .tx_repo + .expect_get_by_id() + .times(1) + .returning(move |_| Ok(reloaded_clone.clone())); + + // No failure handling (notifications, next-pending) should occur + mocks + .job_producer + .expect_produce_send_notification_job() + .never(); + mocks + .job_producer + .expect_produce_transaction_request_job() + .never(); + + let handler = make_stellar_tx_handler(relayer.clone(), mocks); + let mut tx = create_test_transaction(&relayer.id); + tx.status = TransactionStatus::Sent; + if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data { + data.signatures.push(dummy_signature()); + data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2)); + } + + let res = handler.submit_transaction_impl(tx).await; + + assert!(res.is_ok(), "CAS conflict should return Ok after reload"); + let returned_tx = res.unwrap(); + // Reloaded state reflects the concurrent writer's update + assert_eq!(returned_tx.status, TransactionStatus::Submitted); + } } } diff --git a/src/jobs/handlers/transaction_status_handler.rs b/src/jobs/handlers/transaction_status_handler.rs index 3f357243a..0e1be351a 100644 --- a/src/jobs/handlers/transaction_status_handler.rs +++ b/src/jobs/handlers/transaction_status_handler.rs @@ -13,8 +13,7 @@ use crate::{ domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction}, jobs::{Job, StatusCheckContext, TransactionStatusCheck}, models::{ - ApiError, DefaultAppState, TransactionMetadata, TransactionRepoModel, - TransactionUpdateRequest, + ApiError, DefaultAppState, TransactionError, TransactionMetadata, TransactionRepoModel, }, observability::request_id::set_request_id, queues::{HandlerError, WorkerContext}, @@ -70,7 +69,7 @@ pub async fn transaction_status_handler( /// - If error with should_retry=false → Return Ok (job completes, e.g., transaction not found) /// - If counters are None (early failure) → Skip counter updates /// -/// Counters are stored in transaction metadata, persisted via partial_update. +/// Counters are stored in transaction metadata, persisted via atomic Lua scripts. async fn handle_result( result: Result, tx_repo: &TR, @@ -105,19 +104,15 @@ where "transaction not in final state" ); - // Use tx.metadata (fresh from handle_transaction_status) instead of the - // pre-check snapshot to avoid overwriting retry counters - // (e.g. try_again_later_retries, insufficient_fee_retries) that may have - // been updated during resubmission. + // Use fresh metadata from the transaction (updated during handle_transaction_status) + // to decide whether a reset is needed, falling back to the pre-check snapshot. let fresh_meta = tx.metadata.clone().or(metadata); - if let Some(mut meta) = fresh_meta { - if meta.consecutive_failures > 0 || meta.total_failures > 0 { - meta.consecutive_failures = 0; - let update = TransactionUpdateRequest { - metadata: Some(meta), - ..Default::default() - }; - if let Err(e) = tx_repo.partial_update(tx_id.to_string(), update).await { + if let Some(meta) = fresh_meta { + if meta.consecutive_failures > 0 { + if let Err(e) = tx_repo + .reset_status_check_consecutive_failures(tx_id.to_string()) + .await + { warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter"); } } @@ -130,6 +125,17 @@ where ))) } Err(e) => { + if e.downcast_ref::() + .is_some_and(TransactionError::is_concurrent_update_conflict) + { + info!( + error = %e, + tx_id = %tx_id, + "status check lost a concurrent update race, completing job without counter changes" + ); + return Ok(()); + } + // Check if this is a permanent failure that shouldn't retry if !should_retry_on_error { info!( @@ -141,24 +147,20 @@ where } // Transient error - INCREMENT both counters (only if we have metadata) - if let Some(mut meta) = metadata { - meta.consecutive_failures = meta.consecutive_failures.saturating_add(1); - meta.total_failures = meta.total_failures.saturating_add(1); - + if let Some(meta) = metadata { warn!( error = %e, tx_id = %tx_id, - consecutive_failures = meta.consecutive_failures, - total_failures = meta.total_failures, + consecutive_failures = meta.consecutive_failures.saturating_add(1), + total_failures = meta.total_failures.saturating_add(1), "status check failed, incrementing failure counters" ); - // Update counters via transaction repository - let update = TransactionUpdateRequest { - metadata: Some(meta), - ..Default::default() - }; - if let Err(update_err) = tx_repo.partial_update(tx_id.to_string(), update).await { + // Update counters via atomic transaction repository method + if let Err(update_err) = tx_repo + .increment_status_check_failures(tx_id.to_string()) + .await + { warn!(error = %update_err, tx_id = %tx_id, "failed to update counters"); } } else { @@ -304,7 +306,10 @@ async fn handle_request( #[cfg(test)] mod tests { use super::*; - use crate::models::{NetworkType, TransactionStatus}; + use crate::{ + models::{NetworkType, TransactionStatus}, + repositories::MockTransactionRepository, + }; use std::collections::HashMap; #[tokio::test] @@ -505,6 +510,26 @@ mod tests { mod handle_request_result_tests { use super::*; + #[tokio::test] + async fn test_handle_result_ignores_concurrent_update_conflict() { + let tx_repo = MockTransactionRepository::new(); + + let result = handle_result( + Err(TransactionError::ConcurrentUpdateConflict("tx race".to_string()).into()), + &tx_repo, + "tx-1", + Some(TransactionMetadata { + consecutive_failures: 2, + total_failures: 5, + ..Default::default() + }), + true, + ) + .await; + + assert!(result.is_ok()); + } + #[test] fn test_handle_request_result_with_counters() { let result = HandleRequestResult { diff --git a/src/models/error/repository.rs b/src/models/error/repository.rs index ca526c46b..50e6a1b67 100644 --- a/src/models/error/repository.rs +++ b/src/models/error/repository.rs @@ -22,6 +22,9 @@ pub enum RepositoryError { #[error("Transaction failure: {0}")] TransactionFailure(String), + #[error("Concurrent update conflict: {0}")] + ConcurrentUpdateConflict(String), + #[error("Transaction validation failed: {0}")] TransactionValidationFailed(String), @@ -85,6 +88,7 @@ mod tests { RepositoryError::ConstraintViolation("Constraint error".to_string()), RepositoryError::InvalidData("Invalid data".to_string()), RepositoryError::TransactionFailure("Transaction failed".to_string()), + RepositoryError::ConcurrentUpdateConflict("Concurrent conflict".to_string()), RepositoryError::TransactionValidationFailed("Validation failed".to_string()), RepositoryError::PermissionDenied("Permission denied".to_string()), RepositoryError::NotSupported("Not supported".to_string()), diff --git a/src/models/error/transaction.rs b/src/models/error/transaction.rs index 1d8c87a5a..d5387b186 100644 --- a/src/models/error/transaction.rs +++ b/src/models/error/transaction.rs @@ -42,6 +42,9 @@ pub enum TransactionError { #[error("Unexpected error: {0}")] UnexpectedError(String), + #[error("Concurrent update conflict: {0}")] + ConcurrentUpdateConflict(String), + #[error("Not supported: {0}")] NotSupported(String), @@ -82,6 +85,7 @@ impl TransactionError { // Transient errors - may resolve on retry TransactionError::UnexpectedError(_) => true, + TransactionError::ConcurrentUpdateConflict(_) => true, TransactionError::JobProducerError(_) => true, // Permanent errors - fail immediately @@ -95,6 +99,11 @@ impl TransactionError { TransactionError::StellarTransactionValidationError(_) => false, } } + + /// Detects optimistic-lock conflicts caused by concurrent transaction updates. + pub fn is_concurrent_update_conflict(&self) -> bool { + matches!(self, TransactionError::ConcurrentUpdateConflict(_)) + } } impl From for ApiError { @@ -114,6 +123,7 @@ impl From for ApiError { } TransactionError::NotSupported(msg) => ApiError::BadRequest(msg), TransactionError::UnexpectedError(msg) => ApiError::InternalError(msg), + TransactionError::ConcurrentUpdateConflict(msg) => ApiError::InternalError(msg), TransactionError::SignerError(msg) => ApiError::InternalError(msg), TransactionError::InsufficientBalance(msg) => ApiError::BadRequest(msg), TransactionError::SimulationFailed(msg) => ApiError::BadRequest(msg), @@ -123,7 +133,25 @@ impl From for ApiError { impl From for TransactionError { fn from(error: RepositoryError) -> Self { - TransactionError::ValidationError(error.to_string()) + match error { + RepositoryError::NotFound(msg) + | RepositoryError::InvalidData(msg) + | RepositoryError::ConstraintViolation(msg) + | RepositoryError::TransactionValidationFailed(msg) => { + TransactionError::ValidationError(msg) + } + RepositoryError::ConcurrentUpdateConflict(msg) => { + TransactionError::ConcurrentUpdateConflict(msg) + } + RepositoryError::TransactionFailure(msg) + | RepositoryError::LockError(msg) + | RepositoryError::ConnectionError(msg) + | RepositoryError::PermissionDenied(msg) + | RepositoryError::Unknown(msg) + | RepositoryError::UnexpectedError(msg) + | RepositoryError::Other(msg) => TransactionError::UnexpectedError(msg), + RepositoryError::NotSupported(msg) => TransactionError::NotSupported(msg), + } } } @@ -205,6 +233,10 @@ mod tests { TransactionError::SimulationFailed("sim failed".to_string()), "Stellar transaction simulation failed: sim failed", ), + ( + TransactionError::ConcurrentUpdateConflict("conflict on tx-1".to_string()), + "Concurrent update conflict: conflict on tx-1", + ), ]; for (error, expected_message) in test_cases { @@ -247,6 +279,10 @@ mod tests { TransactionError::SimulationFailed("boom".to_string()), ApiError::BadRequest("boom".to_string()), ), + ( + TransactionError::ConcurrentUpdateConflict("conflict".to_string()), + ApiError::InternalError("conflict".to_string()), + ), ]; for (tx_error, expected_api_error) in test_cases { @@ -271,12 +307,52 @@ mod tests { match tx_error { TransactionError::ValidationError(msg) => { - assert_eq!(msg, "Entity not found: record not found"); + assert_eq!(msg, "record not found"); } _ => panic!("Expected TransactionError::ValidationError"), } } + #[test] + fn test_concurrent_update_conflict_variant() { + let error = TransactionError::ConcurrentUpdateConflict("conflict on tx-1".to_string()); + assert!(error.is_transient()); + assert!(error.is_concurrent_update_conflict()); + assert_eq!( + error.to_string(), + "Concurrent update conflict: conflict on tx-1" + ); + + let api_error = ApiError::from(error); + assert!(matches!(api_error, ApiError::InternalError(_))); + } + + #[test] + fn test_concurrent_update_conflict_repo_conversion() { + let repo_error = RepositoryError::ConcurrentUpdateConflict("conflict on tx-1".to_string()); + let tx_error = TransactionError::from(repo_error); + assert!(matches!( + tx_error, + TransactionError::ConcurrentUpdateConflict(_) + )); + assert!(tx_error.is_concurrent_update_conflict()); + } + + #[test] + fn test_non_conflict_errors_are_not_concurrent_update_conflict() { + let errors = vec![ + TransactionError::ValidationError("some error".to_string()), + TransactionError::UnexpectedError("some error".to_string()), + TransactionError::NetworkConfiguration("some error".to_string()), + ]; + for error in errors { + assert!( + !error.is_concurrent_update_conflict(), + "Error {error:?} should not be a concurrent update conflict" + ); + } + } + #[test] fn test_report_to_transaction_error() { let report = Report::msg("An unexpected error occurred"); diff --git a/src/models/transaction/repository.rs b/src/models/transaction/repository.rs index 803407a84..65490308e 100644 --- a/src/models/transaction/repository.rs +++ b/src/models/transaction/repository.rs @@ -73,22 +73,33 @@ pub struct TransactionMetadata { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct TransactionUpdateRequest { + #[serde(skip_serializing_if = "Option::is_none")] pub status: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub status_reason: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub sent_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub confirmed_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub network_data: Option, /// Timestamp when gas price was determined + #[serde(skip_serializing_if = "Option::is_none")] pub priced_at: Option, /// History of transaction hashes + #[serde(skip_serializing_if = "Option::is_none")] pub hashes: Option>, /// Number of no-ops in the transaction + #[serde(skip_serializing_if = "Option::is_none")] pub noop_count: Option, /// Whether the transaction is canceled + #[serde(skip_serializing_if = "Option::is_none")] pub is_canceled: Option, /// Timestamp when this transaction should be deleted (for final states) + #[serde(skip_serializing_if = "Option::is_none")] pub delete_at: Option, /// Status check metadata (failure counters for circuit breaker) + #[serde(skip_serializing_if = "Option::is_none")] pub metadata: Option, } diff --git a/src/repositories/transaction/mod.rs b/src/repositories/transaction/mod.rs index c5787ca9f..2a96c0a88 100644 --- a/src/repositories/transaction/mod.rs +++ b/src/repositories/transaction/mod.rs @@ -116,6 +116,32 @@ pub trait TransactionRepository: Repository { sent_at: String, ) -> Result; + /// Atomically increments status-check failure counters using the latest stored metadata. + async fn increment_status_check_failures( + &self, + tx_id: String, + ) -> Result; + + /// Atomically resets consecutive status-check failures to zero while preserving other counters. + async fn reset_status_check_consecutive_failures( + &self, + tx_id: String, + ) -> Result; + + /// Atomically sets `sent_at` and increments Stellar insufficient-fee retries. + async fn record_stellar_insufficient_fee_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result; + + /// Atomically sets `sent_at` and increments Stellar try-again-later retries. + async fn record_stellar_try_again_later_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result; + /// Set the confirmed_at timestamp of a transaction async fn set_confirmed_at( &self, @@ -192,6 +218,10 @@ mockall::mock! { async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result; async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result; async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result; + async fn increment_status_check_failures(&self, tx_id: String) -> Result; + async fn reset_status_check_consecutive_failures(&self, tx_id: String) -> Result; + async fn record_stellar_insufficient_fee_retry(&self, tx_id: String, sent_at: String) -> Result; + async fn record_stellar_try_again_later_retry(&self, tx_id: String, sent_at: String) -> Result; async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result; async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result; async fn delete_by_ids(&self, ids: Vec) -> Result; @@ -367,6 +397,68 @@ impl TransactionRepository for TransactionRepositoryStorage { } } + async fn increment_status_check_failures( + &self, + tx_id: String, + ) -> Result { + match self { + TransactionRepositoryStorage::InMemory(repo) => { + repo.increment_status_check_failures(tx_id).await + } + TransactionRepositoryStorage::Redis(repo) => { + repo.increment_status_check_failures(tx_id).await + } + } + } + + async fn reset_status_check_consecutive_failures( + &self, + tx_id: String, + ) -> Result { + match self { + TransactionRepositoryStorage::InMemory(repo) => { + repo.reset_status_check_consecutive_failures(tx_id).await + } + TransactionRepositoryStorage::Redis(repo) => { + repo.reset_status_check_consecutive_failures(tx_id).await + } + } + } + + async fn record_stellar_insufficient_fee_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result { + match self { + TransactionRepositoryStorage::InMemory(repo) => { + repo.record_stellar_insufficient_fee_retry(tx_id, sent_at) + .await + } + TransactionRepositoryStorage::Redis(repo) => { + repo.record_stellar_insufficient_fee_retry(tx_id, sent_at) + .await + } + } + } + + async fn record_stellar_try_again_later_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result { + match self { + TransactionRepositoryStorage::InMemory(repo) => { + repo.record_stellar_try_again_later_retry(tx_id, sent_at) + .await + } + TransactionRepositoryStorage::Redis(repo) => { + repo.record_stellar_try_again_later_retry(tx_id, sent_at) + .await + } + } + } + async fn set_confirmed_at( &self, tx_id: String, @@ -1284,4 +1376,90 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_record_stellar_try_again_later_retry_in_memory() -> Result<()> { + let storage = TransactionRepositoryStorage::new_in_memory(); + let mut transaction = create_test_transaction("test-tx", "test-relayer"); + transaction.status = TransactionStatus::Sent; + storage.create(transaction).await?; + + let sent_at = "2025-03-18T10:00:00Z".to_string(); + let updated = storage + .record_stellar_try_again_later_retry("test-tx".to_string(), sent_at.clone()) + .await?; + + assert_eq!(updated.id, "test-tx"); + assert_eq!(updated.sent_at, Some(sent_at)); + let meta = updated.metadata.expect("metadata should be set"); + assert_eq!(meta.try_again_later_retries, 1); + assert_eq!(meta.consecutive_failures, 0); + assert_eq!(meta.total_failures, 0); + assert_eq!(meta.insufficient_fee_retries, 0); + + Ok(()) + } + + #[tokio::test] + async fn test_record_stellar_try_again_later_retry_accumulates_in_memory() -> Result<()> { + let storage = TransactionRepositoryStorage::new_in_memory(); + let mut transaction = create_test_transaction("test-tx", "test-relayer"); + transaction.status = TransactionStatus::Sent; + storage.create(transaction).await?; + + storage + .record_stellar_try_again_later_retry( + "test-tx".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await?; + + let updated = storage + .record_stellar_try_again_later_retry( + "test-tx".to_string(), + "2025-03-18T10:01:00Z".to_string(), + ) + .await?; + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.try_again_later_retries, 2); + + Ok(()) + } + + #[tokio::test] + async fn test_record_stellar_try_again_later_retry_noop_on_final_state_in_memory() -> Result<()> + { + let storage = TransactionRepositoryStorage::new_in_memory(); + let mut transaction = create_test_transaction("test-tx", "test-relayer"); + transaction.status = TransactionStatus::Confirmed; + transaction.sent_at = Some("old-time".to_string()); + storage.create(transaction).await?; + + let result = storage + .record_stellar_try_again_later_retry("test-tx".to_string(), "new-time".to_string()) + .await?; + + assert_eq!(result.sent_at.as_deref(), Some("old-time")); + assert!(result.metadata.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_record_stellar_try_again_later_retry_not_found_in_memory() -> Result<()> { + let storage = TransactionRepositoryStorage::new_in_memory(); + + let result = storage + .record_stellar_try_again_later_retry( + "nonexistent".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + + Ok(()) + } } diff --git a/src/repositories/transaction/transaction_in_memory.rs b/src/repositories/transaction/transaction_in_memory.rs index ecbc3bac5..0de1c71de 100644 --- a/src/repositories/transaction/transaction_in_memory.rs +++ b/src/repositories/transaction/transaction_in_memory.rs @@ -71,6 +71,16 @@ impl InMemoryTransactionRepository { .cmp(a_key) // Descending (newest first) .then_with(|| b.id.cmp(&a.id)) // Tie-breaker: sort by ID for deterministic ordering } + + fn is_final_state(status: &TransactionStatus) -> bool { + matches!( + status, + TransactionStatus::Confirmed + | TransactionStatus::Failed + | TransactionStatus::Expired + | TransactionStatus::Canceled + ) + } } // Implement both traits for InMemoryTransactionRepository @@ -355,9 +365,100 @@ impl TransactionRepository for InMemoryTransactionRepository { tx_id: String, sent_at: String, ) -> Result { - let mut tx = self.get_by_id(tx_id.clone()).await?; - tx.sent_at = Some(sent_at); - self.update(tx_id, tx).await + let update = TransactionUpdateRequest { + sent_at: Some(sent_at), + ..Default::default() + }; + self.partial_update(tx_id, update).await + } + + async fn increment_status_check_failures( + &self, + tx_id: String, + ) -> Result { + let mut store = Self::acquire_lock(&self.store).await?; + + if let Some(tx) = store.get_mut(&tx_id) { + if Self::is_final_state(&tx.status) { + return Ok(tx.clone()); + } + let mut metadata = tx.metadata.clone().unwrap_or_default(); + metadata.consecutive_failures = metadata.consecutive_failures.saturating_add(1); + metadata.total_failures = metadata.total_failures.saturating_add(1); + tx.metadata = Some(metadata); + Ok(tx.clone()) + } else { + Err(RepositoryError::NotFound(format!( + "Transaction with ID {tx_id} not found" + ))) + } + } + + async fn reset_status_check_consecutive_failures( + &self, + tx_id: String, + ) -> Result { + let mut store = Self::acquire_lock(&self.store).await?; + + if let Some(tx) = store.get_mut(&tx_id) { + if Self::is_final_state(&tx.status) { + return Ok(tx.clone()); + } + let mut metadata = tx.metadata.clone().unwrap_or_default(); + metadata.consecutive_failures = 0; + tx.metadata = Some(metadata); + Ok(tx.clone()) + } else { + Err(RepositoryError::NotFound(format!( + "Transaction with ID {tx_id} not found" + ))) + } + } + + async fn record_stellar_insufficient_fee_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result { + let mut store = Self::acquire_lock(&self.store).await?; + + if let Some(tx) = store.get_mut(&tx_id) { + if Self::is_final_state(&tx.status) { + return Ok(tx.clone()); + } + let mut metadata = tx.metadata.clone().unwrap_or_default(); + metadata.insufficient_fee_retries = metadata.insufficient_fee_retries.saturating_add(1); + tx.metadata = Some(metadata); + tx.sent_at = Some(sent_at); + Ok(tx.clone()) + } else { + Err(RepositoryError::NotFound(format!( + "Transaction with ID {tx_id} not found" + ))) + } + } + + async fn record_stellar_try_again_later_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result { + let mut store = Self::acquire_lock(&self.store).await?; + + if let Some(tx) = store.get_mut(&tx_id) { + if Self::is_final_state(&tx.status) { + return Ok(tx.clone()); + } + let mut metadata = tx.metadata.clone().unwrap_or_default(); + metadata.try_again_later_retries = metadata.try_again_later_retries.saturating_add(1); + tx.metadata = Some(metadata); + tx.sent_at = Some(sent_at); + Ok(tx.clone()) + } else { + Err(RepositoryError::NotFound(format!( + "Transaction with ID {tx_id} not found" + ))) + } } async fn set_confirmed_at( @@ -1932,4 +2033,304 @@ mod tests { let remaining = repo.get_by_id("tx-relayer-2".to_string()).await.unwrap(); assert_eq!(remaining.relayer_id, "relayer-2"); } + + // ── increment_status_check_failures ───────────────────────────── + + #[tokio::test] + async fn test_increment_status_check_failures_no_prior_metadata() { + let repo = InMemoryTransactionRepository::new(); + let tx = create_test_transaction_pending_state("tx-inc-1"); + repo.create(tx).await.unwrap(); + + let updated = repo + .increment_status_check_failures("tx-inc-1".to_string()) + .await + .unwrap(); + + let meta = updated.metadata.expect("metadata should be set"); + assert_eq!(meta.consecutive_failures, 1); + assert_eq!(meta.total_failures, 1); + assert_eq!(meta.insufficient_fee_retries, 0); + } + + #[tokio::test] + async fn test_increment_status_check_failures_accumulates() { + let repo = InMemoryTransactionRepository::new(); + let tx = create_test_transaction_pending_state("tx-inc-2"); + repo.create(tx).await.unwrap(); + + repo.increment_status_check_failures("tx-inc-2".to_string()) + .await + .unwrap(); + repo.increment_status_check_failures("tx-inc-2".to_string()) + .await + .unwrap(); + let updated = repo + .increment_status_check_failures("tx-inc-2".to_string()) + .await + .unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 3); + assert_eq!(meta.total_failures, 3); + } + + #[tokio::test] + async fn test_increment_status_check_failures_noop_on_final_state() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-inc-final"); + tx.status = TransactionStatus::Confirmed; + repo.create(tx).await.unwrap(); + + let result = repo + .increment_status_check_failures("tx-inc-final".to_string()) + .await + .unwrap(); + + // Should return unchanged — no metadata set + assert!(result.metadata.is_none()); + assert_eq!(result.status, TransactionStatus::Confirmed); + } + + #[tokio::test] + async fn test_increment_status_check_failures_not_found() { + let repo = InMemoryTransactionRepository::new(); + let result = repo + .increment_status_check_failures("nonexistent".to_string()) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── reset_status_check_consecutive_failures ───────────────────── + + #[tokio::test] + async fn test_reset_consecutive_failures() { + let repo = InMemoryTransactionRepository::new(); + let tx = create_test_transaction_pending_state("tx-reset-1"); + repo.create(tx).await.unwrap(); + + // Increment a few times first + repo.increment_status_check_failures("tx-reset-1".to_string()) + .await + .unwrap(); + repo.increment_status_check_failures("tx-reset-1".to_string()) + .await + .unwrap(); + + let updated = repo + .reset_status_check_consecutive_failures("tx-reset-1".to_string()) + .await + .unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 0); + // total_failures should be preserved + assert_eq!(meta.total_failures, 2); + } + + #[tokio::test] + async fn test_reset_consecutive_failures_noop_on_final_state() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-reset-final"); + tx.status = TransactionStatus::Failed; + tx.metadata = Some(crate::models::TransactionMetadata { + consecutive_failures: 5, + total_failures: 10, + insufficient_fee_retries: 0, + try_again_later_retries: 0, + }); + repo.create(tx).await.unwrap(); + + let result = repo + .reset_status_check_consecutive_failures("tx-reset-final".to_string()) + .await + .unwrap(); + + // Should return unchanged + let meta = result.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 5); + } + + #[tokio::test] + async fn test_reset_consecutive_failures_not_found() { + let repo = InMemoryTransactionRepository::new(); + let result = repo + .reset_status_check_consecutive_failures("nonexistent".to_string()) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── record_stellar_insufficient_fee_retry ─────────────────────── + + #[tokio::test] + async fn test_record_insufficient_fee_retry() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-fee-1"); + tx.status = TransactionStatus::Sent; + tx.sent_at = None; + repo.create(tx).await.unwrap(); + + let updated = repo + .record_stellar_insufficient_fee_retry( + "tx-fee-1".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.insufficient_fee_retries, 1); + assert_eq!(meta.consecutive_failures, 0); + assert_eq!(meta.total_failures, 0); + } + + #[tokio::test] + async fn test_record_insufficient_fee_retry_accumulates() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-fee-2"); + tx.status = TransactionStatus::Sent; + repo.create(tx).await.unwrap(); + + repo.record_stellar_insufficient_fee_retry( + "tx-fee-2".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + let updated = repo + .record_stellar_insufficient_fee_retry( + "tx-fee-2".to_string(), + "2025-03-18T10:01:00Z".to_string(), + ) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.insufficient_fee_retries, 2); + } + + #[tokio::test] + async fn test_record_insufficient_fee_retry_noop_on_final_state() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-fee-final"); + tx.status = TransactionStatus::Confirmed; + tx.sent_at = Some("old-time".to_string()); + repo.create(tx).await.unwrap(); + + let result = repo + .record_stellar_insufficient_fee_retry( + "tx-fee-final".to_string(), + "new-time".to_string(), + ) + .await + .unwrap(); + + // Should return unchanged + assert_eq!(result.sent_at.as_deref(), Some("old-time")); + assert!(result.metadata.is_none()); + } + + #[tokio::test] + async fn test_record_insufficient_fee_retry_not_found() { + let repo = InMemoryTransactionRepository::new(); + let result = repo + .record_stellar_insufficient_fee_retry( + "nonexistent".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── record_stellar_try_again_later_retry ─────────────────────── + + #[tokio::test] + async fn test_record_try_again_later_retry() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-tal-1"); + tx.status = TransactionStatus::Sent; + tx.sent_at = None; + repo.create(tx).await.unwrap(); + + let updated = repo + .record_stellar_try_again_later_retry( + "tx-tal-1".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.try_again_later_retries, 1); + assert_eq!(meta.consecutive_failures, 0); + assert_eq!(meta.total_failures, 0); + } + + #[tokio::test] + async fn test_record_try_again_later_retry_accumulates() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-tal-2"); + tx.status = TransactionStatus::Sent; + repo.create(tx).await.unwrap(); + + repo.record_stellar_try_again_later_retry( + "tx-tal-2".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + let updated = repo + .record_stellar_try_again_later_retry( + "tx-tal-2".to_string(), + "2025-03-18T10:01:00Z".to_string(), + ) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.try_again_later_retries, 2); + } + + #[tokio::test] + async fn test_record_try_again_later_retry_noop_on_final_state() { + let repo = InMemoryTransactionRepository::new(); + let mut tx = create_test_transaction_pending_state("tx-tal-final"); + tx.status = TransactionStatus::Confirmed; + tx.sent_at = Some("old-time".to_string()); + repo.create(tx).await.unwrap(); + + let result = repo + .record_stellar_try_again_later_retry( + "tx-tal-final".to_string(), + "new-time".to_string(), + ) + .await + .unwrap(); + + // Should return unchanged + assert_eq!(result.sent_at.as_deref(), Some("old-time")); + assert!(result.metadata.is_none()); + } + + #[tokio::test] + async fn test_record_try_again_later_retry_not_found() { + let repo = InMemoryTransactionRepository::new(); + let result = repo + .record_stellar_try_again_later_retry( + "nonexistent".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } } diff --git a/src/repositories/transaction/transaction_redis.rs b/src/repositories/transaction/transaction_redis.rs index 830e903e0..9406f7980 100644 --- a/src/repositories/transaction/transaction_redis.rs +++ b/src/repositories/transaction/transaction_redis.rs @@ -1,5 +1,7 @@ //! Redis-backed implementation of the TransactionRepository. +use crate::config::ServerConfig; +use crate::constants::FINAL_TRANSACTION_STATUSES; use crate::domain::transaction::common::is_final_state; use crate::metrics::{ TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED, @@ -112,6 +114,160 @@ impl RedisTransactionRepository { ) } + /// Returns the components needed for Lua scripts to resolve a tx key from + /// only the tx_id: (tx_to_relayer lookup key, key prefix, key suffix). + /// The Lua script does: `GET KEYS[1]` to get the relayer_id, then + /// constructs the tx key as `ARGV[1] .. relayer_id .. ARGV[2]`. + fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) { + let lookup_key = self.tx_to_relayer_key(tx_id); + let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX); + let key_suffix = format!(":{TX_PREFIX}:{tx_id}"); + (lookup_key, key_prefix, key_suffix) + } + + /// Executes an atomic Lua script with retry/backoff for transient Redis failures. + /// + /// Every script receives `KEYS[1]` = tx_to_relayer lookup key and + /// `ARGV[1..2]` = key prefix/suffix. `extra_args` are appended as `ARGV[3..]`. + /// The script must return the (possibly updated) JSON string or `false` for + /// not-found. + async fn run_atomic_script( + &self, + lua: &str, + tx_id: &str, + extra_args: &[&str], + op_name: &str, + ) -> Result { + const MAX_RETRIES: u32 = 3; + const BASE_BACKOFF_MS: u64 = 100; + + let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id); + let script = Script::new(lua); + let mut last_error = None; + + for attempt in 0..MAX_RETRIES { + let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt); + + let mut conn = match self + .get_connection(self.connections.primary(), op_name) + .await + { + Ok(conn) => conn, + Err(e) => { + last_error = Some(e); + if attempt < MAX_RETRIES - 1 { + warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying"); + tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await; + continue; + } + return Err(last_error.unwrap()); + } + }; + + let mut invocation = script.prepare_invoke(); + invocation + .key(&lookup_key) + .arg(&key_prefix) + .arg(&key_suffix); + for arg in extra_args { + invocation.arg(*arg); + } + + match invocation.invoke_async::>(&mut conn).await { + Ok(result) => { + let json = result.ok_or_else(|| { + RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found")) + })?; + return self.deserialize_entity::( + &json, + tx_id, + "transaction", + ); + } + Err(e) => { + last_error = Some(self.map_redis_error(e, op_name)); + if attempt < MAX_RETRIES - 1 { + warn!( + tx_id = %tx_id, attempt, op = %op_name, + "atomic script failed, retrying" + ); + tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await; + continue; + } + return Err(last_error.unwrap()); + } + } + } + Err(last_error.unwrap_or_else(|| { + RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}")) + })) + } + + /// Executes a Lua script with retry/backoff, returning a Vec result + /// (for scripts that return Lua tables / multi-bulk replies). + /// Returns `Ok(None)` when the script returns `false`. + async fn run_script_with_retry_vec( + &self, + script: &Script, + lookup_key: &str, + key_prefix: &str, + key_suffix: &str, + extra_args: &[&str], + op_name: &str, + ) -> Result>, RepositoryError> { + const MAX_RETRIES: u32 = 3; + const BASE_BACKOFF_MS: u64 = 100; + + let mut last_error = None; + + for attempt in 0..MAX_RETRIES { + let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt); + + let mut conn = match self + .get_connection(self.connections.primary(), op_name) + .await + { + Ok(conn) => conn, + Err(e) => { + last_error = Some(e); + if attempt < MAX_RETRIES - 1 { + warn!(op = %op_name, attempt, "connection failed, retrying"); + tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await; + continue; + } + return Err(last_error.unwrap()); + } + }; + + let mut invocation = script.prepare_invoke(); + invocation.key(lookup_key).arg(key_prefix).arg(key_suffix); + for arg in extra_args { + invocation.arg(*arg); + } + + // Redis returns `false` from Lua as a Nil bulk reply, which + // redis-rs maps to `None` for `Option>`. + match invocation + .invoke_async::>>(&mut conn) + .await + { + Ok(result) => return Ok(result), + Err(e) => { + last_error = Some(self.map_redis_error(e, op_name)); + if attempt < MAX_RETRIES - 1 { + warn!(op = %op_name, attempt, "script failed, retrying"); + tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await; + continue; + } + return Err(last_error.unwrap()); + } + } + } + Err(last_error.unwrap_or_else(|| { + RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}")) + })) + } + /// Parse timestamp string to score for sorted set (milliseconds since epoch) fn timestamp_to_score(&self, timestamp: &str) -> f64 { chrono::DateTime::parse_from_rfc3339(timestamp) @@ -196,7 +352,6 @@ impl RedisTransactionRepository { let mut transactions = Vec::new(); let mut failed_count = 0; - let mut failed_ids = Vec::new(); for (i, value) in values.into_iter().enumerate() { match value { Some(json) => { @@ -488,6 +643,166 @@ impl RedisTransactionRepository { debug!(tx_id = %tx.id, "successfully removed all indexes for transaction"); Ok(()) } + + /// Track Prometheus metrics when a transaction status changes. + fn track_status_change_metrics( + &self, + _original_tx: &TransactionRepoModel, + updated_tx: &TransactionRepoModel, + old_status: &TransactionStatus, + new_status: &TransactionStatus, + ) { + let network_type = format!("{:?}", updated_tx.network_type).to_lowercase(); + let relayer_id = updated_tx.relayer_id.as_str(); + + // Track submission (when status changes to Submitted) + if *old_status != TransactionStatus::Submitted + && *new_status == TransactionStatus::Submitted + { + TRANSACTIONS_SUBMITTED + .with_label_values(&[relayer_id, &network_type]) + .inc(); + + if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) { + let processing_seconds = + (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64; + TRANSACTION_PROCESSING_TIME + .with_label_values(&[relayer_id, &network_type, "creation_to_submission"]) + .observe(processing_seconds); + } + } + + // Track status distribution (update gauge when status changes) + if old_status != new_status { + let old_status_str = format!("{old_status:?}").to_lowercase(); + let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[ + relayer_id, + &network_type, + &old_status_str, + ]); + let clamped_value = (old_status_gauge.get() - 1.0).max(0.0); + old_status_gauge.set(clamped_value); + + let new_status_str = format!("{new_status:?}").to_lowercase(); + TRANSACTIONS_BY_STATUS + .with_label_values(&[relayer_id, &network_type, &new_status_str]) + .inc(); + } + + // Track metrics for final transaction states + let was_final = is_final_state(old_status); + let is_final = is_final_state(new_status); + + if !was_final && is_final { + let meta = updated_tx.metadata.as_ref(); + let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0); + let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0); + + match new_status { + TransactionStatus::Confirmed => { + TRANSACTIONS_SUCCESS + .with_label_values(&[relayer_id, &network_type]) + .inc(); + if had_insufficient_fee { + TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS + .with_label_values(&[relayer_id, &network_type]) + .inc(); + } + if had_try_again_later { + TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS + .with_label_values(&[relayer_id, &network_type]) + .inc(); + } + + if let (Some(sent_at_str), Some(confirmed_at_str)) = + (&updated_tx.sent_at, &updated_tx.confirmed_at) + { + if let (Ok(sent_time), Ok(confirmed_time)) = ( + chrono::DateTime::parse_from_rfc3339(sent_at_str), + chrono::DateTime::parse_from_rfc3339(confirmed_at_str), + ) { + let processing_seconds = (confirmed_time.with_timezone(&Utc) + - sent_time.with_timezone(&Utc)) + .num_seconds() + as f64; + TRANSACTION_PROCESSING_TIME + .with_label_values(&[ + relayer_id, + &network_type, + "submission_to_confirmation", + ]) + .observe(processing_seconds); + } + } + + if let Ok(created_time) = + chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) + { + if let Some(confirmed_at_str) = &updated_tx.confirmed_at { + if let Ok(confirmed_time) = + chrono::DateTime::parse_from_rfc3339(confirmed_at_str) + { + let processing_seconds = (confirmed_time.with_timezone(&Utc) + - created_time.with_timezone(&Utc)) + .num_seconds() + as f64; + TRANSACTION_PROCESSING_TIME + .with_label_values(&[ + relayer_id, + &network_type, + "creation_to_confirmation", + ]) + .observe(processing_seconds); + } + } + } + } + TransactionStatus::Failed => { + let failure_reason = updated_tx + .status_reason + .as_deref() + .map(|reason| { + if reason.starts_with("Submission failed:") { + "submission_failed" + } else if reason.starts_with("Preparation failed:") { + "preparation_failed" + } else { + "failed" + } + }) + .unwrap_or("failed"); + TRANSACTIONS_FAILED + .with_label_values(&[relayer_id, &network_type, failure_reason]) + .inc(); + } + TransactionStatus::Expired => { + TRANSACTIONS_FAILED + .with_label_values(&[relayer_id, &network_type, "expired"]) + .inc(); + } + TransactionStatus::Canceled => { + TRANSACTIONS_FAILED + .with_label_values(&[relayer_id, &network_type, "canceled"]) + .inc(); + } + _ => {} + } + + // Track retry-related failure metrics for all non-success final states + if *new_status != TransactionStatus::Confirmed { + if had_insufficient_fee { + TRANSACTIONS_INSUFFICIENT_FEE_FAILED + .with_label_values(&[relayer_id, &network_type]) + .inc(); + } + if had_try_again_later { + TRANSACTIONS_TRY_AGAIN_LATER_FAILED + .with_label_values(&[relayer_id, &network_type]) + .inc(); + } + } + } + } } impl fmt::Debug for RedisTransactionRepository { @@ -1296,301 +1611,144 @@ impl TransactionRepository for RedisTransactionRepository { tx_id: String, update: TransactionUpdateRequest, ) -> Result { - const MAX_RETRIES: u32 = 3; - const BACKOFF_MS: u64 = 100; - - // Optimistic CAS: only apply update if the current stored value still matches the - // expected pre-update value. This avoids duplicate status metric updates on races. - let mut original_tx = self.get_by_id(tx_id.clone()).await?; - let mut updated_tx = original_tx.clone(); - updated_tx.apply_partial_update(update.clone()); - - let key = self.tx_key(&updated_tx.relayer_id, &tx_id); - let mut original_value = self.serialize_entity(&original_tx, |t| &t.id, "transaction")?; - let mut updated_value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?; - let mut data_updated = false; - - let mut last_error = None; - - for attempt in 0..MAX_RETRIES { - let mut conn = match self - .get_connection(self.connections.primary(), "partial_update") - .await - { - Ok(conn) => conn, - Err(e) => { - last_error = Some(e); - if attempt < MAX_RETRIES - 1 { - tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await; - continue; - } - return Err(last_error.unwrap()); - } - }; + // Serialize only the non-None fields as a JSON patch. + let patch_json = serde_json::to_string(&update).map_err(|e| { + RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}")) + })?; - if !data_updated { - let cas_script = Script::new( - r#" - local current = redis.call('GET', KEYS[1]) - if not current then - return -1 - end - if current == ARGV[1] then - redis.call('SET', KEYS[1], ARGV[2]) - return 1 - end - return 0 - "#, - ); + // If the update sets a final status, compute delete_at in Rust (depends on server config) + // and include it in the patch so the Lua script applies it atomically. + let delete_at_value = if let Some(ref status) = update.status { + if FINAL_TRANSACTION_STATUSES.contains(status) { + let expiration_hours = ServerConfig::get_transaction_expiration_hours(); + let seconds = (expiration_hours * 3600.0) as i64; + let delete_time = Utc::now() + chrono::Duration::seconds(seconds); + Some(delete_time.to_rfc3339()) + } else { + None + } + } else { + None + }; + let delete_at_arg = delete_at_value.as_deref().unwrap_or(""); + + let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id); + + // Lua script: atomically applies a JSON patch to the stored transaction. + // Guards: rejects status changes on already-finalized transactions. + // Returns a two-element array {old_json, new_json} so Rust has the full + // pre-update state for index cleanup and metrics. + // Returns false if tx not found. + let patch_script = Script::new( + r#" + local relayer_id = redis.call('GET', KEYS[1]) + if not relayer_id then return false end + + local tx_key = ARGV[1] .. relayer_id .. ARGV[2] + local current = redis.call('GET', tx_key) + if not current then return false end + + local tx = cjson.decode(current) + local patch = cjson.decode(ARGV[3]) + + -- Guard: reject status changes on finalized transactions. + -- A stale worker must not resurrect a tx that another worker + -- already moved to a terminal state. + local final_states = {confirmed=true, failed=true, expired=true, canceled=true} + if final_states[tx["status"]] and patch["status"] then + return {current, current} + end + + local old_snapshot = current + + -- lua-cjson cannot distinguish empty Lua tables from empty + -- arrays, so a decode/encode round-trip turns [] into {}. + -- Record which keys held [] in the stored doc and the patch + -- so we can restore them after cjson.encode. + -- NOTE: this relies on each array-typed field having a unique key + -- name across the entire JSON document (including nested objects). + -- If the model ever introduces duplicate key names at different + -- nesting levels (e.g. metadata.hashes), the gsub below could + -- restore the wrong occurrence. + local empty_arrs = {} + for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do + empty_arrs[k] = true + end + for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do + empty_arrs[k] = true + end + + for k, v in pairs(patch) do + tx[k] = v + end + + -- Apply delete_at if transitioning to a final state and not already set + if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then + tx["delete_at"] = ARGV[4] + end + + local updated = cjson.encode(tx) + + -- Restore empty arrays that cjson.encode converted to {} + for k, _ in pairs(empty_arrs) do + updated = string.gsub( + updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1 + ) + end + + redis.call('SET', tx_key, updated) + return {old_snapshot, updated} + "#, + ); - let cas_result: i32 = match cas_script - .key(&key) - .arg(&original_value) - .arg(&updated_value) - .invoke_async(&mut conn) - .await - { - Ok(result) => result, - Err(e) => { - if attempt < MAX_RETRIES - 1 { - warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed CAS transaction update, retrying"); - last_error = Some(self.map_redis_error(e, "partial_update_cas")); - tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)) - .await; - continue; - } - return Err(self.map_redis_error(e, "partial_update_cas")); - } - }; + let result: Option> = self + .run_script_with_retry_vec( + &patch_script, + &lookup_key, + &key_prefix, + &key_suffix, + &[&patch_json, delete_at_arg], + "partial_update", + ) + .await?; - if cas_result == -1 { - return Err(RepositoryError::NotFound(format!( - "Transaction with ID {tx_id} not found" - ))); - } + let parts = result.ok_or_else(|| { + RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found")) + })?; - if cas_result == 0 { - if attempt < MAX_RETRIES - 1 { - warn!(tx_id = %tx_id, attempt = %attempt, "concurrent transaction update detected, rebasing retry"); - original_tx = self.get_by_id(tx_id.clone()).await?; - updated_tx = original_tx.clone(); - updated_tx.apply_partial_update(update.clone()); - original_value = - self.serialize_entity(&original_tx, |t| &t.id, "transaction")?; - updated_value = - self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?; - tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await; - continue; - } - return Err(RepositoryError::TransactionFailure(format!( - "Concurrent update conflict for transaction {tx_id}" - ))); - } + if parts.len() != 2 { + return Err(RepositoryError::UnexpectedError(format!( + "partial_update script returned {} elements, expected 2", + parts.len() + ))); + } - data_updated = true; - } + let old_json = &parts[0]; + let new_json = &parts[1]; - // Try to update indexes with the original pre-update state - // This ensures stale indexes are removed even on retry attempts - match self.update_indexes(&updated_tx, Some(&original_tx)).await { - Ok(_) => { - debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction"); - - // Track metrics for transaction state changes - if let Some(new_status) = &update.status { - let network_type = format!("{:?}", updated_tx.network_type).to_lowercase(); - let relayer_id = updated_tx.relayer_id.as_str(); - - // Track submission (when status changes to Submitted) - if original_tx.status != TransactionStatus::Submitted - && *new_status == TransactionStatus::Submitted - { - TRANSACTIONS_SUBMITTED - .with_label_values(&[relayer_id, &network_type]) - .inc(); - - // Track processing time: creation to submission - if let Ok(created_time) = - chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) - { - let processing_seconds = - (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() - as f64; - TRANSACTION_PROCESSING_TIME - .with_label_values(&[ - relayer_id, - &network_type, - "creation_to_submission", - ]) - .observe(processing_seconds); - } - } + let original_tx = + self.deserialize_entity::(old_json, &tx_id, "transaction")?; + let updated_tx = + self.deserialize_entity::(new_json, &tx_id, "transaction")?; - // Track status distribution (update gauge when status changes) - if original_tx.status != *new_status { - // Decrement old status and clamp to zero to avoid negative gauges. - let old_status = &original_tx.status; - let old_status_str = format!("{old_status:?}").to_lowercase(); - let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[ - relayer_id, - &network_type, - &old_status_str, - ]); - let clamped_value = (old_status_gauge.get() - 1.0).max(0.0); - old_status_gauge.set(clamped_value); - - // Increment new status - let new_status_str = format!("{new_status:?}").to_lowercase(); - TRANSACTIONS_BY_STATUS - .with_label_values(&[relayer_id, &network_type, &new_status_str]) - .inc(); - } + // Update indexes using the full pre-update state (status, network_data, nonce, etc.) + self.update_indexes(&updated_tx, Some(&original_tx)).await?; - // Track metrics for final transaction states - // Only track when status changes from non-final to final state - let was_final = is_final_state(&original_tx.status); - let is_final = is_final_state(new_status); - - if !was_final && is_final { - let meta = updated_tx.metadata.as_ref(); - let had_insufficient_fee = - meta.is_some_and(|m| m.insufficient_fee_retries > 0); - let had_try_again_later = - meta.is_some_and(|m| m.try_again_later_retries > 0); - - match new_status { - TransactionStatus::Confirmed => { - TRANSACTIONS_SUCCESS - .with_label_values(&[relayer_id, &network_type]) - .inc(); - if had_insufficient_fee { - TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS - .with_label_values(&[relayer_id, &network_type]) - .inc(); - } - if had_try_again_later { - TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS - .with_label_values(&[relayer_id, &network_type]) - .inc(); - } - - // Track processing time: submission to confirmation - if let (Some(sent_at_str), Some(confirmed_at_str)) = - (&updated_tx.sent_at, &updated_tx.confirmed_at) - { - if let (Ok(sent_time), Ok(confirmed_time)) = ( - chrono::DateTime::parse_from_rfc3339(sent_at_str), - chrono::DateTime::parse_from_rfc3339(confirmed_at_str), - ) { - let processing_seconds = (confirmed_time - .with_timezone(&Utc) - - sent_time.with_timezone(&Utc)) - .num_seconds() - as f64; - TRANSACTION_PROCESSING_TIME - .with_label_values(&[ - relayer_id, - &network_type, - "submission_to_confirmation", - ]) - .observe(processing_seconds); - } - } - - // Track processing time: creation to confirmation - if let Ok(created_time) = - chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) - { - if let Some(confirmed_at_str) = &updated_tx.confirmed_at { - if let Ok(confirmed_time) = - chrono::DateTime::parse_from_rfc3339( - confirmed_at_str, - ) - { - let processing_seconds = (confirmed_time - .with_timezone(&Utc) - - created_time.with_timezone(&Utc)) - .num_seconds() - as f64; - TRANSACTION_PROCESSING_TIME - .with_label_values(&[ - relayer_id, - &network_type, - "creation_to_confirmation", - ]) - .observe(processing_seconds); - } - } - } - } - TransactionStatus::Failed => { - // Parse status_reason to determine failure type - let failure_reason = updated_tx - .status_reason - .as_deref() - .map(|reason| { - if reason.starts_with("Submission failed:") { - "submission_failed" - } else if reason.starts_with("Preparation failed:") { - "preparation_failed" - } else { - "failed" - } - }) - .unwrap_or("failed"); - TRANSACTIONS_FAILED - .with_label_values(&[ - relayer_id, - &network_type, - failure_reason, - ]) - .inc(); - } - TransactionStatus::Expired => { - TRANSACTIONS_FAILED - .with_label_values(&[relayer_id, &network_type, "expired"]) - .inc(); - } - TransactionStatus::Canceled => { - TRANSACTIONS_FAILED - .with_label_values(&[relayer_id, &network_type, "canceled"]) - .inc(); - } - _ => { - // Other final states (shouldn't happen, but handle gracefully) - } - } + debug!(tx_id = %tx_id, "successfully updated transaction via patch"); - // Track retry-related failure metrics for all non-success final states - if *new_status != TransactionStatus::Confirmed { - if had_insufficient_fee { - TRANSACTIONS_INSUFFICIENT_FEE_FAILED - .with_label_values(&[relayer_id, &network_type]) - .inc(); - } - if had_try_again_later { - TRANSACTIONS_TRY_AGAIN_LATER_FAILED - .with_label_values(&[relayer_id, &network_type]) - .inc(); - } - } - } - } - return Ok(updated_tx); - } - Err(e) if attempt < MAX_RETRIES - 1 => { - warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying"); - last_error = Some(e); - tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await; - continue; - } - Err(e) => return Err(e), - } + // Track metrics only when the persisted status actually changed. + // The Lua script may silently reject a status patch on already-final + // transactions, so we compare the deserialized before/after states. + if original_tx.status != updated_tx.status { + self.track_status_change_metrics( + &original_tx, + &updated_tx, + &original_tx.status, + &updated_tx.status, + ); } - Err(last_error.unwrap_or_else(|| { - RepositoryError::UnexpectedError("partial_update exhausted retries".to_string()) - })) + Ok(updated_tx) } async fn update_network_data( @@ -1617,6 +1775,191 @@ impl TransactionRepository for RedisTransactionRepository { self.partial_update(tx_id, update).await } + async fn increment_status_check_failures( + &self, + tx_id: String, + ) -> Result { + self.run_atomic_script( + r#" + local function set_obj(json, key, tbl) + local enc = cjson.encode(tbl) + local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1) + if n > 0 then return r end + r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1) + if n > 0 then return r end + return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1) + end + + local relayer_id = redis.call('GET', KEYS[1]) + if not relayer_id then return false end + + local tx_key = ARGV[1] .. relayer_id .. ARGV[2] + local current = redis.call('GET', tx_key) + if not current then return false end + + local tx = cjson.decode(current) + local final_states = {confirmed=true, failed=true, expired=true, canceled=true} + if final_states[tx["status"]] then return current end + + local metadata = tx["metadata"] + if type(metadata) ~= 'table' then metadata = {} end + metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1 + metadata["total_failures"] = (metadata["total_failures"] or 0) + 1 + + local updated = set_obj(current, "metadata", metadata) + redis.call('SET', tx_key, updated) + return updated + "#, + &tx_id, + &[], + "increment_status_check_failures", + ) + .await + } + + async fn reset_status_check_consecutive_failures( + &self, + tx_id: String, + ) -> Result { + self.run_atomic_script( + r#" + local function set_obj(json, key, tbl) + local enc = cjson.encode(tbl) + local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1) + if n > 0 then return r end + r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1) + if n > 0 then return r end + return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1) + end + + local relayer_id = redis.call('GET', KEYS[1]) + if not relayer_id then return false end + + local tx_key = ARGV[1] .. relayer_id .. ARGV[2] + local current = redis.call('GET', tx_key) + if not current then return false end + + local tx = cjson.decode(current) + local final_states = {confirmed=true, failed=true, expired=true, canceled=true} + if final_states[tx["status"]] then return current end + + local metadata = tx["metadata"] + if type(metadata) ~= 'table' then metadata = {} end + metadata["consecutive_failures"] = 0 + + local updated = set_obj(current, "metadata", metadata) + redis.call('SET', tx_key, updated) + return updated + "#, + &tx_id, + &[], + "reset_status_check_consecutive_failures", + ) + .await + } + + async fn record_stellar_insufficient_fee_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result { + self.run_atomic_script( + r#" + local function set_str(json, key, val) + local enc = cjson.encode(val) + local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1) + if n > 0 then return r end + r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1) + if n > 0 then return r end + return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1) + end + local function set_obj(json, key, tbl) + local enc = cjson.encode(tbl) + local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1) + if n > 0 then return r end + r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1) + if n > 0 then return r end + return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1) + end + + local relayer_id = redis.call('GET', KEYS[1]) + if not relayer_id then return false end + + local tx_key = ARGV[1] .. relayer_id .. ARGV[2] + local current = redis.call('GET', tx_key) + if not current then return false end + + local tx = cjson.decode(current) + local final_states = {confirmed=true, failed=true, expired=true, canceled=true} + if final_states[tx["status"]] then return current end + + local metadata = tx["metadata"] + if type(metadata) ~= 'table' then metadata = {} end + metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1 + + local updated = set_str(current, "sent_at", ARGV[3]) + updated = set_obj(updated, "metadata", metadata) + redis.call('SET', tx_key, updated) + return updated + "#, + &tx_id, + &[&sent_at], + "record_stellar_insufficient_fee_retry", + ) + .await + } + + async fn record_stellar_try_again_later_retry( + &self, + tx_id: String, + sent_at: String, + ) -> Result { + self.run_atomic_script( + r#" + local function set_str(json, key, val) + local enc = cjson.encode(val) + local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1) + if n > 0 then return r end + r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1) + if n > 0 then return r end + return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1) + end + local function set_obj(json, key, tbl) + local enc = cjson.encode(tbl) + local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1) + if n > 0 then return r end + r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1) + if n > 0 then return r end + return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1) + end + + local relayer_id = redis.call('GET', KEYS[1]) + if not relayer_id then return false end + + local tx_key = ARGV[1] .. relayer_id .. ARGV[2] + local current = redis.call('GET', tx_key) + if not current then return false end + + local tx = cjson.decode(current) + local final_states = {confirmed=true, failed=true, expired=true, canceled=true} + if final_states[tx["status"]] then return current end + + local metadata = tx["metadata"] + if type(metadata) ~= 'table' then metadata = {} end + metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1 + + local updated = set_str(current, "sent_at", ARGV[3]) + updated = set_obj(updated, "metadata", metadata) + redis.call('SET', tx_key, updated) + return updated + "#, + &tx_id, + &[&sent_at], + "record_stellar_try_again_later_retry", + ) + .await + } + async fn set_confirmed_at( &self, tx_id: String, @@ -2219,106 +2562,6 @@ mod tests { ); } - #[tokio::test] - #[ignore = "Requires active Redis instance"] - async fn test_find_by_relayer_id_migration_from_old_index() { - let repo = setup_test_repo().await; - let relayer_id = Uuid::new_v4().to_string(); - - // Create transactions with different created_at timestamps - let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id); - tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest - - let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id); - tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle - - let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id); - tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest - - // Create transactions directly in Redis WITHOUT adding to sorted set - // This simulates old transactions created before the sorted set index existed - let mut conn = repo.connections.primary().get().await.unwrap(); - let relayer_list_key = repo.relayer_list_key(); - let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap(); - - for tx in &[&tx1, &tx2, &tx3] { - let key = repo.tx_key(&tx.relayer_id, &tx.id); - let reverse_key = repo.tx_to_relayer_key(&tx.id); - let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap(); - - let mut pipe = redis::pipe(); - pipe.atomic(); - pipe.set(&key, &value); - pipe.set(&reverse_key, &tx.relayer_id); - - // Add to status index (but NOT to sorted set) - let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status); - pipe.sadd(&status_key, &tx.id); - - pipe.exec_async(&mut conn).await.unwrap(); - } - - // Verify sorted set is empty (transactions were created without sorted set index) - let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id); - let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap(); - assert_eq!(count, 0, "Sorted set should be empty for old transactions"); - - // Call find_by_relayer_id - this should trigger migration - let query = PaginationQuery { - page: 1, - per_page: 10, - }; - let result = repo - .find_by_relayer_id(&relayer_id, query.clone()) - .await - .unwrap(); - - // Verify migration happened - sorted set should now have entries - let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap(); - assert_eq!( - count_after, 3, - "Sorted set should be populated after migration" - ); - - // Verify results are correct and sorted (newest first) - assert_eq!(result.total, 3); - assert_eq!(result.items.len(), 3); - - assert_eq!( - result.items[0].id, "migrate-test-3", - "First item should be newest after migration" - ); - assert_eq!( - result.items[0].created_at, - "2025-01-27T14:00:00.000000+00:00" - ); - - assert_eq!( - result.items[1].id, "migrate-test-2", - "Second item should be middle after migration" - ); - assert_eq!( - result.items[1].created_at, - "2025-01-27T12:00:00.000000+00:00" - ); - - assert_eq!( - result.items[2].id, "migrate-test-1", - "Third item should be oldest after migration" - ); - assert_eq!( - result.items[2].created_at, - "2025-01-27T10:00:00.000000+00:00" - ); - - // Verify second call uses sorted set (no migration needed) - let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap(); - assert_eq!(result2.total, 3); - assert_eq!(result2.items.len(), 3); - // Results should be identical since sorted set is now populated - assert_eq!(result.items[0].id, result2.items[0].id); - } - #[tokio::test] #[ignore = "Requires active Redis instance"] async fn test_find_by_status() { @@ -3380,4 +3623,478 @@ mod tests { let remaining = repo.get_by_id(tx_id_2).await.unwrap(); assert_eq!(remaining.relayer_id, relayer_2); } + + // ── increment_status_check_failures ───────────────────────────── + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_increment_status_check_failures_no_prior_metadata() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let mut tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + tx.metadata = None; + repo.create(tx).await.unwrap(); + + let updated = repo.increment_status_check_failures(tx_id).await.unwrap(); + + let meta = updated.metadata.expect("metadata should be set"); + assert_eq!(meta.consecutive_failures, 1); + assert_eq!(meta.total_failures, 1); + assert_eq!(meta.insufficient_fee_retries, 0); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_increment_status_check_failures_accumulates() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + repo.increment_status_check_failures(tx_id.clone()) + .await + .unwrap(); + repo.increment_status_check_failures(tx_id.clone()) + .await + .unwrap(); + let updated = repo.increment_status_check_failures(tx_id).await.unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 3); + assert_eq!(meta.total_failures, 3); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_increment_status_check_failures_noop_on_final_state() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed); + repo.create(tx).await.unwrap(); + + let result = repo.increment_status_check_failures(tx_id).await.unwrap(); + + // Should return unchanged — no metadata mutation on final state + assert!(result.metadata.is_none()); + assert_eq!(result.status, TransactionStatus::Confirmed); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_increment_status_check_failures_not_found() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + + let result = repo + .increment_status_check_failures("nonexistent".to_string()) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── reset_status_check_consecutive_failures ───────────────────── + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_reset_consecutive_failures() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + // Increment a few times first + repo.increment_status_check_failures(tx_id.clone()) + .await + .unwrap(); + repo.increment_status_check_failures(tx_id.clone()) + .await + .unwrap(); + + let updated = repo + .reset_status_check_consecutive_failures(tx_id) + .await + .unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 0); + // total_failures should be preserved + assert_eq!(meta.total_failures, 2); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_reset_consecutive_failures_noop_on_final_state() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let mut tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed); + tx.metadata = Some(crate::models::TransactionMetadata { + consecutive_failures: 5, + total_failures: 10, + insufficient_fee_retries: 0, + try_again_later_retries: 0, + }); + repo.create(tx).await.unwrap(); + + let result = repo + .reset_status_check_consecutive_failures(tx_id) + .await + .unwrap(); + + // Should return unchanged on final state + let meta = result.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 5); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_reset_consecutive_failures_not_found() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + + let result = repo + .reset_status_check_consecutive_failures("nonexistent".to_string()) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── record_stellar_insufficient_fee_retry ─────────────────────── + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_insufficient_fee_retry() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let mut tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + tx.sent_at = None; + repo.create(tx).await.unwrap(); + + let updated = repo + .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string()) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.insufficient_fee_retries, 1); + assert_eq!(meta.consecutive_failures, 0); + assert_eq!(meta.total_failures, 0); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_insufficient_fee_retry_accumulates() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + repo.record_stellar_insufficient_fee_retry( + tx_id.clone(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + let updated = repo + .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string()) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.insufficient_fee_retries, 2); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_insufficient_fee_retry_noop_on_final_state() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let mut tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed); + tx.sent_at = Some("old-time".to_string()); + repo.create(tx).await.unwrap(); + + let result = repo + .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string()) + .await + .unwrap(); + + // Should return unchanged on final state + assert_eq!(result.sent_at.as_deref(), Some("old-time")); + assert!(result.metadata.is_none()); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_insufficient_fee_retry_not_found() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + + let result = repo + .record_stellar_insufficient_fee_retry( + "nonexistent".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── record_stellar_try_again_later_retry ──────────────────────── + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_try_again_later_retry() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let mut tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + tx.sent_at = None; + repo.create(tx).await.unwrap(); + + let updated = repo + .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string()) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.try_again_later_retries, 1); + assert_eq!(meta.consecutive_failures, 0); + assert_eq!(meta.total_failures, 0); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_try_again_later_retry_accumulates() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + repo.record_stellar_try_again_later_retry( + tx_id.clone(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + let updated = repo + .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string()) + .await + .unwrap(); + + assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z")); + let meta = updated.metadata.unwrap(); + assert_eq!(meta.try_again_later_retries, 2); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_try_again_later_retry_noop_on_final_state() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let mut tx = + create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed); + tx.sent_at = Some("old-time".to_string()); + repo.create(tx).await.unwrap(); + + let result = repo + .record_stellar_try_again_later_retry(tx_id, "new-time".to_string()) + .await + .unwrap(); + + // Should return unchanged on final state + assert_eq!(result.sent_at.as_deref(), Some("old-time")); + assert!(result.metadata.is_none()); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_record_try_again_later_retry_not_found() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + + let result = repo + .record_stellar_try_again_later_retry( + "nonexistent".to_string(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await; + + assert!(matches!(result, Err(RepositoryError::NotFound(_)))); + } + + // ── metadata preservation across operations ───────────────────── + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_increment_failures_preserves_try_again_later_retries() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + // Set try_again_later_retries = 1 + repo.record_stellar_try_again_later_retry( + tx_id.clone(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + // Now increment failures — should NOT clobber try_again_later_retries + let updated = repo.increment_status_check_failures(tx_id).await.unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!( + meta.try_again_later_retries, 1, + "try_again_later_retries must survive increment_status_check_failures" + ); + assert_eq!(meta.consecutive_failures, 1); + assert_eq!(meta.total_failures, 1); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_increment_failures_preserves_insufficient_fee_retries() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + // Set insufficient_fee_retries = 1 + repo.record_stellar_insufficient_fee_retry( + tx_id.clone(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + + // Now increment failures — should NOT clobber insufficient_fee_retries + let updated = repo.increment_status_check_failures(tx_id).await.unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!( + meta.insufficient_fee_retries, 1, + "insufficient_fee_retries must survive increment_status_check_failures" + ); + assert_eq!(meta.consecutive_failures, 1); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_reset_failures_preserves_retry_counters() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + // Set both retry counters + repo.record_stellar_try_again_later_retry( + tx_id.clone(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + repo.record_stellar_insufficient_fee_retry( + tx_id.clone(), + "2025-03-18T10:01:00Z".to_string(), + ) + .await + .unwrap(); + + // Increment then reset consecutive failures + repo.increment_status_check_failures(tx_id.clone()) + .await + .unwrap(); + let updated = repo + .reset_status_check_consecutive_failures(tx_id) + .await + .unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!(meta.consecutive_failures, 0); + assert_eq!(meta.total_failures, 1); + assert_eq!( + meta.try_again_later_retries, 1, + "try_again_later_retries must survive reset" + ); + assert_eq!( + meta.insufficient_fee_retries, 1, + "insufficient_fee_retries must survive reset" + ); + } + + #[tokio::test] + #[ignore = "Requires active Redis instance"] + async fn test_fee_and_try_again_later_retries_independent() { + let _lock = ENV_MUTEX.lock().await; + let repo = setup_test_repo().await; + let relayer_id = Uuid::new_v4().to_string(); + let tx_id = Uuid::new_v4().to_string(); + let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent); + repo.create(tx).await.unwrap(); + + // Set try_again_later_retries = 2 + repo.record_stellar_try_again_later_retry( + tx_id.clone(), + "2025-03-18T10:00:00Z".to_string(), + ) + .await + .unwrap(); + repo.record_stellar_try_again_later_retry( + tx_id.clone(), + "2025-03-18T10:01:00Z".to_string(), + ) + .await + .unwrap(); + + // Set insufficient_fee_retries = 1 — should NOT clobber try_again_later_retries + let updated = repo + .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string()) + .await + .unwrap(); + + let meta = updated.metadata.unwrap(); + assert_eq!( + meta.try_again_later_retries, 2, + "try_again_later_retries must survive insufficient_fee_retry" + ); + assert_eq!(meta.insufficient_fee_retries, 1); + } }