diff --git a/plugins/examples/channels/index.ts b/plugins/examples/channels/index.ts
new file mode 100644
index 000000000..8e483a238
--- /dev/null
+++ b/plugins/examples/channels/index.ts
@@ -0,0 +1 @@
+export { handler } from '@openzeppelin/relayer-plugin-channels';
diff --git a/src/domain/transaction/stellar/status.rs b/src/domain/transaction/stellar/status.rs
index 46ad22781..d6d1379b5 100644
--- a/src/domain/transaction/stellar/status.rs
+++ b/src/domain/transaction/stellar/status.rs
@@ -112,6 +112,23 @@ where
"status check encountered error"
);
+ // CAS conflict means another writer already mutated this tx.
+ // Reload the latest state and return Ok so the status handler
+ // sees a non-final status and schedules the next poll cycle via
+ // HandlerError::Retry — no work is lost, just deferred.
+ if error.is_concurrent_update_conflict() {
+ info!(
+ tx_id = %tx.id,
+ relayer_id = %tx.relayer_id,
+ "concurrent transaction update detected during status handling, reloading latest state"
+ );
+ return self
+ .transaction_repository()
+ .get_by_id(tx.id.clone())
+ .await
+ .map_err(TransactionError::from);
+ }
+
// Handle different error types appropriately
match error {
TransactionError::ValidationError(ref msg) => {
@@ -1746,6 +1763,80 @@ mod tests {
.unwrap()
.contains("stuck in Sent status for too long"));
}
+ #[tokio::test]
+ async fn handle_status_concurrent_update_conflict_reloads_latest_state() {
+ // When status_core returns ConcurrentUpdateConflict, the handler
+ // should reload the latest state via get_by_id and return Ok.
+ let relayer = create_test_relayer();
+ let mut mocks = default_test_mocks();
+
+ let mut tx = create_test_transaction(&relayer.id);
+ tx.id = "tx-cas-conflict".to_string();
+ tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
+ let tx_hash_bytes = [11u8; 32];
+ if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
+ stellar_data.hash = Some(hex::encode(tx_hash_bytes));
+ }
+ tx.status = TransactionStatus::Submitted;
+
+ let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
+
+ // Provider returns SUCCESS — triggers a partial_update for confirmation
+ mocks
+ .provider
+ .expect_get_transaction()
+ .with(eq(expected_stellar_hash))
+ .times(1)
+ .returning(move |_| {
+ Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
+ });
+
+ // partial_update fails with ConcurrentUpdateConflict
+ mocks
+ .tx_repo
+ .expect_partial_update()
+ .times(1)
+ .returning(|_id, _update| {
+ Err(RepositoryError::ConcurrentUpdateConflict(
+ "CAS mismatch".to_string(),
+ ))
+ });
+
+ // After conflict, handler reloads via get_by_id
+ let reloaded_tx = {
+ let mut t = create_test_transaction(&relayer.id);
+ t.id = "tx-cas-conflict".to_string();
+ // Simulate another writer already confirmed it
+ t.status = TransactionStatus::Confirmed;
+ t
+ };
+ let reloaded_clone = reloaded_tx.clone();
+ mocks
+ .tx_repo
+ .expect_get_by_id()
+ .with(eq("tx-cas-conflict".to_string()))
+ .times(1)
+ .returning(move |_| Ok(reloaded_clone.clone()));
+
+ // No notifications or job enqueuing should happen on CAS path
+ mocks
+ .job_producer
+ .expect_produce_send_notification_job()
+ .never();
+ mocks
+ .job_producer
+ .expect_produce_transaction_request_job()
+ .never();
+
+ let handler = make_stellar_tx_handler(relayer.clone(), mocks);
+ let result = handler.handle_transaction_status_impl(tx, None).await;
+
+ assert!(result.is_ok(), "CAS conflict should return Ok after reload");
+ let returned_tx = result.unwrap();
+ assert_eq!(returned_tx.id, "tx-cas-conflict");
+ // The reloaded tx reflects what the other writer persisted
+ assert_eq!(returned_tx.status, TransactionStatus::Confirmed);
+ }
}
mod handle_pending_state_tests {
diff --git a/src/domain/transaction/stellar/submit.rs b/src/domain/transaction/stellar/submit.rs
index 88af382ca..630f8e400 100644
--- a/src/domain/transaction/stellar/submit.rs
+++ b/src/domain/transaction/stellar/submit.rs
@@ -125,7 +125,6 @@ where
"transaction already submitted (DUPLICATE status)"
);
}
-
let tx_hash_hex = response.hash.clone();
let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
@@ -161,16 +160,24 @@ where
}
"TRY_AGAIN_LATER" => {
// Network is temporarily congested — the transaction is valid but the
- // node's queue is full. Update sent_at so the status checker's backoff
- // gate measures time since this attempt, then return Ok to keep the
- // transaction alive. The status checker will handle retries:
+ // node's queue is full. Atomically update sent_at and increment
+ // try_again_later_retries so the status checker's backoff gate measures
+ // time since this attempt. Return Ok to keep the transaction alive.
+ // The status checker will handle retries:
// - Submitted txs: resubmitted with exponential backoff
// - Sent txs: re-enqueued via handle_sent_state
- let mut meta = tx.metadata.clone().unwrap_or_default();
- meta.try_again_later_retries = meta.try_again_later_retries.saturating_add(1);
+ let updated_tx = self
+ .transaction_repository()
+ .record_stellar_try_again_later_retry(tx.id.clone(), Utc::now().to_rfc3339())
+ .await?;
+
+ let retries = updated_tx
+ .metadata
+ .as_ref()
+ .map_or(0, |m| m.try_again_later_retries);
// Only push on first encounter (dedup: won't fire on retry 2, 3, etc.)
- if meta.try_again_later_retries == 1 {
+ if retries == 1 {
crate::metrics::STELLAR_TRY_AGAIN_LATER
.with_label_values(&[&tx.relayer_id, &tx.status.to_string()])
.inc();
@@ -180,18 +187,9 @@ where
tx_id = %tx.id,
relayer_id = %tx.relayer_id,
status = ?tx.status,
- try_again_later_retries = meta.try_again_later_retries,
+ try_again_later_retries = retries,
"TRY_AGAIN_LATER — status checker will retry"
);
- let update_req = TransactionUpdateRequest {
- sent_at: Some(Utc::now().to_rfc3339()),
- metadata: Some(meta),
- ..Default::default()
- };
- let updated_tx = self
- .transaction_repository()
- .partial_update(tx.id.clone(), update_req)
- .await?;
Ok(updated_tx)
}
"ERROR" => {
@@ -235,14 +233,13 @@ where
result_code = decoded_result_code.as_deref().unwrap_or("Unknown"),
"ERROR with insufficient fee — status checker will retry"
);
- let update_req = TransactionUpdateRequest {
- sent_at: Some(Utc::now().to_rfc3339()),
- metadata: Some(meta),
- ..Default::default()
- };
+ // Atomically sets `sent_at` and increments Stellar insufficient-fee retries.
let updated_tx = self
.transaction_repository()
- .partial_update(tx.id.clone(), update_req)
+ .record_stellar_insufficient_fee_retry(
+ tx.id.clone(),
+ Utc::now().to_rfc3339(),
+ )
.await?;
return Ok(updated_tx);
}
@@ -292,6 +289,23 @@ where
"transaction submission failed"
);
+ // CAS conflict in the submission path only occurs after the RPC
+ // already accepted the transaction (PENDING status update raced).
+ // The on-chain state is valid; reload the latest DB state and return
+ // Ok — the status checker will reconcile on its next poll.
+ if error.is_concurrent_update_conflict() {
+ info!(
+ tx_id = %tx_id,
+ relayer_id = %relayer_id,
+ "concurrent transaction update detected during submission, reloading latest state"
+ );
+ return self
+ .transaction_repository()
+ .get_by_id(tx_id)
+ .await
+ .map_err(TransactionError::from);
+ }
+
if is_bad_sequence_error(&error_reason) {
// For bad sequence errors, sync sequence from chain first
if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
@@ -974,22 +988,20 @@ mod tests {
Box::pin(async move { Ok(r) })
});
- // partial_update is called to refresh sent_at and persist metadata — status should NOT change
mocks
.tx_repo
- .expect_partial_update()
- .withf(|_, upd| {
- upd.sent_at.is_some()
- && upd.status.is_none()
- && upd
- .metadata
- .as_ref()
- .is_some_and(|m| m.try_again_later_retries == 1)
- })
- .returning(|id, _upd| {
+ .expect_record_stellar_try_again_later_retry()
+ .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
+ .returning(|id, _| {
let mut tx = create_test_transaction("relayer-1");
tx.id = id;
tx.status = TransactionStatus::Sent;
+ tx.metadata = Some(TransactionMetadata {
+ consecutive_failures: 0,
+ total_failures: 0,
+ insufficient_fee_retries: 0,
+ try_again_later_retries: 1,
+ });
Ok::<_, RepositoryError>(tx)
});
@@ -1028,14 +1040,20 @@ mod tests {
});
submit_mocks
.tx_repo
- .expect_partial_update()
- .withf(|_, upd| upd.sent_at.is_some() && upd.status.is_none())
+ .expect_record_stellar_try_again_later_retry()
+ .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
.times(1)
- .returning(|id, upd| {
+ .returning(|id, sent_at| {
let mut tx = create_test_transaction("relayer-1");
tx.id = id;
tx.status = TransactionStatus::Sent;
- tx.sent_at = upd.sent_at.clone();
+ tx.sent_at = Some(sent_at);
+ tx.metadata = Some(TransactionMetadata {
+ consecutive_failures: 0,
+ total_failures: 0,
+ insufficient_fee_retries: 0,
+ try_again_later_retries: 1,
+ });
Ok::<_, RepositoryError>(tx)
});
@@ -1102,18 +1120,20 @@ mod tests {
Box::pin(async move { Ok(r) })
});
- // partial_update is called to refresh sent_at so the status checker's
- // backoff gate measures time since this attempt, not the original submission.
mocks
.tx_repo
- .expect_partial_update()
- .withf(|_, upd| {
- upd.sent_at.is_some() && upd.status.is_none() // status should not change
- })
- .returning(|id, _upd| {
+ .expect_record_stellar_try_again_later_retry()
+ .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
+ .returning(|id, _| {
let mut tx = create_test_transaction("relayer-1");
tx.id = id;
tx.status = TransactionStatus::Submitted;
+ tx.metadata = Some(TransactionMetadata {
+ consecutive_failures: 0,
+ total_failures: 0,
+ insufficient_fee_retries: 0,
+ try_again_later_retries: 1,
+ });
Ok::<_, RepositoryError>(tx)
});
@@ -1217,25 +1237,24 @@ mod tests {
Box::pin(async move { Ok(r) })
});
- // partial_update is called to refresh sent_at — status should NOT change
+ // Insufficient-fee retry: updates `sent_at` and retry metadata atomically via the repository.
mocks
.tx_repo
- .expect_partial_update()
- .withf(|_, upd| {
- upd.sent_at.is_some()
- && upd.status.is_none()
- && upd
- .metadata
- .as_ref()
- .is_some_and(|metadata| metadata.insufficient_fee_retries == 1)
- })
- .returning(|id, upd| {
+ .expect_record_stellar_insufficient_fee_retry()
+ .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
+ .returning(|id, _| {
let mut tx = create_test_transaction("relayer-1");
tx.id = id;
tx.status = TransactionStatus::Sent;
- tx.metadata = upd.metadata;
+ tx.metadata = Some(TransactionMetadata {
+ consecutive_failures: 0,
+ total_failures: 0,
+ insufficient_fee_retries: 1,
+ try_again_later_retries: 0,
+ });
Ok::<_, RepositoryError>(tx)
- });
+ })
+ .times(1);
let handler = make_stellar_tx_handler(relayer.clone(), mocks);
let mut tx = create_test_transaction(&relayer.id);
@@ -1400,5 +1419,76 @@ mod tests {
let failed_tx = res.unwrap();
assert_eq!(failed_tx.status, TransactionStatus::Failed);
}
+
+ #[tokio::test]
+ async fn submit_transaction_concurrent_update_conflict_reloads_latest_state() {
+ // When partial_update fails with ConcurrentUpdateConflict during submission,
+ // the handler should reload the latest state via get_by_id and return Ok.
+ let relayer = create_test_relayer();
+ let mut mocks = default_test_mocks();
+
+ // Provider returns PENDING — submission to RPC succeeded
+ let response = create_send_tx_response(
+ "PENDING",
+ "0101010101010101010101010101010101010101010101010101010101010101",
+ );
+ mocks
+ .provider
+ .expect_send_transaction_with_status()
+ .returning(move |_| {
+ let r = response.clone();
+ Box::pin(async move { Ok(r) })
+ });
+
+ // partial_update (Submitted) fails with CAS conflict
+ mocks
+ .tx_repo
+ .expect_partial_update()
+ .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
+ .times(1)
+ .returning(|_, _| {
+ Err(RepositoryError::ConcurrentUpdateConflict(
+ "CAS mismatch".to_string(),
+ ))
+ });
+
+ // After conflict, handler reloads via get_by_id
+ let reloaded_tx = {
+ let mut t = create_test_transaction(&relayer.id);
+ t.status = TransactionStatus::Submitted;
+ t
+ };
+ let reloaded_clone = reloaded_tx.clone();
+ mocks
+ .tx_repo
+ .expect_get_by_id()
+ .times(1)
+ .returning(move |_| Ok(reloaded_clone.clone()));
+
+ // No failure handling (notifications, next-pending) should occur
+ mocks
+ .job_producer
+ .expect_produce_send_notification_job()
+ .never();
+ mocks
+ .job_producer
+ .expect_produce_transaction_request_job()
+ .never();
+
+ let handler = make_stellar_tx_handler(relayer.clone(), mocks);
+ let mut tx = create_test_transaction(&relayer.id);
+ tx.status = TransactionStatus::Sent;
+ if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
+ data.signatures.push(dummy_signature());
+ data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
+ }
+
+ let res = handler.submit_transaction_impl(tx).await;
+
+ assert!(res.is_ok(), "CAS conflict should return Ok after reload");
+ let returned_tx = res.unwrap();
+ // Reloaded state reflects the concurrent writer's update
+ assert_eq!(returned_tx.status, TransactionStatus::Submitted);
+ }
}
}
diff --git a/src/jobs/handlers/transaction_status_handler.rs b/src/jobs/handlers/transaction_status_handler.rs
index 3f357243a..0e1be351a 100644
--- a/src/jobs/handlers/transaction_status_handler.rs
+++ b/src/jobs/handlers/transaction_status_handler.rs
@@ -13,8 +13,7 @@ use crate::{
domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
jobs::{Job, StatusCheckContext, TransactionStatusCheck},
models::{
- ApiError, DefaultAppState, TransactionMetadata, TransactionRepoModel,
- TransactionUpdateRequest,
+ ApiError, DefaultAppState, TransactionError, TransactionMetadata, TransactionRepoModel,
},
observability::request_id::set_request_id,
queues::{HandlerError, WorkerContext},
@@ -70,7 +69,7 @@ pub async fn transaction_status_handler(
/// - If error with should_retry=false → Return Ok (job completes, e.g., transaction not found)
/// - If counters are None (early failure) → Skip counter updates
///
-/// Counters are stored in transaction metadata, persisted via partial_update.
+/// Counters are stored in transaction metadata, persisted via atomic repository updates (Lua scripts on Redis, lock-guarded mutation in memory).
async fn handle_result
(
result: Result,
tx_repo: &TR,
@@ -105,19 +104,15 @@ where
"transaction not in final state"
);
- // Use tx.metadata (fresh from handle_transaction_status) instead of the
- // pre-check snapshot to avoid overwriting retry counters
- // (e.g. try_again_later_retries, insufficient_fee_retries) that may have
- // been updated during resubmission.
+ // Use fresh metadata from the transaction (updated during handle_transaction_status)
+ // to decide whether a reset is needed, falling back to the pre-check snapshot.
let fresh_meta = tx.metadata.clone().or(metadata);
- if let Some(mut meta) = fresh_meta {
- if meta.consecutive_failures > 0 || meta.total_failures > 0 {
- meta.consecutive_failures = 0;
- let update = TransactionUpdateRequest {
- metadata: Some(meta),
- ..Default::default()
- };
- if let Err(e) = tx_repo.partial_update(tx_id.to_string(), update).await {
+ if let Some(meta) = fresh_meta {
+ if meta.consecutive_failures > 0 {
+ if let Err(e) = tx_repo
+ .reset_status_check_consecutive_failures(tx_id.to_string())
+ .await
+ {
warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
}
}
@@ -130,6 +125,17 @@ where
)))
}
Err(e) => {
+ if e.downcast_ref::()
+ .is_some_and(TransactionError::is_concurrent_update_conflict)
+ {
+ info!(
+ error = %e,
+ tx_id = %tx_id,
+ "status check lost a concurrent update race, completing job without counter changes"
+ );
+ return Ok(());
+ }
+
// Check if this is a permanent failure that shouldn't retry
if !should_retry_on_error {
info!(
@@ -141,24 +147,20 @@ where
}
// Transient error - INCREMENT both counters (only if we have metadata)
- if let Some(mut meta) = metadata {
- meta.consecutive_failures = meta.consecutive_failures.saturating_add(1);
- meta.total_failures = meta.total_failures.saturating_add(1);
-
+ if let Some(meta) = metadata {
warn!(
error = %e,
tx_id = %tx_id,
- consecutive_failures = meta.consecutive_failures,
- total_failures = meta.total_failures,
+ consecutive_failures = meta.consecutive_failures.saturating_add(1),
+ total_failures = meta.total_failures.saturating_add(1),
"status check failed, incrementing failure counters"
);
- // Update counters via transaction repository
- let update = TransactionUpdateRequest {
- metadata: Some(meta),
- ..Default::default()
- };
- if let Err(update_err) = tx_repo.partial_update(tx_id.to_string(), update).await {
+ // Update counters via atomic transaction repository method
+ if let Err(update_err) = tx_repo
+ .increment_status_check_failures(tx_id.to_string())
+ .await
+ {
warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
}
} else {
@@ -304,7 +306,10 @@ async fn handle_request(
#[cfg(test)]
mod tests {
use super::*;
- use crate::models::{NetworkType, TransactionStatus};
+ use crate::{
+ models::{NetworkType, TransactionStatus},
+ repositories::MockTransactionRepository,
+ };
use std::collections::HashMap;
#[tokio::test]
@@ -505,6 +510,26 @@ mod tests {
mod handle_request_result_tests {
use super::*;
+ #[tokio::test]
+ async fn test_handle_result_ignores_concurrent_update_conflict() {
+ let tx_repo = MockTransactionRepository::new();
+
+ let result = handle_result(
+ Err(TransactionError::ConcurrentUpdateConflict("tx race".to_string()).into()),
+ &tx_repo,
+ "tx-1",
+ Some(TransactionMetadata {
+ consecutive_failures: 2,
+ total_failures: 5,
+ ..Default::default()
+ }),
+ true,
+ )
+ .await;
+
+ assert!(result.is_ok());
+ }
+
#[test]
fn test_handle_request_result_with_counters() {
let result = HandleRequestResult {
diff --git a/src/models/error/repository.rs b/src/models/error/repository.rs
index ca526c46b..50e6a1b67 100644
--- a/src/models/error/repository.rs
+++ b/src/models/error/repository.rs
@@ -22,6 +22,9 @@ pub enum RepositoryError {
#[error("Transaction failure: {0}")]
TransactionFailure(String),
+ #[error("Concurrent update conflict: {0}")]
+ ConcurrentUpdateConflict(String),
+
#[error("Transaction validation failed: {0}")]
TransactionValidationFailed(String),
@@ -85,6 +88,7 @@ mod tests {
RepositoryError::ConstraintViolation("Constraint error".to_string()),
RepositoryError::InvalidData("Invalid data".to_string()),
RepositoryError::TransactionFailure("Transaction failed".to_string()),
+ RepositoryError::ConcurrentUpdateConflict("Concurrent conflict".to_string()),
RepositoryError::TransactionValidationFailed("Validation failed".to_string()),
RepositoryError::PermissionDenied("Permission denied".to_string()),
RepositoryError::NotSupported("Not supported".to_string()),
diff --git a/src/models/error/transaction.rs b/src/models/error/transaction.rs
index 1d8c87a5a..d5387b186 100644
--- a/src/models/error/transaction.rs
+++ b/src/models/error/transaction.rs
@@ -42,6 +42,9 @@ pub enum TransactionError {
#[error("Unexpected error: {0}")]
UnexpectedError(String),
+ #[error("Concurrent update conflict: {0}")]
+ ConcurrentUpdateConflict(String),
+
#[error("Not supported: {0}")]
NotSupported(String),
@@ -82,6 +85,7 @@ impl TransactionError {
// Transient errors - may resolve on retry
TransactionError::UnexpectedError(_) => true,
+ TransactionError::ConcurrentUpdateConflict(_) => true,
TransactionError::JobProducerError(_) => true,
// Permanent errors - fail immediately
@@ -95,6 +99,11 @@ impl TransactionError {
TransactionError::StellarTransactionValidationError(_) => false,
}
}
+
+ /// Detects optimistic-lock conflicts caused by concurrent transaction updates.
+ pub fn is_concurrent_update_conflict(&self) -> bool {
+ matches!(self, TransactionError::ConcurrentUpdateConflict(_))
+ }
}
impl From for ApiError {
@@ -114,6 +123,7 @@ impl From for ApiError {
}
TransactionError::NotSupported(msg) => ApiError::BadRequest(msg),
TransactionError::UnexpectedError(msg) => ApiError::InternalError(msg),
+ TransactionError::ConcurrentUpdateConflict(msg) => ApiError::InternalError(msg),
TransactionError::SignerError(msg) => ApiError::InternalError(msg),
TransactionError::InsufficientBalance(msg) => ApiError::BadRequest(msg),
TransactionError::SimulationFailed(msg) => ApiError::BadRequest(msg),
@@ -123,7 +133,25 @@ impl From for ApiError {
impl From for TransactionError {
fn from(error: RepositoryError) -> Self {
- TransactionError::ValidationError(error.to_string())
+ match error {
+ RepositoryError::NotFound(msg)
+ | RepositoryError::InvalidData(msg)
+ | RepositoryError::ConstraintViolation(msg)
+ | RepositoryError::TransactionValidationFailed(msg) => {
+ TransactionError::ValidationError(msg)
+ }
+ RepositoryError::ConcurrentUpdateConflict(msg) => {
+ TransactionError::ConcurrentUpdateConflict(msg)
+ }
+ RepositoryError::TransactionFailure(msg)
+ | RepositoryError::LockError(msg)
+ | RepositoryError::ConnectionError(msg)
+ | RepositoryError::PermissionDenied(msg)
+ | RepositoryError::Unknown(msg)
+ | RepositoryError::UnexpectedError(msg)
+ | RepositoryError::Other(msg) => TransactionError::UnexpectedError(msg),
+ RepositoryError::NotSupported(msg) => TransactionError::NotSupported(msg),
+ }
}
}
@@ -205,6 +233,10 @@ mod tests {
TransactionError::SimulationFailed("sim failed".to_string()),
"Stellar transaction simulation failed: sim failed",
),
+ (
+ TransactionError::ConcurrentUpdateConflict("conflict on tx-1".to_string()),
+ "Concurrent update conflict: conflict on tx-1",
+ ),
];
for (error, expected_message) in test_cases {
@@ -247,6 +279,10 @@ mod tests {
TransactionError::SimulationFailed("boom".to_string()),
ApiError::BadRequest("boom".to_string()),
),
+ (
+ TransactionError::ConcurrentUpdateConflict("conflict".to_string()),
+ ApiError::InternalError("conflict".to_string()),
+ ),
];
for (tx_error, expected_api_error) in test_cases {
@@ -271,12 +307,52 @@ mod tests {
match tx_error {
TransactionError::ValidationError(msg) => {
- assert_eq!(msg, "Entity not found: record not found");
+ assert_eq!(msg, "record not found");
}
_ => panic!("Expected TransactionError::ValidationError"),
}
}
+ #[test]
+ fn test_concurrent_update_conflict_variant() {
+ let error = TransactionError::ConcurrentUpdateConflict("conflict on tx-1".to_string());
+ assert!(error.is_transient());
+ assert!(error.is_concurrent_update_conflict());
+ assert_eq!(
+ error.to_string(),
+ "Concurrent update conflict: conflict on tx-1"
+ );
+
+ let api_error = ApiError::from(error);
+ assert!(matches!(api_error, ApiError::InternalError(_)));
+ }
+
+ #[test]
+ fn test_concurrent_update_conflict_repo_conversion() {
+ let repo_error = RepositoryError::ConcurrentUpdateConflict("conflict on tx-1".to_string());
+ let tx_error = TransactionError::from(repo_error);
+ assert!(matches!(
+ tx_error,
+ TransactionError::ConcurrentUpdateConflict(_)
+ ));
+ assert!(tx_error.is_concurrent_update_conflict());
+ }
+
+ #[test]
+ fn test_non_conflict_errors_are_not_concurrent_update_conflict() {
+ let errors = vec![
+ TransactionError::ValidationError("some error".to_string()),
+ TransactionError::UnexpectedError("some error".to_string()),
+ TransactionError::NetworkConfiguration("some error".to_string()),
+ ];
+ for error in errors {
+ assert!(
+ !error.is_concurrent_update_conflict(),
+ "Error {error:?} should not be a concurrent update conflict"
+ );
+ }
+ }
+
#[test]
fn test_report_to_transaction_error() {
let report = Report::msg("An unexpected error occurred");
diff --git a/src/models/transaction/repository.rs b/src/models/transaction/repository.rs
index 803407a84..65490308e 100644
--- a/src/models/transaction/repository.rs
+++ b/src/models/transaction/repository.rs
@@ -73,22 +73,33 @@ pub struct TransactionMetadata {
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TransactionUpdateRequest {
+ #[serde(skip_serializing_if = "Option::is_none")]
pub status: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub status_reason: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub sent_at: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub confirmed_at: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub network_data: Option,
/// Timestamp when gas price was determined
+ #[serde(skip_serializing_if = "Option::is_none")]
pub priced_at: Option,
/// History of transaction hashes
+ #[serde(skip_serializing_if = "Option::is_none")]
pub hashes: Option>,
/// Number of no-ops in the transaction
+ #[serde(skip_serializing_if = "Option::is_none")]
pub noop_count: Option,
/// Whether the transaction is canceled
+ #[serde(skip_serializing_if = "Option::is_none")]
pub is_canceled: Option,
/// Timestamp when this transaction should be deleted (for final states)
+ #[serde(skip_serializing_if = "Option::is_none")]
pub delete_at: Option,
/// Status check metadata (failure counters for circuit breaker)
+ #[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option,
}
diff --git a/src/repositories/transaction/mod.rs b/src/repositories/transaction/mod.rs
index c5787ca9f..2a96c0a88 100644
--- a/src/repositories/transaction/mod.rs
+++ b/src/repositories/transaction/mod.rs
@@ -116,6 +116,32 @@ pub trait TransactionRepository: Repository {
sent_at: String,
) -> Result;
+ /// Atomically increments status-check failure counters using the latest stored metadata.
+ async fn increment_status_check_failures(
+ &self,
+ tx_id: String,
+ ) -> Result;
+
+ /// Atomically resets consecutive status-check failures to zero while preserving other counters.
+ async fn reset_status_check_consecutive_failures(
+ &self,
+ tx_id: String,
+ ) -> Result;
+
+ /// Atomically sets `sent_at` and increments Stellar insufficient-fee retries.
+ async fn record_stellar_insufficient_fee_retry(
+ &self,
+ tx_id: String,
+ sent_at: String,
+ ) -> Result;
+
+ /// Atomically sets `sent_at` and increments Stellar try-again-later retries.
+ async fn record_stellar_try_again_later_retry(
+ &self,
+ tx_id: String,
+ sent_at: String,
+ ) -> Result;
+
/// Set the confirmed_at timestamp of a transaction
async fn set_confirmed_at(
&self,
@@ -192,6 +218,10 @@ mockall::mock! {
async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result;
async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result;
async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result;
+ async fn increment_status_check_failures(&self, tx_id: String) -> Result;
+ async fn reset_status_check_consecutive_failures(&self, tx_id: String) -> Result;
+ async fn record_stellar_insufficient_fee_retry(&self, tx_id: String, sent_at: String) -> Result;
+ async fn record_stellar_try_again_later_retry(&self, tx_id: String, sent_at: String) -> Result;
async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result;
async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result;
async fn delete_by_ids(&self, ids: Vec) -> Result;
@@ -367,6 +397,68 @@ impl TransactionRepository for TransactionRepositoryStorage {
}
}
+ async fn increment_status_check_failures(
+ &self,
+ tx_id: String,
+ ) -> Result {
+ match self {
+ TransactionRepositoryStorage::InMemory(repo) => {
+ repo.increment_status_check_failures(tx_id).await
+ }
+ TransactionRepositoryStorage::Redis(repo) => {
+ repo.increment_status_check_failures(tx_id).await
+ }
+ }
+ }
+
+ async fn reset_status_check_consecutive_failures(
+ &self,
+ tx_id: String,
+ ) -> Result {
+ match self {
+ TransactionRepositoryStorage::InMemory(repo) => {
+ repo.reset_status_check_consecutive_failures(tx_id).await
+ }
+ TransactionRepositoryStorage::Redis(repo) => {
+ repo.reset_status_check_consecutive_failures(tx_id).await
+ }
+ }
+ }
+
+ async fn record_stellar_insufficient_fee_retry(
+ &self,
+ tx_id: String,
+ sent_at: String,
+ ) -> Result {
+ match self {
+ TransactionRepositoryStorage::InMemory(repo) => {
+ repo.record_stellar_insufficient_fee_retry(tx_id, sent_at)
+ .await
+ }
+ TransactionRepositoryStorage::Redis(repo) => {
+ repo.record_stellar_insufficient_fee_retry(tx_id, sent_at)
+ .await
+ }
+ }
+ }
+
+ async fn record_stellar_try_again_later_retry(
+ &self,
+ tx_id: String,
+ sent_at: String,
+ ) -> Result {
+ match self {
+ TransactionRepositoryStorage::InMemory(repo) => {
+ repo.record_stellar_try_again_later_retry(tx_id, sent_at)
+ .await
+ }
+ TransactionRepositoryStorage::Redis(repo) => {
+ repo.record_stellar_try_again_later_retry(tx_id, sent_at)
+ .await
+ }
+ }
+ }
+
async fn set_confirmed_at(
&self,
tx_id: String,
@@ -1284,4 +1376,90 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_record_stellar_try_again_later_retry_in_memory() -> Result<()> {
+ let storage = TransactionRepositoryStorage::new_in_memory();
+ let mut transaction = create_test_transaction("test-tx", "test-relayer");
+ transaction.status = TransactionStatus::Sent;
+ storage.create(transaction).await?;
+
+ let sent_at = "2025-03-18T10:00:00Z".to_string();
+ let updated = storage
+ .record_stellar_try_again_later_retry("test-tx".to_string(), sent_at.clone())
+ .await?;
+
+ assert_eq!(updated.id, "test-tx");
+ assert_eq!(updated.sent_at, Some(sent_at));
+ let meta = updated.metadata.expect("metadata should be set");
+ assert_eq!(meta.try_again_later_retries, 1);
+ assert_eq!(meta.consecutive_failures, 0);
+ assert_eq!(meta.total_failures, 0);
+ assert_eq!(meta.insufficient_fee_retries, 0);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_record_stellar_try_again_later_retry_accumulates_in_memory() -> Result<()> {
+ let storage = TransactionRepositoryStorage::new_in_memory();
+ let mut transaction = create_test_transaction("test-tx", "test-relayer");
+ transaction.status = TransactionStatus::Sent;
+ storage.create(transaction).await?;
+
+ storage
+ .record_stellar_try_again_later_retry(
+ "test-tx".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await?;
+
+ let updated = storage
+ .record_stellar_try_again_later_retry(
+ "test-tx".to_string(),
+ "2025-03-18T10:01:00Z".to_string(),
+ )
+ .await?;
+
+ assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.try_again_later_retries, 2);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_record_stellar_try_again_later_retry_noop_on_final_state_in_memory() -> Result<()>
+ {
+ let storage = TransactionRepositoryStorage::new_in_memory();
+ let mut transaction = create_test_transaction("test-tx", "test-relayer");
+ transaction.status = TransactionStatus::Confirmed;
+ transaction.sent_at = Some("old-time".to_string());
+ storage.create(transaction).await?;
+
+ let result = storage
+ .record_stellar_try_again_later_retry("test-tx".to_string(), "new-time".to_string())
+ .await?;
+
+ assert_eq!(result.sent_at.as_deref(), Some("old-time"));
+ assert!(result.metadata.is_none());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_record_stellar_try_again_later_retry_not_found_in_memory() -> Result<()> {
+ let storage = TransactionRepositoryStorage::new_in_memory();
+
+ let result = storage
+ .record_stellar_try_again_later_retry(
+ "nonexistent".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await;
+
+ assert!(matches!(result, Err(RepositoryError::NotFound(_))));
+
+ Ok(())
+ }
}
diff --git a/src/repositories/transaction/transaction_in_memory.rs b/src/repositories/transaction/transaction_in_memory.rs
index ecbc3bac5..0de1c71de 100644
--- a/src/repositories/transaction/transaction_in_memory.rs
+++ b/src/repositories/transaction/transaction_in_memory.rs
@@ -71,6 +71,16 @@ impl InMemoryTransactionRepository {
.cmp(a_key) // Descending (newest first)
.then_with(|| b.id.cmp(&a.id)) // Tie-breaker: sort by ID for deterministic ordering
}
+
+ fn is_final_state(status: &TransactionStatus) -> bool {
+ matches!(
+ status,
+ TransactionStatus::Confirmed
+ | TransactionStatus::Failed
+ | TransactionStatus::Expired
+ | TransactionStatus::Canceled
+ )
+ }
}
// Implement both traits for InMemoryTransactionRepository
@@ -355,9 +365,100 @@ impl TransactionRepository for InMemoryTransactionRepository {
tx_id: String,
sent_at: String,
+ ) -> Result<TransactionRepoModel, RepositoryError> {
- let mut tx = self.get_by_id(tx_id.clone()).await?;
- tx.sent_at = Some(sent_at);
- self.update(tx_id, tx).await
+ let update = TransactionUpdateRequest {
+ sent_at: Some(sent_at),
+ ..Default::default()
+ };
+ self.partial_update(tx_id, update).await
+ }
+
+ async fn increment_status_check_failures(
+ &self,
+ tx_id: String,
+ ) -> Result<TransactionRepoModel, RepositoryError> {
+ let mut store = Self::acquire_lock(&self.store).await?;
+
+ if let Some(tx) = store.get_mut(&tx_id) {
+ if Self::is_final_state(&tx.status) {
+ return Ok(tx.clone());
+ }
+ let mut metadata = tx.metadata.clone().unwrap_or_default();
+ metadata.consecutive_failures = metadata.consecutive_failures.saturating_add(1);
+ metadata.total_failures = metadata.total_failures.saturating_add(1);
+ tx.metadata = Some(metadata);
+ Ok(tx.clone())
+ } else {
+ Err(RepositoryError::NotFound(format!(
+ "Transaction with ID {tx_id} not found"
+ )))
+ }
+ }
+
+ async fn reset_status_check_consecutive_failures(
+ &self,
+ tx_id: String,
+ ) -> Result<TransactionRepoModel, RepositoryError> {
+ let mut store = Self::acquire_lock(&self.store).await?;
+
+ if let Some(tx) = store.get_mut(&tx_id) {
+ if Self::is_final_state(&tx.status) {
+ return Ok(tx.clone());
+ }
+ let mut metadata = tx.metadata.clone().unwrap_or_default();
+ metadata.consecutive_failures = 0;
+ tx.metadata = Some(metadata);
+ Ok(tx.clone())
+ } else {
+ Err(RepositoryError::NotFound(format!(
+ "Transaction with ID {tx_id} not found"
+ )))
+ }
+ }
+
+ async fn record_stellar_insufficient_fee_retry(
+ &self,
+ tx_id: String,
+ sent_at: String,
+ ) -> Result<TransactionRepoModel, RepositoryError> {
+ let mut store = Self::acquire_lock(&self.store).await?;
+
+ if let Some(tx) = store.get_mut(&tx_id) {
+ if Self::is_final_state(&tx.status) {
+ return Ok(tx.clone());
+ }
+ let mut metadata = tx.metadata.clone().unwrap_or_default();
+ metadata.insufficient_fee_retries = metadata.insufficient_fee_retries.saturating_add(1);
+ tx.metadata = Some(metadata);
+ tx.sent_at = Some(sent_at);
+ Ok(tx.clone())
+ } else {
+ Err(RepositoryError::NotFound(format!(
+ "Transaction with ID {tx_id} not found"
+ )))
+ }
+ }
+
+ async fn record_stellar_try_again_later_retry(
+ &self,
+ tx_id: String,
+ sent_at: String,
+ ) -> Result<TransactionRepoModel, RepositoryError> {
+ let mut store = Self::acquire_lock(&self.store).await?;
+
+ if let Some(tx) = store.get_mut(&tx_id) {
+ if Self::is_final_state(&tx.status) {
+ return Ok(tx.clone());
+ }
+ let mut metadata = tx.metadata.clone().unwrap_or_default();
+ metadata.try_again_later_retries = metadata.try_again_later_retries.saturating_add(1);
+ tx.metadata = Some(metadata);
+ tx.sent_at = Some(sent_at);
+ Ok(tx.clone())
+ } else {
+ Err(RepositoryError::NotFound(format!(
+ "Transaction with ID {tx_id} not found"
+ )))
+ }
}
async fn set_confirmed_at(
@@ -1932,4 +2033,304 @@ mod tests {
let remaining = repo.get_by_id("tx-relayer-2".to_string()).await.unwrap();
assert_eq!(remaining.relayer_id, "relayer-2");
}
+
+ // ── increment_status_check_failures ─────────────────────────────
+
+ #[tokio::test]
+ async fn test_increment_status_check_failures_no_prior_metadata() {
+ let repo = InMemoryTransactionRepository::new();
+ let tx = create_test_transaction_pending_state("tx-inc-1");
+ repo.create(tx).await.unwrap();
+
+ let updated = repo
+ .increment_status_check_failures("tx-inc-1".to_string())
+ .await
+ .unwrap();
+
+ let meta = updated.metadata.expect("metadata should be set");
+ assert_eq!(meta.consecutive_failures, 1);
+ assert_eq!(meta.total_failures, 1);
+ assert_eq!(meta.insufficient_fee_retries, 0);
+ }
+
+ #[tokio::test]
+ async fn test_increment_status_check_failures_accumulates() {
+ let repo = InMemoryTransactionRepository::new();
+ let tx = create_test_transaction_pending_state("tx-inc-2");
+ repo.create(tx).await.unwrap();
+
+ repo.increment_status_check_failures("tx-inc-2".to_string())
+ .await
+ .unwrap();
+ repo.increment_status_check_failures("tx-inc-2".to_string())
+ .await
+ .unwrap();
+ let updated = repo
+ .increment_status_check_failures("tx-inc-2".to_string())
+ .await
+ .unwrap();
+
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.consecutive_failures, 3);
+ assert_eq!(meta.total_failures, 3);
+ }
+
+ #[tokio::test]
+ async fn test_increment_status_check_failures_noop_on_final_state() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-inc-final");
+ tx.status = TransactionStatus::Confirmed;
+ repo.create(tx).await.unwrap();
+
+ let result = repo
+ .increment_status_check_failures("tx-inc-final".to_string())
+ .await
+ .unwrap();
+
+ // Should return unchanged — no metadata set
+ assert!(result.metadata.is_none());
+ assert_eq!(result.status, TransactionStatus::Confirmed);
+ }
+
+ #[tokio::test]
+ async fn test_increment_status_check_failures_not_found() {
+ let repo = InMemoryTransactionRepository::new();
+ let result = repo
+ .increment_status_check_failures("nonexistent".to_string())
+ .await;
+
+ assert!(matches!(result, Err(RepositoryError::NotFound(_))));
+ }
+
+ // ── reset_status_check_consecutive_failures ─────────────────────
+
+ #[tokio::test]
+ async fn test_reset_consecutive_failures() {
+ let repo = InMemoryTransactionRepository::new();
+ let tx = create_test_transaction_pending_state("tx-reset-1");
+ repo.create(tx).await.unwrap();
+
+ // Increment a few times first
+ repo.increment_status_check_failures("tx-reset-1".to_string())
+ .await
+ .unwrap();
+ repo.increment_status_check_failures("tx-reset-1".to_string())
+ .await
+ .unwrap();
+
+ let updated = repo
+ .reset_status_check_consecutive_failures("tx-reset-1".to_string())
+ .await
+ .unwrap();
+
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.consecutive_failures, 0);
+ // total_failures should be preserved
+ assert_eq!(meta.total_failures, 2);
+ }
+
+ #[tokio::test]
+ async fn test_reset_consecutive_failures_noop_on_final_state() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-reset-final");
+ tx.status = TransactionStatus::Failed;
+ tx.metadata = Some(crate::models::TransactionMetadata {
+ consecutive_failures: 5,
+ total_failures: 10,
+ insufficient_fee_retries: 0,
+ try_again_later_retries: 0,
+ });
+ repo.create(tx).await.unwrap();
+
+ let result = repo
+ .reset_status_check_consecutive_failures("tx-reset-final".to_string())
+ .await
+ .unwrap();
+
+ // Should return unchanged
+ let meta = result.metadata.unwrap();
+ assert_eq!(meta.consecutive_failures, 5);
+ }
+
+ #[tokio::test]
+ async fn test_reset_consecutive_failures_not_found() {
+ let repo = InMemoryTransactionRepository::new();
+ let result = repo
+ .reset_status_check_consecutive_failures("nonexistent".to_string())
+ .await;
+
+ assert!(matches!(result, Err(RepositoryError::NotFound(_))));
+ }
+
+ // ── record_stellar_insufficient_fee_retry ───────────────────────
+
+ #[tokio::test]
+ async fn test_record_insufficient_fee_retry() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-fee-1");
+ tx.status = TransactionStatus::Sent;
+ tx.sent_at = None;
+ repo.create(tx).await.unwrap();
+
+ let updated = repo
+ .record_stellar_insufficient_fee_retry(
+ "tx-fee-1".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.insufficient_fee_retries, 1);
+ assert_eq!(meta.consecutive_failures, 0);
+ assert_eq!(meta.total_failures, 0);
+ }
+
+ #[tokio::test]
+ async fn test_record_insufficient_fee_retry_accumulates() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-fee-2");
+ tx.status = TransactionStatus::Sent;
+ repo.create(tx).await.unwrap();
+
+ repo.record_stellar_insufficient_fee_retry(
+ "tx-fee-2".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await
+ .unwrap();
+
+ let updated = repo
+ .record_stellar_insufficient_fee_retry(
+ "tx-fee-2".to_string(),
+ "2025-03-18T10:01:00Z".to_string(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.insufficient_fee_retries, 2);
+ }
+
+ #[tokio::test]
+ async fn test_record_insufficient_fee_retry_noop_on_final_state() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-fee-final");
+ tx.status = TransactionStatus::Confirmed;
+ tx.sent_at = Some("old-time".to_string());
+ repo.create(tx).await.unwrap();
+
+ let result = repo
+ .record_stellar_insufficient_fee_retry(
+ "tx-fee-final".to_string(),
+ "new-time".to_string(),
+ )
+ .await
+ .unwrap();
+
+ // Should return unchanged
+ assert_eq!(result.sent_at.as_deref(), Some("old-time"));
+ assert!(result.metadata.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_record_insufficient_fee_retry_not_found() {
+ let repo = InMemoryTransactionRepository::new();
+ let result = repo
+ .record_stellar_insufficient_fee_retry(
+ "nonexistent".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await;
+
+ assert!(matches!(result, Err(RepositoryError::NotFound(_))));
+ }
+
+ // ── record_stellar_try_again_later_retry ───────────────────────
+
+ #[tokio::test]
+ async fn test_record_try_again_later_retry() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-tal-1");
+ tx.status = TransactionStatus::Sent;
+ tx.sent_at = None;
+ repo.create(tx).await.unwrap();
+
+ let updated = repo
+ .record_stellar_try_again_later_retry(
+ "tx-tal-1".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.try_again_later_retries, 1);
+ assert_eq!(meta.consecutive_failures, 0);
+ assert_eq!(meta.total_failures, 0);
+ }
+
+ #[tokio::test]
+ async fn test_record_try_again_later_retry_accumulates() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-tal-2");
+ tx.status = TransactionStatus::Sent;
+ repo.create(tx).await.unwrap();
+
+ repo.record_stellar_try_again_later_retry(
+ "tx-tal-2".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await
+ .unwrap();
+
+ let updated = repo
+ .record_stellar_try_again_later_retry(
+ "tx-tal-2".to_string(),
+ "2025-03-18T10:01:00Z".to_string(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
+ let meta = updated.metadata.unwrap();
+ assert_eq!(meta.try_again_later_retries, 2);
+ }
+
+ #[tokio::test]
+ async fn test_record_try_again_later_retry_noop_on_final_state() {
+ let repo = InMemoryTransactionRepository::new();
+ let mut tx = create_test_transaction_pending_state("tx-tal-final");
+ tx.status = TransactionStatus::Confirmed;
+ tx.sent_at = Some("old-time".to_string());
+ repo.create(tx).await.unwrap();
+
+ let result = repo
+ .record_stellar_try_again_later_retry(
+ "tx-tal-final".to_string(),
+ "new-time".to_string(),
+ )
+ .await
+ .unwrap();
+
+ // Should return unchanged
+ assert_eq!(result.sent_at.as_deref(), Some("old-time"));
+ assert!(result.metadata.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_record_try_again_later_retry_not_found() {
+ let repo = InMemoryTransactionRepository::new();
+ let result = repo
+ .record_stellar_try_again_later_retry(
+ "nonexistent".to_string(),
+ "2025-03-18T10:00:00Z".to_string(),
+ )
+ .await;
+
+ assert!(matches!(result, Err(RepositoryError::NotFound(_))));
+ }
}
diff --git a/src/repositories/transaction/transaction_redis.rs b/src/repositories/transaction/transaction_redis.rs
index 830e903e0..9406f7980 100644
--- a/src/repositories/transaction/transaction_redis.rs
+++ b/src/repositories/transaction/transaction_redis.rs
@@ -1,5 +1,7 @@
//! Redis-backed implementation of the TransactionRepository.
+use crate::config::ServerConfig;
+use crate::constants::FINAL_TRANSACTION_STATUSES;
use crate::domain::transaction::common::is_final_state;
use crate::metrics::{
TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
@@ -112,6 +114,160 @@ impl RedisTransactionRepository {
)
}
+ /// Returns the components needed for Lua scripts to resolve a tx key from
+ /// only the tx_id: (tx_to_relayer lookup key, key prefix, key suffix).
+ /// The Lua script does: `GET KEYS[1]` to get the relayer_id, then
+ /// constructs the tx key as `ARGV[1] .. relayer_id .. ARGV[2]`.
+ fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
+ let lookup_key = self.tx_to_relayer_key(tx_id);
+ let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
+ let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
+ (lookup_key, key_prefix, key_suffix)
+ }
+
+ /// Executes an atomic Lua script with retry/backoff for transient Redis failures.
+ ///
+ /// Every script receives `KEYS[1]` = tx_to_relayer lookup key and
+ /// `ARGV[1..2]` = key prefix/suffix. `extra_args` are appended as `ARGV[3..]`.
+ /// The script must return the (possibly updated) JSON string or `false` for
+ /// not-found.
+ async fn run_atomic_script(
+ &self,
+ lua: &str,
+ tx_id: &str,
+ extra_args: &[&str],
+ op_name: &str,
+ ) -> Result<TransactionRepoModel, RepositoryError> {
+ const MAX_RETRIES: u32 = 3;
+ const BASE_BACKOFF_MS: u64 = 100;
+
+ let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
+ let script = Script::new(lua);
+ let mut last_error = None;
+
+ for attempt in 0..MAX_RETRIES {
+ let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
+
+ let mut conn = match self
+ .get_connection(self.connections.primary(), op_name)
+ .await
+ {
+ Ok(conn) => conn,
+ Err(e) => {
+ last_error = Some(e);
+ if attempt < MAX_RETRIES - 1 {
+ warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
+ tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
+ continue;
+ }
+ return Err(last_error.unwrap());
+ }
+ };
+
+ let mut invocation = script.prepare_invoke();
+ invocation
+ .key(&lookup_key)
+ .arg(&key_prefix)
+ .arg(&key_suffix);
+ for arg in extra_args {
+ invocation.arg(*arg);
+ }
+
+ match invocation.invoke_async::