Skip to content

Commit 0bb3ea6

Browse files
authored
Merge pull request #2080 from mintlayer/mempool_shrink_hash_tables
Mempool: fix spuriously failing test; shrink hash tables; some cleanup.
2 parents 0537dc9 + 818b349 commit 0bb3ea6

8 files changed

Lines changed: 376 additions & 56 deletions

File tree

mempool/src/pool/tests/orphans.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,9 @@ async fn transaction_graph_subset_permutation(#[case] seed: Seed) {
258258

259259
log::info!(
260260
"Stats: count {}, memory {}, encoded size {}",
261-
mempool.tx_store().txs_by_id.len(),
261+
mempool.tx_store().txs_by_id().len(),
262262
mempool.memory_usage(),
263-
mempool.tx_store().txs_by_id.values().map(|e| e.size().get()).sum::<usize>(),
263+
mempool.tx_store().txs_by_id().values().map(|e| e.size().get()).sum::<usize>(),
264264
);
265265

266266
// Check the final state of each transaction in the original sequence

mempool/src/pool/tx_pool/collect_txs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ pub fn collect_txs<M>(
133133
// Transaction IDs taken from mempool to fill in the rest of the block
134134
let mempool_txids = {
135135
// Get transactions from mempool by score
136-
let txids = mempool.store.txs_by_ancestor_score.iter().map(|x| &x.1).rev();
136+
let txids = mempool.store.txs_by_ancestor_score().iter().map(|x| &x.1).rev();
137137
// Take the appropriate amount of them as determined by the packing strategy
138138
txids.take(match packing_strategy {
139139
PackingStrategy::FillSpaceFromMempool => usize::MAX,

mempool/src/pool/tx_pool/mod.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ impl<M> TxPool<M> {
172172

173173
pub fn get_all(&self) -> Vec<SignedTransaction> {
174174
self.store
175-
.txs_by_descendant_score
175+
.txs_by_descendant_score()
176176
.iter()
177177
.map(|(_score, id)| self.store.get_entry(id).expect("entry").transaction().clone())
178178
.collect()
@@ -486,7 +486,7 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
486486
conflicts_with_descendants: &StoreHashSet<Id<Transaction>>,
487487
) -> Result<Fee, MempoolPolicyError> {
488488
let conflicts_with_descendants = conflicts_with_descendants.iter().map(|conflict_id| {
489-
self.store.txs_by_id.get(conflict_id).expect("tx should exist in mempool")
489+
self.store.txs_by_id().get(conflict_id).expect("tx should exist in mempool")
490490
});
491491

492492
let total_conflict_fees = conflicts_with_descendants
@@ -626,7 +626,7 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
626626

627627
let expired_ids = self
628628
.store
629-
.txs_by_creation_time
629+
.txs_by_creation_time()
630630
.iter()
631631
// Note: entries in txs_by_creation_time are sorted by the creation time in ascending order,
632632
// so once we find a tx that is not expired, the rest will not be expired either.
@@ -653,16 +653,22 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
653653

654654
fn trim(&mut self) -> Result<Vec<FeeRate>, MempoolPolicyError> {
655655
let mut removed_fees = Vec::new();
656-
while !self.store.is_empty() && self.memory_usage() > self.max_size.as_bytes() {
656+
loop {
657+
self.store.shrink_capacity_if_needed();
658+
659+
if self.store.is_empty() || self.memory_usage() <= self.max_size.as_bytes() {
660+
break;
661+
}
662+
657663
// TODO sort by descendant score, not by fee
658664
let removed_id = self
659665
.store
660-
.txs_by_descendant_score
666+
.txs_by_descendant_score()
661667
.iter()
662668
.map(|(_score, entry)| *entry)
663669
.next()
664670
.expect("pool not empty");
665-
let removed = self.store.txs_by_id.get(&removed_id).expect("tx with id should exist");
671+
let removed = self.store.txs_by_id().get(&removed_id).expect("tx with id should exist");
666672

667673
log::debug!(
668674
"Mempool trim: Evicting tx {:x} which has a descendant score of {:?} and has size {}",
@@ -673,6 +679,7 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
673679
removed_fees.push(FeeRate::from_total_tx_fee(removed.fee(), removed.size())?);
674680
self.remove_tx_and_descendants(&removed_id, MempoolRemovalReason::SizeLimit);
675681
}
682+
676683
Ok(removed_fees)
677684
}
678685

@@ -946,8 +953,8 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
946953
in_top_x_mb,
947954
&self.mempool_config,
948955
&self.rolling_fee_rate.read(),
949-
&self.store.txs_by_descendant_score,
950-
&self.store.txs_by_id,
956+
self.store.txs_by_descendant_score(),
957+
self.store.txs_by_id(),
951958
)
952959
}
953960

@@ -991,8 +998,8 @@ impl<M: MemoryUsageEstimator> TxPool<M> {
991998
num_points,
992999
&self.mempool_config,
9931000
&self.rolling_fee_rate.read(),
994-
&self.store.txs_by_descendant_score,
995-
&self.store.txs_by_id,
1001+
self.store.txs_by_descendant_score(),
1002+
self.store.txs_by_id(),
9961003
)
9971004
}
9981005

mempool/src/pool/tx_pool/store/mod.rs

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ pub struct MempoolStore {
124124
// and doesn't free the memory when an item is removed - it's only replaced with a tombstone.
125125
// Since TxMempoolEntry is relatively big (size_of = 350+ bytes), we'd waste a noticeable
126126
// amount of memory without boxing.)
127-
pub txs_by_id: TrackedHashMap<Id<Transaction>, Tracked<Box<TxMempoolEntry>, StrictDropPolicy>>,
127+
txs_by_id: TrackedHashMap<Id<Transaction>, Tracked<Box<TxMempoolEntry>, StrictDropPolicy>>,
128128

129129
// Mempool entries sorted by descendant score.
130130
// We keep this index so that when the mempool grows full, we know which transactions are the
@@ -136,22 +136,22 @@ pub struct MempoolStore {
136136
// max(fee/size of entry's tx, fee/size with all descendants).
137137
// TODO if we wish to follow Bitcoin Core, "size" is not simply the encoded size, but
138138
// rather a value that takes into account witness and sigop data (see CTxMemPoolEntry::GetTxSize).
139-
pub txs_by_descendant_score: TrackedTxIdMultiMap<DescendantScore>,
139+
txs_by_descendant_score: TrackedTxIdMultiMap<DescendantScore>,
140140

141141
// Mempool entries sorted by ancestor score.
142142
// This is used to select the most economically attractive transactions for block production.
143143
// The ancestor score of an entry is defined as
144144
// min(fee/size of entry's tx, fee/size with all ancestors).
145-
pub txs_by_ancestor_score: TrackedTxIdMultiMap<AncestorScore>,
145+
txs_by_ancestor_score: TrackedTxIdMultiMap<AncestorScore>,
146146

147147
// Entries that have remained in the mempool for a long time (see DEFAULT_MEMPOOL_EXPIRY) are
148148
// evicted. To efficiently know which entries to evict, we store the mempool entries sorted by
149149
// their creation time, from earliest to latest.
150-
pub txs_by_creation_time: TrackedTxIdMultiMap<Time>,
150+
txs_by_creation_time: TrackedTxIdMultiMap<Time>,
151151

152152
// We keep the information of which inputs are spent by entries currently in the mempool.
153153
// This allows us to recognize conflicts (double-spends) and handle them
154-
pub spender_txs: Tracked<BTreeMap<TxDependency, Id<Transaction>>>,
154+
spender_txs: Tracked<BTreeMap<TxDependency, Id<Transaction>>>,
155155

156156
// Track transactions by internal unique sequence number. This is used to recover the order in
157157
// which the transactions have been inserted into the mempool, so they can be re-inserted in
@@ -243,6 +243,29 @@ impl MempoolStore {
243243
self.mem_tracker.get_usage()
244244
}
245245

246+
pub fn txs_by_id(
247+
&self,
248+
) -> &TrackedHashMap<Id<Transaction>, Tracked<Box<TxMempoolEntry>, StrictDropPolicy>> {
249+
&self.txs_by_id
250+
}
251+
252+
/// Read-only access to the index of mempool entries sorted by descendant score.
pub fn txs_by_descendant_score(&self) -> &TrackedTxIdMultiMap<DescendantScore> {
    &self.txs_by_descendant_score
}
255+
256+
/// Read-only access to the index of mempool entries sorted by ancestor score.
pub fn txs_by_ancestor_score(&self) -> &TrackedTxIdMultiMap<AncestorScore> {
    &self.txs_by_ancestor_score
}
259+
260+
/// Read-only access to the index of mempool entries sorted by creation time
/// (earliest first).
pub fn txs_by_creation_time(&self) -> &TrackedTxIdMultiMap<Time> {
    &self.txs_by_creation_time
}
263+
264+
/// Test-only, read-only access to the `seq_nos_by_tx` table (transaction id -> `usize`).
#[cfg(test)]
pub fn seq_nos_by_tx(&self) -> &TrackedHashMap<Id<Transaction>, usize> {
    &self.seq_nos_by_tx
}
268+
246269
pub fn assert_valid(&self) {
247270
#[cfg(test)]
248271
self.assert_valid_inner()
@@ -663,8 +686,87 @@ impl MempoolStore {
663686
let entry = self.get_existing_entry(tx_id)?;
664687
entry.collect_cluster(self)
665688
}
689+
690+
/// For internal containers that have capacity, check if the capacity is excessive; shrink
691+
/// the container if it is.
692+
pub fn shrink_capacity_if_needed(&mut self) {
693+
// Note:
694+
// * Hashbrown tables never shrink their capacity automatically.
695+
// * According to the pseudo-test `estimate_max_tx_count_in_store`, the store with the default
696+
// size of 300Mb can fit over 230'000 txs of the smallest possible size. Due to how hashbrown
697+
// tables work (1/8 of all buckets should always be empty, and reallocation doubles the number
698+
// of buckets), `txs_by_id` and `seq_nos_by_tx` may end up with more than 500'000 buckets each.
699+
// Given that the bucket size in each table is 40 bytes (in non-test builds), this results in
700+
// roughly 20Mb of allocated memory per table, which will not go down even if the tables'
701+
// element counts become zero. Since table's entire allocation_size counts towards the
702+
// mempool size, this will effectively reduce the max mempool size by 40Mb.
703+
// * On the other hand, the mempool re-creates its store completely every time a new block
704+
// arrives, so the situation described above can only exist for a few minutes. Still,
705+
// it's better for the store not to depend on such a behavior of its owner code and
706+
// manage the capacities explicitly.
707+
708+
// Implementation notes:
709+
// * table's `capacity` doesn't count the tombstones, so in a degenerate case like the one
710+
// described above it's possible to have a table with a huge allocation size and small
711+
// capacity. So below we don't use capacity when deciding whether to shrink, and estimate
712+
// (roughly) the number of buckets instead.
713+
// * even though `shrink_to` accepts capacity, it'll compare the estimated number of buckets
714+
// (from the passed capacity) with the current one and reallocate/rehash the table if the
715+
// latter is bigger.
716+
717+
fn maybe_shrink<K, V>(
718+
table: &mut TrackedHashMap<K, V>,
719+
mem_tracker: &mut MemUsageTracker,
720+
table_name: &str,
721+
) where
722+
K: Eq + std::hash::Hash,
723+
{
724+
let bucket_size = hash_map_bucket_size(table);
725+
let bucket_count = hash_map_bucket_count_upper_bound(table);
726+
727+
let max_bucket_count = table.len() * HASH_TABLE_MAX_BUCKET_COUNT_FACTOR;
728+
let adjusted_capacity = table.len() * HASH_TABLE_ADJUSTED_CAPACITY_FACTOR;
729+
730+
if bucket_count > max_bucket_count {
731+
let potentially_reclaimable_mem_size =
732+
(bucket_count - adjusted_capacity) * bucket_size;
733+
734+
// Only bother shrinking if the win is noticeable.
735+
if potentially_reclaimable_mem_size >= HASH_TABLE_MIN_RECLAIMABLE_MEM_SIZE {
736+
log::debug!("Shrinking {table_name} to {adjusted_capacity}");
737+
mem_tracker.modify(table, |table, _| table.shrink_to(adjusted_capacity));
738+
}
739+
}
740+
}
741+
742+
maybe_shrink(&mut self.txs_by_id, &mut self.mem_tracker, "txs_by_id");
743+
maybe_shrink(
744+
&mut self.seq_nos_by_tx,
745+
&mut self.mem_tracker,
746+
"seq_nos_by_tx",
747+
);
748+
}
749+
}
750+
751+
pub fn hash_map_bucket_size<K, V>(_: &StoreHashMap<K, V>) -> usize {
752+
std::mem::size_of::<(K, V)>()
753+
}
754+
755+
// Return the upper bound for the number of buckets in the map.
756+
pub fn hash_map_bucket_count_upper_bound<K, V>(map: &StoreHashMap<K, V>) -> usize
757+
where
758+
K: Eq + std::hash::Hash,
759+
{
760+
// Note: the actual number of buckets will be smaller than this, because `allocation_size` also
761+
// includes control bytes and padding.
762+
map.allocation_size() / hash_map_bucket_size(map)
666763
}
667764

765+
// Constants that determine whether store's hash tables should be shrunk and, if yes, to what capacity.
766+
/// A table is considered for shrinking only when its estimated bucket count exceeds
/// `len() * HASH_TABLE_MAX_BUCKET_COUNT_FACTOR`.
pub const HASH_TABLE_MAX_BUCKET_COUNT_FACTOR: usize = 5;
767+
/// When a table is shrunk, the capacity passed to `shrink_to` is
/// `len() * HASH_TABLE_ADJUSTED_CAPACITY_FACTOR`.
pub const HASH_TABLE_ADJUSTED_CAPACITY_FACTOR: usize = 2;
768+
/// Minimum estimated number of reclaimable bytes
/// (`(bucket_count - adjusted_capacity) * bucket_size`) for a shrink to be worth performing.
pub const HASH_TABLE_MIN_RECLAIMABLE_MEM_SIZE: usize = 10_000;
769+
668770
#[cfg(test)]
669771
impl Drop for MempoolStore {
670772
fn drop(&mut self) {

mempool/src/pool/tx_pool/tests/accumulator.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,10 @@ async fn collect_transactions(#[case] seed: Seed) -> anyhow::Result<()> {
259259
.unwrap();
260260
let collected_txs = returned_accumulator.unwrap();
261261
let collected_txs = collected_txs.transactions();
262-
log::debug!("ancestor index: {:?}", mempool.store.txs_by_ancestor_score);
262+
log::debug!(
263+
"ancestor index: {:?}",
264+
mempool.store.txs_by_ancestor_score()
265+
);
263266
let expected_num_txs_collected = 6;
264267
assert_eq!(collected_txs.len(), expected_num_txs_collected);
265268
let total_tx_size: usize = collected_txs.iter().map(|tx| tx.encoded_size()).sum();

0 commit comments

Comments
 (0)