Skip to content

Commit d1e14c0

Browse files
apollo_mempool: refactor delay queue to support multiple tx types (#13405)
Introduce DelayedTx enum and extend AddTransactionQueue with delay-aware methods (try_delay, drain_ready). Replace the single delayed_declares field with a Vec<AddTransactionQueue> so all callsites iterate generically, preparing for delayed proof transactions. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f230436 commit d1e14c0

2 files changed

Lines changed: 126 additions & 44 deletions

File tree

crates/apollo_mempool/src/fee_mempool_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use starknet_api::transaction::fields::TransactionSignature;
2828
use starknet_api::transaction::TransactionHash;
2929
use starknet_api::{contract_address, declare_tx_args, felt, invoke_tx_args, nonce, tx_hash};
3030

31-
use super::AddTransactionQueue;
31+
use super::DelayedQueues;
3232
use crate::communication::MempoolCommunicationWrapper;
3333
use crate::fee_transaction_queue::FeeTransactionQueue;
3434
use crate::mempool::{
@@ -150,7 +150,7 @@ impl MempoolTestContentBuilder {
150150
fn build_full_mempool(self) -> Mempool {
151151
Mempool {
152152
config: self.config.clone(),
153-
delayed_declares: AddTransactionQueue::new(),
153+
delayed_queues: DelayedQueues::new(self.config.static_config.declare_delay),
154154
tx_pool: self.content.tx_pool.unwrap_or_default().into_values().collect(),
155155
tx_queue: Box::new(FeeTransactionQueue::new(
156156
self.content.priority_txs.unwrap_or_default(),
@@ -1381,7 +1381,7 @@ fn delay_declare_txs() {
13811381

13821382
// Complete the first declare's delay.
13831383
fake_clock.advance(declare_delay - Duration::from_secs(1));
1384-
// Add another transaction to trigger `add_ready_declares`.
1384+
// Add another transaction to trigger draining of ready delayed txs.
13851385
let another_tx_1 =
13861386
add_tx_input!(tx_hash: 123, address: "0x123", tx_nonce: 123, account_nonce: 0, tip: 123);
13871387
add_tx(&mut mempool, &another_tx_1);
@@ -1391,7 +1391,7 @@ fn delay_declare_txs() {
13911391

13921392
// Complete the second declare's delay.
13931393
fake_clock.advance(Duration::from_secs(1));
1394-
// Add another transaction to trigger `add_ready_declares`
1394+
// Add another transaction to trigger draining of ready delayed txs.
13951395
let another_tx_2 =
13961396
add_tx_input!(tx_hash: 2, address: "0x1", tx_nonce: 5, account_nonce: 0, tip: 100);
13971397
add_tx(&mut mempool, &another_tx_2);

crates/apollo_mempool/src/mempool.rs

Lines changed: 122 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::{HashMap, VecDeque};
22
use std::sync::Arc;
3+
use std::time::Duration;
34

45
use apollo_config::behavior_mode::BehaviorMode;
56
use apollo_mempool_config::config::{MempoolConfig, MempoolDynamicConfig};
@@ -191,16 +192,60 @@ impl MempoolState {
191192
}
192193
}
193194

194-
// A queue to hold transactions that are waiting to be added to the tx pool.
195-
struct AddTransactionQueue {
195+
/// Classifies which transactions should be delayed before entering the main tx pool.
196+
#[cfg_attr(test, derive(Clone))]
197+
pub(crate) enum DelayedTx {
198+
Declare,
199+
}
200+
201+
impl DelayedTx {
202+
fn matches(&self, tx: &InternalRpcTransactionWithoutTxHash) -> bool {
203+
match self {
204+
Self::Declare => matches!(tx, InternalRpcTransactionWithoutTxHash::Declare(_)),
205+
}
206+
}
207+
}
208+
209+
/// A queue to hold transactions that are waiting to be added to the tx pool after a delay.
210+
#[cfg_attr(test, derive(Clone))]
211+
pub(crate) struct AddTransactionQueue {
196212
elements: VecDeque<(DateTime, AddTransactionArgs)>,
197213
// Keeps track of the total size of the transactions in this queue.
198214
size_in_bytes: u64,
215+
delayed_tx: DelayedTx,
216+
// A zero duration effectively disables the queue.
217+
// Transactions are added directly to the pool.
218+
delay: Duration,
199219
}
200220

201221
impl AddTransactionQueue {
202-
fn new() -> Self {
203-
AddTransactionQueue { elements: VecDeque::new(), size_in_bytes: 0 }
222+
pub(crate) fn new(delayed_tx: DelayedTx, delay: Duration) -> Self {
223+
AddTransactionQueue { elements: VecDeque::new(), size_in_bytes: 0, delayed_tx, delay }
224+
}
225+
226+
/// If this tx matches and delay > 0, enqueue it and return None.
227+
/// Otherwise return Some(args) unchanged.
228+
/// Note: `Duration::ZERO` means the delay feature is disabled for this queue.
229+
fn try_delay(&mut self, now: DateTime, args: AddTransactionArgs) -> Option<AddTransactionArgs> {
230+
if self.delay > Duration::ZERO && self.delayed_tx.matches(&args.tx.tx) {
231+
self.push_back(now, args);
232+
None
233+
} else {
234+
Some(args)
235+
}
236+
}
237+
238+
/// Pop all txs whose delay has elapsed.
239+
fn drain_ready(&mut self, now: DateTime) -> Vec<AddTransactionArgs> {
240+
let mut ready = Vec::new();
241+
while let Some((submission_time, _)) = self.front() {
242+
if now - self.delay < *submission_time {
243+
break;
244+
}
245+
let (_, args) = self.pop_front().expect("Delayed tx should exist.");
246+
ready.push(args);
247+
}
248+
ready
204249
}
205250

206251
fn push_back(&mut self, submission_time: DateTime, args: AddTransactionArgs) {
@@ -240,14 +285,49 @@ impl AddTransactionQueue {
240285
fn size_in_bytes(&self) -> u64 {
241286
self.size_in_bytes
242287
}
288+
289+
fn tx_hashes(&self) -> Vec<TransactionHash> {
290+
self.elements.iter().map(|(_, args)| args.tx.tx_hash).collect()
291+
}
292+
}
293+
294+
#[cfg_attr(test, derive(Clone))]
295+
pub(crate) struct DelayedQueues {
296+
declares: AddTransactionQueue,
297+
}
298+
299+
impl DelayedQueues {
300+
fn new(declare_delay: Duration) -> Self {
301+
Self { declares: AddTransactionQueue::new(DelayedTx::Declare, declare_delay) }
302+
}
303+
304+
fn iter(&self) -> impl Iterator<Item = &AddTransactionQueue> {
305+
[&self.declares].into_iter()
306+
}
307+
308+
fn iter_mut(&mut self) -> impl Iterator<Item = &mut AddTransactionQueue> {
309+
[&mut self.declares].into_iter()
310+
}
311+
312+
fn drain_ready(&mut self, now: DateTime) -> Vec<AddTransactionArgs> {
313+
self.iter_mut().flat_map(|q| q.drain_ready(now)).collect()
314+
}
315+
316+
fn size_in_bytes(&self) -> u64 {
317+
self.iter().map(|q| q.size_in_bytes()).sum()
318+
}
319+
320+
fn contains(&self, address: ContractAddress, nonce: Nonce) -> bool {
321+
self.iter().any(|q| q.contains(address, nonce))
322+
}
243323
}
244324

245325
pub struct Mempool {
246326
pub(crate) config: MempoolConfig,
247327
// TODO(AlonH): add docstring explaining visibility and coupling of the fields.
248-
// Declare transactions that are waiting to be added to the tx pool after a delay.
249-
delayed_declares: AddTransactionQueue,
250-
// All transactions currently held in the mempool (excluding the delayed declares).
328+
// Transactions that are waiting to be added to the tx pool after a delay.
329+
delayed_queues: DelayedQueues,
330+
// All transactions currently held in the mempool (excluding delayed transactions).
251331
tx_pool: TransactionPool,
252332
// Transactions eligible for sequencing.
253333
tx_queue: Box<dyn TransactionQueueTrait>,
@@ -267,9 +347,11 @@ impl Mempool {
267347
BehaviorMode::Starknet => Box::new(FeeTransactionQueue::default()),
268348
};
269349

350+
let delayed_queues = DelayedQueues::new(config.static_config.declare_delay);
351+
270352
Mempool {
271353
config: config.clone(),
272-
delayed_declares: AddTransactionQueue::new(),
354+
delayed_queues,
273355
tx_pool: TransactionPool::new(clock.clone()),
274356
tx_queue,
275357
accounts_with_gap: AccountsWithGap::new(),
@@ -319,9 +401,8 @@ impl Mempool {
319401
// TODO(AlonH): Consider renaming to `pop_txs` to be more consistent with the standard library.
320402
#[instrument(skip(self), err(level = "debug"))]
321403
pub fn get_txs(&mut self, n_txs: usize) -> MempoolResult<Vec<InternalRpcTransaction>> {
322-
// All transactions are enqueued in FIFO mode.
323404
if !self.is_fifo() {
324-
self.add_ready_declares();
405+
self.drain_ready_delayed_txs();
325406
}
326407
let mut eligible_tx_references: Vec<TransactionReference> = Vec::with_capacity(n_txs);
327408
let mut n_remaining_txs = n_txs;
@@ -455,7 +536,7 @@ impl Mempool {
455536
// First remove old transactions from the pool.
456537
let mut account_nonce_updates = self.remove_expired_txs();
457538
if !self.is_fifo() {
458-
self.add_ready_declares();
539+
self.drain_ready_delayed_txs();
459540
}
460541

461542
let tx_reference = TransactionReference::new(&args.tx);
@@ -474,13 +555,10 @@ impl Mempool {
474555
self.state.resolve_nonce(args.account_state.address, args.account_state.nonce),
475556
);
476557

477-
let should_delay_declare =
478-
matches!(&args.tx.tx, InternalRpcTransactionWithoutTxHash::Declare(_))
479-
&& !self.is_fifo();
480-
if should_delay_declare {
481-
self.delayed_declares.push_back(self.clock.now(), args);
482-
} else {
558+
if self.is_fifo() {
483559
self.add_tx_inner(args);
560+
} else {
561+
self.add_or_delay_tx(args);
484562
}
485563

486564
self.update_state_metrics();
@@ -584,19 +662,28 @@ impl Mempool {
584662
}
585663
}
586664

587-
fn add_ready_declares(&mut self) {
665+
/// Drains all delayed transaction queues, moving ready transactions to the main pool.
666+
fn drain_ready_delayed_txs(&mut self) {
588667
let now = self.clock.now();
589-
while let Some((submission_time, _args)) = self.delayed_declares.front() {
590-
if now - self.config.static_config.declare_delay < *submission_time {
591-
break;
592-
}
593-
let (_submission_time, args) =
594-
self.delayed_declares.pop_front().expect("Delay declare should exist.");
668+
let ready_txs = self.delayed_queues.drain_ready(now);
669+
for args in ready_txs {
595670
self.add_tx_inner(args);
596671
}
597672
self.update_state_metrics();
598673
}
599674

675+
/// Tries to delay the transaction. If no queue claims it, adds it to the pool directly.
676+
fn add_or_delay_tx(&mut self, mut args: AddTransactionArgs) {
677+
let now = self.clock.now();
678+
for queue in self.delayed_queues.iter_mut() {
679+
match queue.try_delay(now, args) {
680+
None => return,
681+
Some(returned_args) => args = returned_args,
682+
}
683+
}
684+
self.add_tx_inner(args);
685+
}
686+
600687
/// Update the mempool's internal state according to the committed block (resolves nonce gaps,
601688
/// updates account balances).
602689
#[instrument(skip(self, args))]
@@ -677,13 +764,13 @@ impl Mempool {
677764
self.state.validate_incoming_tx(tx_reference, incoming_account_nonce)
678765
}
679766

680-
/// Validates that the given transaction does not front run a delayed declare. This means in
681-
/// particular that no fee escalation can occur to a declare that is being delayed.
682-
fn validate_no_delayed_declare_front_run(
767+
/// Validates that the given transaction does not front-run a delayed transaction. This means in
768+
/// particular that no fee escalation can occur to a transaction that is being delayed.
769+
fn validate_no_delayed_tx_front_run(
683770
&self,
684771
tx_reference: TransactionReference,
685772
) -> MempoolResult<()> {
686-
if self.delayed_declares.contains(tx_reference.address, tx_reference.nonce) {
773+
if self.delayed_queues.contains(tx_reference.address, tx_reference.nonce) {
687774
return Err(MempoolError::DuplicateNonce {
688775
address: tx_reference.address,
689776
nonce: tx_reference.nonce,
@@ -734,7 +821,7 @@ impl Mempool {
734821
) -> MempoolResult<()> {
735822
let TransactionReference { address, nonce, .. } = incoming_tx_reference;
736823

737-
self.validate_no_delayed_declare_front_run(incoming_tx_reference)?;
824+
self.validate_no_delayed_tx_front_run(incoming_tx_reference)?;
738825

739826
if !self.config.static_config.enable_fee_escalation {
740827
if self.tx_pool.get_by_address_and_nonce(address, nonce).is_some() {
@@ -868,19 +955,14 @@ impl Mempool {
868955
pub fn mempool_snapshot(&self) -> MempoolResult<MempoolSnapshot> {
869956
Ok(MempoolSnapshot {
870957
transactions: self.tx_pool.chronological_txs_hashes(),
871-
delayed_declares: self
872-
.delayed_declares
873-
.elements
874-
.iter()
875-
.map(|(_, args)| args.tx.tx_hash)
876-
.collect(),
958+
delayed_declares: self.delayed_queues.declares.tx_hashes(),
877959
transaction_queue: self.tx_queue.queue_snapshot(),
878960
mempool_state: self.state.state_snapshot(),
879961
})
880962
}
881963

882964
fn size_in_bytes(&self) -> u64 {
883-
self.tx_pool.size_in_bytes() + self.delayed_declares.size_in_bytes()
965+
self.tx_pool.size_in_bytes() + self.delayed_queues.size_in_bytes()
884966
}
885967

886968
// Returns true if the mempool will exceeds its capacity by adding the given transaction.
@@ -890,9 +972,9 @@ impl Mempool {
890972

891973
fn update_accounts_with_gap(&mut self, address_to_nonce: AddressToNonce) {
892974
for (address, account_nonce) in address_to_nonce {
893-
// Assumption: Future declares are not allowed — their nonce must match the account
894-
// nonce, so they fill a gap if one exists.
895-
if self.delayed_declares.contains(address, account_nonce) {
975+
// A delayed transaction whose nonce matches the account nonce fills a gap if one
976+
// exists.
977+
if self.delayed_queues.contains(address, account_nonce) {
896978
self.accounts_with_gap.swap_remove(&address);
897979
continue;
898980
}
@@ -990,7 +1072,7 @@ impl Mempool {
9901072
MEMPOOL_POOL_SIZE.set_lossy(self.tx_pool.len());
9911073
MEMPOOL_PRIORITY_QUEUE_SIZE.set_lossy(self.tx_queue.priority_queue_len());
9921074
MEMPOOL_PENDING_QUEUE_SIZE.set_lossy(self.tx_queue.pending_queue_len());
993-
MEMPOOL_DELAYED_DECLARES_SIZE.set_lossy(self.delayed_declares.len());
1075+
MEMPOOL_DELAYED_DECLARES_SIZE.set_lossy(self.delayed_queues.declares.len());
9941076
MEMPOOL_TOTAL_SIZE_BYTES.set_lossy(self.size_in_bytes());
9951077
}
9961078
}

0 commit comments

Comments
 (0)