Skip to content

Commit 7ce24c6

Browse files
zeljkoX and tirumerla
authored
fix: Fix concurrent transaction repository update races (#698)
* fix: Tx repo concurrent updates improvements
* chore: Align final state & ignore conflicts
* feat: Improve partial_update method
* chore: PR suggestions
* chore: Add tests
* chore: Improvements
---------
Co-authored-by: tirumerla <tirumerla@gmail.com>
1 parent 9c675aa commit 7ce24c6

10 files changed

Lines changed: 2070 additions & 476 deletions

File tree

plugins/examples/channels/index.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
export { handler } from '@openzeppelin/relayer-plugin-channels';

src/domain/transaction/stellar/status.rs

Lines changed: 91 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -112,6 +112,23 @@ where
112112
"status check encountered error"
113113
);
114114

115+
// CAS conflict means another writer already mutated this tx.
116+
// Reload the latest state and return Ok so the status handler
117+
// sees a non-final status and schedules the next poll cycle via
118+
// HandlerError::Retry — no work is lost, just deferred.
119+
if error.is_concurrent_update_conflict() {
120+
info!(
121+
tx_id = %tx.id,
122+
relayer_id = %tx.relayer_id,
123+
"concurrent transaction update detected during status handling, reloading latest state"
124+
);
125+
return self
126+
.transaction_repository()
127+
.get_by_id(tx.id.clone())
128+
.await
129+
.map_err(TransactionError::from);
130+
}
131+
115132
// Handle different error types appropriately
116133
match error {
117134
TransactionError::ValidationError(ref msg) => {
@@ -1746,6 +1763,80 @@ mod tests {
17461763
.unwrap()
17471764
.contains("stuck in Sent status for too long"));
17481765
}
1766+
#[tokio::test]
1767+
async fn handle_status_concurrent_update_conflict_reloads_latest_state() {
1768+
// When status_core returns ConcurrentUpdateConflict, the handler
1769+
// should reload the latest state via get_by_id and return Ok.
1770+
let relayer = create_test_relayer();
1771+
let mut mocks = default_test_mocks();
1772+
1773+
let mut tx = create_test_transaction(&relayer.id);
1774+
tx.id = "tx-cas-conflict".to_string();
1775+
tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1776+
let tx_hash_bytes = [11u8; 32];
1777+
if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1778+
stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1779+
}
1780+
tx.status = TransactionStatus::Submitted;
1781+
1782+
let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1783+
1784+
// Provider returns SUCCESS — triggers a partial_update for confirmation
1785+
mocks
1786+
.provider
1787+
.expect_get_transaction()
1788+
.with(eq(expected_stellar_hash))
1789+
.times(1)
1790+
.returning(move |_| {
1791+
Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
1792+
});
1793+
1794+
// partial_update fails with ConcurrentUpdateConflict
1795+
mocks
1796+
.tx_repo
1797+
.expect_partial_update()
1798+
.times(1)
1799+
.returning(|_id, _update| {
1800+
Err(RepositoryError::ConcurrentUpdateConflict(
1801+
"CAS mismatch".to_string(),
1802+
))
1803+
});
1804+
1805+
// After conflict, handler reloads via get_by_id
1806+
let reloaded_tx = {
1807+
let mut t = create_test_transaction(&relayer.id);
1808+
t.id = "tx-cas-conflict".to_string();
1809+
// Simulate another writer already confirmed it
1810+
t.status = TransactionStatus::Confirmed;
1811+
t
1812+
};
1813+
let reloaded_clone = reloaded_tx.clone();
1814+
mocks
1815+
.tx_repo
1816+
.expect_get_by_id()
1817+
.with(eq("tx-cas-conflict".to_string()))
1818+
.times(1)
1819+
.returning(move |_| Ok(reloaded_clone.clone()));
1820+
1821+
// No notifications or job enqueuing should happen on CAS path
1822+
mocks
1823+
.job_producer
1824+
.expect_produce_send_notification_job()
1825+
.never();
1826+
mocks
1827+
.job_producer
1828+
.expect_produce_transaction_request_job()
1829+
.never();
1830+
1831+
let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1832+
let result = handler.handle_transaction_status_impl(tx, None).await;
1833+
1834+
assert!(result.is_ok(), "CAS conflict should return Ok after reload");
1835+
let returned_tx = result.unwrap();
1836+
assert_eq!(returned_tx.id, "tx-cas-conflict");
1837+
// The reloaded tx reflects what the other writer persisted
1838+
assert_eq!(returned_tx.status, TransactionStatus::Confirmed);
1839+
}
17491840
}
17501841

17511842
mod handle_pending_state_tests {

0 commit comments

Comments
 (0)