Skip to content

Commit 77fcf91

Browse files
committed
refactor(chain): use single queue for anchored_txs in canonicalization
- Unify both `unprocessed_anchored_txs` and `pending_anchored_txs` in a single `unprocessed_anchored_txs` queue. - Changes the `unprocessed_anchored_txs from `Iterator` to `VecDeque`. - Removes the `pending_anchored_txs` field and it's usage. - Collects all `anchored_txs` upfront instead of lazy iteration.
1 parent 0d4a64a commit 77fcf91

1 file changed

Lines changed: 12 additions & 35 deletions

File tree

crates/chain/src/canonical_task.rs

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,13 @@ pub struct CanonicalizationTask<'g, A> {
2727
chain_tip: BlockId,
2828

2929
unprocessed_assumed_txs: Box<dyn Iterator<Item = (Txid, Arc<Transaction>)> + 'g>,
30-
unprocessed_anchored_txs:
31-
Box<dyn Iterator<Item = (Txid, Arc<Transaction>, &'g BTreeSet<A>)> + 'g>,
30+
unprocessed_anchored_txs: VecDeque<(Txid, Arc<Transaction>, &'g BTreeSet<A>)>,
3231
unprocessed_seen_txs: Box<dyn Iterator<Item = (Txid, Arc<Transaction>, u64)> + 'g>,
3332
unprocessed_leftover_txs: VecDeque<(Txid, Arc<Transaction>, u32)>,
3433

3534
canonical: CanonicalMap<A>,
3635
not_canonical: NotCanonicalSet,
3736

38-
pending_anchor_checks: VecDeque<(Txid, Arc<Transaction>, Vec<A>)>,
39-
4037
// Store canonical transactions in order
4138
canonical_order: Vec<Txid>,
4239

@@ -48,22 +45,19 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
4845
type Output = CanonicalView<A>;
4946

5047
fn next_query(&mut self) -> Option<ChainRequest> {
51-
// Check if we have pending anchor checks
52-
if let Some((_, _, anchors)) = self.pending_anchor_checks.front() {
53-
// Convert anchors to BlockIds for the ChainRequest
48+
// Get the next unprocessed anchored tx that needs to query a chain oracle.
49+
if let Some((_txid, _tx, anchors)) = self.unprocessed_anchored_txs.front() {
5450
let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect();
5551
return Some(ChainRequest {
5652
chain_tip: self.chain_tip,
5753
block_ids,
5854
});
5955
}
60-
61-
// Process more anchored transactions if available
62-
self.process_anchored_txs()
56+
None
6357
}
6458

6559
fn resolve_query(&mut self, response: ChainResponse) {
66-
if let Some((txid, tx, anchors)) = self.pending_anchor_checks.pop_front() {
60+
if let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.pop_front() {
6761
// Find the anchor that matches the confirmed BlockId
6862
let best_anchor = response.and_then(|block_id| {
6963
anchors
@@ -97,7 +91,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
9791
}
9892

9993
fn is_finished(&mut self) -> bool {
100-
self.pending_anchor_checks.is_empty() && self.unprocessed_anchored_txs.size_hint().0 == 0
94+
self.unprocessed_anchored_txs.is_empty()
10195
}
10296

10397
fn finish(mut self) -> Self::Output {
@@ -199,11 +193,10 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
199193
.rev()
200194
.filter_map(|txid| Some((txid, tx_graph.get_tx(txid)?))),
201195
);
202-
let unprocessed_anchored_txs = Box::new(
203-
tx_graph
204-
.txids_by_descending_anchor_height()
205-
.filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?))),
206-
);
196+
let unprocessed_anchored_txs: VecDeque<_> = tx_graph
197+
.txids_by_descending_anchor_height()
198+
.filter_map(|(_, txid)| Some((txid, tx_graph.get_tx(txid)?, anchors.get(&txid)?)))
199+
.collect();
207200
let unprocessed_seen_txs = Box::new(
208201
tx_graph
209202
.txids_by_descending_last_seen()
@@ -222,8 +215,6 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
222215
canonical: HashMap::new(),
223216
not_canonical: HashSet::new(),
224217

225-
pending_anchor_checks: VecDeque::new(),
226-
227218
canonical_order: Vec::new(),
228219
confirmed_anchors: HashMap::new(),
229220
};
@@ -246,17 +237,6 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
246237
}
247238
}
248239

249-
fn process_anchored_txs(&mut self) -> Option<ChainRequest> {
250-
while let Some((txid, tx, anchors)) = self.unprocessed_anchored_txs.next() {
251-
if !self.is_canonicalized(txid) {
252-
self.pending_anchor_checks
253-
.push_back((txid, tx, anchors.iter().cloned().collect()));
254-
return self.next_query();
255-
}
256-
}
257-
None
258-
}
259-
260240
fn process_seen_txs(&mut self) {
261241
while let Some((txid, tx, last_seen)) = self.unprocessed_seen_txs.next() {
262242
debug_assert!(
@@ -372,11 +352,8 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
372352
if let Some(anchors) = self.tx_graph.all_anchors().get(txid) {
373353
// only check anchors we haven't already confirmed
374354
if !self.confirmed_anchors.contains_key(txid) {
375-
self.pending_anchor_checks.push_back((
376-
*txid,
377-
tx.clone(),
378-
anchors.iter().cloned().collect(),
379-
));
355+
self.unprocessed_anchored_txs
356+
.push_back((*txid, tx.clone(), anchors));
380357
}
381358
}
382359
}

0 commit comments

Comments
 (0)