Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/examples/channels/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { handler } from '@openzeppelin/relayer-plugin-channels';
91 changes: 91 additions & 0 deletions src/domain/transaction/stellar/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ where
"status check encountered error"
);

// CAS conflict means another writer already mutated this tx.
// Reload the latest state and return Ok so the status handler
// sees a non-final status and schedules the next poll cycle via
// HandlerError::Retry — no work is lost, just deferred.
if error.is_concurrent_update_conflict() {
info!(
tx_id = %tx.id,
relayer_id = %tx.relayer_id,
"concurrent transaction update detected during status handling, reloading latest state"
);
return self
.transaction_repository()
.get_by_id(tx.id.clone())
.await
.map_err(TransactionError::from);
Comment thread
zeljkoX marked this conversation as resolved.
}

// Handle different error types appropriately
match error {
TransactionError::ValidationError(ref msg) => {
Expand Down Expand Up @@ -1746,6 +1763,80 @@ mod tests {
.unwrap()
.contains("stuck in Sent status for too long"));
}
#[tokio::test]
async fn handle_status_concurrent_update_conflict_reloads_latest_state() {
// When status_core returns ConcurrentUpdateConflict, the handler
// should reload the latest state via get_by_id and return Ok.
let relayer = create_test_relayer();
let mut mocks = default_test_mocks();

let mut tx = create_test_transaction(&relayer.id);
tx.id = "tx-cas-conflict".to_string();
tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
let tx_hash_bytes = [11u8; 32];
if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
stellar_data.hash = Some(hex::encode(tx_hash_bytes));
}
tx.status = TransactionStatus::Submitted;

let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);

// Provider returns SUCCESS — triggers a partial_update for confirmation
mocks
.provider
.expect_get_transaction()
.with(eq(expected_stellar_hash))
.times(1)
.returning(move |_| {
Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
});

// partial_update fails with ConcurrentUpdateConflict
mocks
.tx_repo
.expect_partial_update()
.times(1)
.returning(|_id, _update| {
Err(RepositoryError::ConcurrentUpdateConflict(
"CAS mismatch".to_string(),
))
});

// After conflict, handler reloads via get_by_id
let reloaded_tx = {
let mut t = create_test_transaction(&relayer.id);
t.id = "tx-cas-conflict".to_string();
// Simulate another writer already confirmed it
t.status = TransactionStatus::Confirmed;
t
};
let reloaded_clone = reloaded_tx.clone();
mocks
.tx_repo
.expect_get_by_id()
.with(eq("tx-cas-conflict".to_string()))
.times(1)
.returning(move |_| Ok(reloaded_clone.clone()));

// No notifications or job enqueuing should happen on CAS path
mocks
.job_producer
.expect_produce_send_notification_job()
.never();
mocks
.job_producer
.expect_produce_transaction_request_job()
.never();

let handler = make_stellar_tx_handler(relayer.clone(), mocks);
let result = handler.handle_transaction_status_impl(tx, None).await;

assert!(result.is_ok(), "CAS conflict should return Ok after reload");
let returned_tx = result.unwrap();
assert_eq!(returned_tx.id, "tx-cas-conflict");
// The reloaded tx reflects what the other writer persisted
assert_eq!(returned_tx.status, TransactionStatus::Confirmed);
}
}

mod handle_pending_state_tests {
Expand Down
Loading
Loading