Skip to content

Commit 3d43f56

Browse files
authored
Compact more aggressively in TopK based upon memory usage (#20381)
## Which issue does this PR close?

Sort of addresses #19386

## Rationale for this change

Compaction in TopK currently uses some hard-coded heuristics to decide when to compact. Instead we can use the memory size of the batches as a bound.

## What changes are included in this PR?

Adjusts TopK compaction to compact more aggressively, based upon memory size.

## Are these changes tested?

Yes, and a test has been added.

## Are there any user-facing changes?

No

## Benchmarks

I'm struggling (in this PR and other PRs...) to get good, reliable benchmarks through. However, with this one it seems like it's *mostly* the same speed as `main`, or a little faster in some cases:

```
+ critcmp main topk_memory_batch
group                                                             main                      topk_memory_batch
-----                                                             ----                      -----------------
aggregate 10000000 time-series rows                               1.05  45.5±2.13ms ? ?/sec  1.00  43.5±2.09ms ? ?/sec
aggregate 10000000 worst-case rows                                1.06  43.9±1.12ms ? ?/sec  1.00  41.5±1.03ms ? ?/sec
distinct 10000000 rows asc [TopK]                                 1.00   4.3±0.25ms ? ?/sec  1.00   4.3±0.07ms ? ?/sec
distinct 10000000 rows asc [no TopK]                              1.00  42.9±1.83ms ? ?/sec  1.06  45.5±1.65ms ? ?/sec
distinct 10000000 rows desc [TopK]                                1.00   4.3±0.07ms ? ?/sec  1.01   4.3±0.06ms ? ?/sec
distinct 10000000 rows desc [no TopK]                             1.00  42.4±1.48ms ? ?/sec  1.05  44.7±1.88ms ? ?/sec
top k=10 aggregate 10000000 time-series rows                      1.13  10.6±0.54ms ? ?/sec  1.00   9.4±0.64ms ? ?/sec
top k=10 aggregate 10000000 time-series rows [Utf8View]           1.08  11.0±0.65ms ? ?/sec  1.00  10.2±0.48ms ? ?/sec
top k=10 aggregate 10000000 worst-case rows                       1.04  16.8±1.47ms ? ?/sec  1.00  16.1±1.44ms ? ?/sec
top k=10 aggregate 10000000 worst-case rows [Utf8View]            1.10  17.9±1.51ms ? ?/sec  1.00  16.2±1.16ms ? ?/sec
top k=10 string aggregate 10000000 time-series rows [Utf8View]    1.11   9.1±0.36ms ? ?/sec  1.00   8.2±0.35ms ? ?/sec
top k=10 string aggregate 10000000 time-series rows [Utf8]        1.12   7.8±0.48ms ? ?/sec  1.00   6.9±0.26ms ? ?/sec
top k=10 string aggregate 10000000 worst-case rows [Utf8View]     1.06   8.6±0.21ms ? ?/sec  1.00   8.1±0.20ms ? ?/sec
top k=10 string aggregate 10000000 worst-case rows [Utf8]         1.07   7.4±0.17ms ? ?/sec  1.00   6.9±0.13ms ? ?/sec
```
1 parent 49876bf commit 3d43f56

1 file changed

Lines changed: 206 additions & 26 deletions

File tree

  • datafusion/physical-plan/src/topk

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 206 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -221,7 +221,7 @@ impl TopK {
221221
expr,
222222
row_converter,
223223
scratch_rows,
224-
heap: TopKHeap::new(k, batch_size),
224+
heap: TopKHeap::new(k),
225225
common_sort_prefix_converter: prefix_row_converter,
226226
common_sort_prefix: Arc::from(common_sort_prefix),
227227
finished: false,
@@ -663,8 +663,6 @@ impl TopKMetrics {
663663
struct TopKHeap {
664664
/// The maximum number of elements to store in this heap.
665665
k: usize,
666-
/// The target number of rows for output batches
667-
batch_size: usize,
668666
/// Storage for at most `k` items using a BinaryHeap. Reversed
669667
/// so that the smallest k so far is on the top
670668
inner: BinaryHeap<TopKRow>,
@@ -675,11 +673,10 @@ struct TopKHeap {
675673
}
676674

677675
impl TopKHeap {
678-
fn new(k: usize, batch_size: usize) -> Self {
676+
fn new(k: usize) -> Self {
679677
assert!(k > 0);
680678
Self {
681679
k,
682-
batch_size,
683680
inner: BinaryHeap::new(),
684681
store: RecordBatchStore::new(),
685682
owned_bytes: 0,
@@ -792,24 +789,26 @@ impl TopKHeap {
792789
/// Compact this heap, rewriting all stored batches into a single
793790
/// input batch
794791
pub fn maybe_compact(&mut self) -> Result<()> {
795-
// we compact if the number of "unused" rows in the store is
796-
// past some pre-defined threshold. Target holding up to
797-
// around 20 batches, but handle cases of large k where some
798-
// batches might be partially full
799-
let max_unused_rows = (20 * self.batch_size) + self.k;
800-
let unused_rows = self.store.unused_rows();
801-
802-
// don't compact if the store has one extra batch or
803-
// unused rows is under the threshold
804-
if self.store.len() <= 2 || unused_rows < max_unused_rows {
792+
// Don't compact if there's at most one batch (compacting a single batch into itself is pointless)
793+
if self.store.len() <= 1 {
794+
return Ok(());
795+
}
796+
797+
let total_rows = self.store.total_rows;
798+
let num_rows = self.inner.len();
799+
800+
// Compact when the store holds more than 2x the rows that the
801+
// compacted result would contain (row counts stand in for memory
802+
// usage). The multiplier avoids compacting when the savings would be marginal.
803+
if total_rows <= num_rows * 2 {
805804
return Ok(());
806805
}
806+
807807
// at first, compact the entire thing always into a new batch
808808
// (maybe we can get fancier in the future about ignoring
809809
// batches that have a high usage ratio already)
810810

811811
// Note: new batch is in the same order as inner
812-
let num_rows = self.inner.len();
813812
let (new_batch, mut topk_rows) = self.emit_with_state()?;
814813
let Some(new_batch) = new_batch else {
815814
return Ok(());
@@ -969,6 +968,8 @@ struct RecordBatchStore {
969968
batches: HashMap<u32, RecordBatchEntry>,
970969
/// total size of all record batches tracked by this store
971970
batches_size: usize,
971+
/// total number of rows across all record batches tracked by this store
972+
total_rows: usize,
972973
}
973974

974975
impl RecordBatchStore {
@@ -977,6 +978,7 @@ impl RecordBatchStore {
977978
next_id: 0,
978979
batches: HashMap::new(),
979980
batches_size: 0,
981+
total_rows: 0,
980982
}
981983
}
982984

@@ -994,6 +996,7 @@ impl RecordBatchStore {
994996
// uses of 0 means that none of the rows in the batch were stored in the topk
995997
if entry.uses > 0 {
996998
self.batches_size += get_record_batch_memory_size(&entry.batch);
999+
self.total_rows += entry.batch.num_rows();
9971000
self.batches.insert(entry.id, entry);
9981001
}
9991002
}
@@ -1002,6 +1005,7 @@ impl RecordBatchStore {
10021005
fn clear(&mut self) {
10031006
self.batches.clear();
10041007
self.batches_size = 0;
1008+
self.total_rows = 0;
10051009
}
10061010

10071011
fn get(&self, id: u32) -> Option<&RecordBatchEntry> {
@@ -1013,15 +1017,6 @@ impl RecordBatchStore {
10131017
self.batches.len()
10141018
}
10151019

1016-
/// Returns the total number of rows in batches minus the number
1017-
/// which are in use
1018-
fn unused_rows(&self) -> usize {
1019-
self.batches
1020-
.values()
1021-
.map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses)
1022-
.sum()
1023-
}
1024-
10251020
/// returns true if the store has nothing stored
10261021
fn is_empty(&self) -> bool {
10271022
self.batches.is_empty()
@@ -1045,6 +1040,11 @@ impl RecordBatchStore {
10451040
.batches_size
10461041
.checked_sub(get_record_batch_memory_size(&old_entry.batch))
10471042
.unwrap();
1043+
1044+
self.total_rows = self
1045+
.total_rows
1046+
.checked_sub(old_entry.batch.num_rows())
1047+
.unwrap();
10481048
}
10491049
}
10501050

@@ -1060,7 +1060,7 @@ impl RecordBatchStore {
10601060
#[cfg(test)]
10611061
mod tests {
10621062
use super::*;
1063-
use arrow::array::{Float64Array, Int32Array};
1063+
use arrow::array::{BooleanArray, Float64Array, Int32Array};
10641064
use arrow::datatypes::{DataType, Field, Schema};
10651065
use arrow_schema::SortOptions;
10661066
use datafusion_common::assert_batches_eq;
@@ -1243,4 +1243,184 @@ mod tests {
12431243

12441244
Ok(())
12451245
}
1246+
1247+
/// Tests that memory-based compaction triggers when a large batch
1248+
/// has very few rows referenced by the top-k heap.
1249+
#[tokio::test]
1250+
async fn test_topk_memory_compaction() -> Result<()> {
1251+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1252+
1253+
let sort_expr = PhysicalSortExpr {
1254+
expr: col("a", schema.as_ref())?,
1255+
options: SortOptions::default(),
1256+
};
1257+
1258+
let full_expr = LexOrdering::from([sort_expr.clone()]);
1259+
let prefix = vec![sort_expr];
1260+
1261+
let runtime = Arc::new(RuntimeEnv::default());
1262+
let metrics = ExecutionPlanMetricsSet::new();
1263+
1264+
let k = 5;
1265+
let mut topk = TopK::try_new(
1266+
0,
1267+
Arc::clone(&schema),
1268+
prefix,
1269+
full_expr,
1270+
k,
1271+
8192,
1272+
runtime,
1273+
&metrics,
1274+
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
1275+
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
1276+
)))),
1277+
)?;
1278+
1279+
// Insert a large batch (100,000 rows) with values 1..=100_000.
1280+
// Only the smallest 5 values (1..=5) will end up in the heap.
1281+
let large_values: Vec<i32> = (1..=100_000).collect();
1282+
let array1: ArrayRef = Arc::new(Int32Array::from(large_values));
1283+
let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array1])?;
1284+
topk.insert_batch(batch1)?;
1285+
1286+
// After the first batch, store has 1 batch — compaction should
1287+
// not trigger (guard: store.len() <= 1).
1288+
assert_eq!(
1289+
topk.heap.store.len(),
1290+
1,
1291+
"should have 1 batch before second insert"
1292+
);
1293+
1294+
// Insert a second batch whose values displace entries in the heap.
1295+
// -1 and 0 are smaller than the current top-5 (1..=5), so they
1296+
// produce 2 replacements. With replacements > 0, `insert_batch`
1297+
// calls `insert_batch_entry` (briefly making store.len() == 2)
1298+
// and then `maybe_compact`, which should collapse it back to 1.
1299+
let array2: ArrayRef = Arc::new(Int32Array::from(vec![-1, 0]));
1300+
let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array2])?;
1301+
let replacements_before = topk.metrics.row_replacements.value();
1302+
topk.insert_batch(batch2)?;
1303+
1304+
// Sanity check: batch2 was actually integrated. Without
1305+
// replacements, `maybe_compact` is never called and the
1306+
// store-length assertion below would pass vacuously.
1307+
assert!(
1308+
topk.metrics.row_replacements.value() > replacements_before,
1309+
"batch2 must produce replacements so compaction is exercised"
1310+
);
1311+
1312+
// The compacted-estimate guard is `total_rows <= num_rows * 2`,
1313+
// i.e. 100_002 <= 10, which is false, so compaction fires and
1314+
// collapses the two stored batches back into one.
1315+
assert_eq!(
1316+
topk.heap.store.len(),
1317+
1,
1318+
"store should be compacted to 1 batch"
1319+
);
1320+
1321+
// Verify the emitted results are correct (top 5 ascending).
1322+
let results: Vec<_> = topk.emit()?.try_collect().await?;
1323+
assert_batches_eq!(
1324+
&[
1325+
"+----+", "| a |", "+----+", "| -1 |", "| 0 |", "| 1 |", "| 2 |",
1326+
"| 3 |", "+----+",
1327+
],
1328+
&results
1329+
);
1330+
1331+
Ok(())
1332+
}
1333+
1334+
/// Negative path: when stored rows are close to the heap size,
1335+
/// compaction must NOT fire even with multiple batches present,
1336+
/// because the savings would be marginal
1337+
/// (guard: `total_rows <= num_rows * 2`).
1338+
///
1339+
/// Uses a bit-packed `BooleanArray` so that future changes to the
1340+
/// compaction heuristic that reintroduce a per-byte estimate
1341+
/// (where integer truncation could misbehave on sub-byte types)
1342+
/// are caught here.
1343+
#[tokio::test]
1344+
async fn test_topk_memory_compaction_skipped_when_marginal() -> Result<()> {
1345+
let schema =
1346+
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
1347+
1348+
let sort_expr = PhysicalSortExpr {
1349+
expr: col("a", schema.as_ref())?,
1350+
options: SortOptions::default(),
1351+
};
1352+
let full_expr = LexOrdering::from([sort_expr.clone()]);
1353+
let prefix = vec![sort_expr];
1354+
1355+
let runtime = Arc::new(RuntimeEnv::default());
1356+
let metrics = ExecutionPlanMetricsSet::new();
1357+
1358+
let k = 10;
1359+
let mut topk = TopK::try_new(
1360+
0,
1361+
Arc::clone(&schema),
1362+
prefix,
1363+
full_expr,
1364+
k,
1365+
8192,
1366+
runtime,
1367+
&metrics,
1368+
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
1369+
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
1370+
)))),
1371+
)?;
1372+
1373+
// Two small batches; every row from both batches ends up referenced
1374+
// by the heap, so total_rows == num_rows == 10.
1375+
let batch1 = RecordBatch::try_new(
1376+
Arc::clone(&schema),
1377+
vec![
1378+
Arc::new(BooleanArray::from(vec![false, false, true, true, true]))
1379+
as ArrayRef,
1380+
],
1381+
)?;
1382+
topk.insert_batch(batch1)?;
1383+
1384+
let batch2 = RecordBatch::try_new(
1385+
Arc::clone(&schema),
1386+
vec![
1387+
Arc::new(BooleanArray::from(vec![false, false, false, true, true]))
1388+
as ArrayRef,
1389+
],
1390+
)?;
1391+
topk.insert_batch(batch2)?;
1392+
1393+
// Guard `total_rows <= num_rows * 2` should hold (10 <= 20),
1394+
// so compaction is skipped and BOTH batches remain in the store.
1395+
assert_eq!(
1396+
topk.heap.store.len(),
1397+
2,
1398+
"store must keep 2 batches when savings would be marginal"
1399+
);
1400+
assert_eq!(topk.heap.inner.len(), 10, "heap should hold all 10 rows");
1401+
1402+
// Output is still correct (5 falses then 5 trues ascending).
1403+
let results: Vec<_> = topk.emit()?.try_collect().await?;
1404+
assert_batches_eq!(
1405+
&[
1406+
"+-------+",
1407+
"| a |",
1408+
"+-------+",
1409+
"| false |",
1410+
"| false |",
1411+
"| false |",
1412+
"| false |",
1413+
"| false |",
1414+
"| true |",
1415+
"| true |",
1416+
"| true |",
1417+
"| true |",
1418+
"| true |",
1419+
"+-------+",
1420+
],
1421+
&results
1422+
);
1423+
1424+
Ok(())
1425+
}
12461426
}

0 commit comments

Comments
 (0)