
Commit 23003a7

wkalt and claude authored
perf: avoid materializing RoaringBitmap::full() in fragment allow-list (#6664)
The fragment-bitmap allow-list filter dominates the cost of merge_insert and of the vector-search prefilter on tables that have received writes since their index was built. The filter ANDs an AllowList of full fragments with the deletion BlockList. RowAddrMask::bitand expands AllowList & BlockList into AllowList - BlockList, and the per-fragment (Full - Partial) branch in RowAddrTreeMap::sub_assign materializes RoaringBitmap::full() for every fragment with deletions.

This change builds the equivalent BlockList directly (the deletions unioned with the rows of fragments outside the index bitmap) using only Full markers and a HashMap clone, so no full bitmap is ever materialized. The existing #6563 stale-index tests cover correctness.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e59f4bd commit 23003a7

3 files changed: 257 additions & 12 deletions
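
To make the cost described in the commit message concrete, the sketch below contrasts the two shapes of work. It is a standalone illustration, not code from this commit: FragMask is a hypothetical stand-in for the per-fragment entries of RowAddrTreeMap, and only roaring::RoaringBitmap and its full() constructor are real APIs.

// Minimal sketch (assumes the `roaring` crate); `FragMask` is hypothetical.
use roaring::RoaringBitmap;

// A fragment's row mask: either "every row" (a cheap marker) or an explicit set.
enum FragMask {
    Full,
    Partial(RoaringBitmap),
}

fn main() {
    // Deletions in one fragment: rows 0, 10, ..., 90.
    let deletions: RoaringBitmap = (0..100u32).step_by(10).collect();

    // Old shape: AllowList(Full) & BlockList(Partial) is rewritten as
    // AllowList(Full) - BlockList(Partial), which forces the Full side to be
    // materialized before the subtraction runs, once per fragment with
    // deletions on every call.
    let mut materialized = RoaringBitmap::full(); // a bitmap over all row offsets
    materialized -= &deletions;
    let _old = FragMask::Partial(materialized);

    // New shape: build the block list directly. Deleted rows stay Partial and
    // fragments outside the index bitmap become Full markers, so nothing ever
    // expands a full bitmap.
    let _non_index_frag = FragMask::Full;
    let _deleted_rows = FragMask::Partial(deletions);
}

The design choice mirrors the diff in prefilter.rs below: non-index fragments are folded into the deletion BlockList via insert_fragment (a Full marker) instead of being intersected away through an AllowList.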

rust/lance/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -158,6 +158,10 @@ required-features = ["cli"]
 name = "scalar_index"
 harness = false
 
+[[bench]]
+name = "merge_insert"
+harness = false
+
 [[bench]]
 name = "scan"
 harness = false

rust/lance/benches/merge_insert.rs

Lines changed: 225 additions & 0 deletions
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
//
//! Benchmarks for `merge_insert` with a scalar index on the merge key.
//!
//! The interesting case is the dataset shape that exercises the fragment-
//! bitmap allow-list path in `DatasetPreFilter::create_restricted_deletion_mask`:
//!
//! - some fragments live OUTSIDE the index's `fragment_bitmap`
//!   (i.e. data was appended after the index was built, or partially
//!   rewritten without re-indexing), AND
//! - some fragments INSIDE the bitmap have a deletion file.
//!
//! Both conditions together force the slow `AllowList(Full) - BlockList(Partial)`
//! computation. The other shapes (`clean`, `with_new_rows_only`,
//! `with_deletions_only`) skip that branch and serve as controls.
//!
//! Run with `cargo bench --bench merge_insert`.

use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use criterion::{Criterion, criterion_group, criterion_main};
use lance::dataset::write::merge_insert::{MergeInsertBuilder, WhenMatched, WhenNotMatched};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::index::DatasetIndexExt;
use lance_core::utils::tempfile::TempStrDir;
use lance_index::IndexType;
use lance_index::scalar::ScalarIndexParams;
#[cfg(target_os = "linux")]
use pprof::criterion::{Output, PProfProfiler};

// Many small fragments to amplify the slow path: each indexed fragment with
// a deletion file produces one RoaringBitmap::full() allocation per
// merge_insert call. Cost scales linearly with NUM_FRAGS.
const ROWS_PER_FRAG: u64 = 100;
const NUM_FRAGS: u64 = 200;
// Minimal schema so the merge_insert baseline (sort, hash-join, write) is small
// and the prefilter overhead dominates the measurement.
fn schema() -> Arc<ArrowSchema> {
    Arc::new(ArrowSchema::new(vec![Field::new(
        "id",
        DataType::Int64,
        false,
    )]))
}

fn make_batch(start_id: i64, n: usize) -> RecordBatch {
    let ids = Int64Array::from_iter_values(start_id..start_id + n as i64);
    RecordBatch::try_new(schema(), vec![Arc::new(ids)]).unwrap()
}

fn make_batches(start_id: i64, total_rows: u64) -> Vec<RecordBatch> {
    let mut out = Vec::new();
    let mut remaining = total_rows;
    let mut next_start = start_id;
    while remaining > 0 {
        let n = remaining.min(ROWS_PER_FRAG) as usize;
        out.push(make_batch(next_start, n));
        next_start += n as i64;
        remaining -= n as u64;
    }
    out
}

async fn build_indexed_base(path: &str) -> Dataset {
    let total = ROWS_PER_FRAG * NUM_FRAGS;
    let params = WriteParams {
        max_rows_per_file: ROWS_PER_FRAG as usize,
        max_rows_per_group: ROWS_PER_FRAG as usize,
        mode: WriteMode::Create,
        ..Default::default()
    };
    let batches = make_batches(0, total);
    let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema());
    Dataset::write(reader, path, Some(params)).await.unwrap();

    let mut ds = Dataset::open(path).await.unwrap();
    ds.create_index(
        &["id"],
        IndexType::BTree,
        None,
        &ScalarIndexParams::default(),
        true,
    )
    .await
    .unwrap();
    ds
}

async fn append_rows(path: &str, base_id: i64, n: usize) -> Dataset {
    let batch = make_batch(base_id, n);
    let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema());
    let params = WriteParams {
        max_rows_per_file: n,
        max_rows_per_group: n,
        mode: WriteMode::Append,
        ..Default::default()
    };
    Dataset::write(reader, path, Some(params)).await.unwrap();
    Dataset::open(path).await.unwrap()
}

async fn delete_some_indexed_rows(ds: &mut Dataset) {
    // Delete a sparse pattern that lands in EVERY indexed fragment (one row
    // per fragment, since ROWS_PER_FRAG = 100 and we delete `id % 100 == 0`).
    // Each affected fragment gets its own deletion file inside the bitmap,
    // which is what scales the slow `RoaringBitmap::full()` materialization
    // path: one allocation per fragment per merge_insert call.
    ds.delete("id % 100 == 0").await.unwrap();
}

/// One merge_insert op: 30 updates of existing IDs + 70 inserts of new IDs.
async fn one_merge_insert(ds: Arc<Dataset>, base_existing: i64, base_new: i64) {
    let mut ids: Vec<i64> = (0..30).map(|i| base_existing + i).collect();
    ids.extend(base_new..base_new + 70);
    let id_arr = Int64Array::from(ids);
    let batch = RecordBatch::try_new(schema(), vec![Arc::new(id_arr)]).unwrap();
    let reader = RecordBatchIterator::new(std::iter::once(Ok(batch)), schema());

    let mut builder = MergeInsertBuilder::try_new(ds, vec!["id".to_string()]).unwrap();
    builder
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll);
    let job = builder.try_build().unwrap();
    let _ = job.execute_reader(reader).await.unwrap();
}

async fn build_clean() -> (TempStrDir, Arc<Dataset>) {
    let dir = TempStrDir::default();
    let path = dir.as_str().to_string();
    let ds = build_indexed_base(&path).await;
    (dir, Arc::new(ds))
}

async fn build_with_new_rows_only() -> (TempStrDir, Arc<Dataset>) {
    let dir = TempStrDir::default();
    let path = dir.as_str().to_string();
    build_indexed_base(&path).await;
    let base = (ROWS_PER_FRAG * NUM_FRAGS) as i64;
    let ds = append_rows(&path, base, 500).await;
    (dir, Arc::new(ds))
}

async fn build_with_deletions_only() -> (TempStrDir, Arc<Dataset>) {
    let dir = TempStrDir::default();
    let path = dir.as_str().to_string();
    let mut ds = build_indexed_base(&path).await;
    delete_some_indexed_rows(&mut ds).await;
    (dir, Arc::new(ds))
}

async fn build_with_new_rows_and_deletions() -> (TempStrDir, Arc<Dataset>) {
    let dir = TempStrDir::default();
    let path = dir.as_str().to_string();
    build_indexed_base(&path).await;
    let base = (ROWS_PER_FRAG * NUM_FRAGS) as i64;
    let mut ds = append_rows(&path, base, 500).await;
    delete_some_indexed_rows(&mut ds).await;
    (dir, Arc::new(ds))
}

fn bench_one_shape<F, Fut>(c: &mut Criterion, name: &str, builder: F)
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = (TempStrDir, Arc<Dataset>)>,
{
    let rt = tokio::runtime::Runtime::new().unwrap();
    let (_dir, ds) = rt.block_on(builder());
    // Cache a snapshot version so each iteration restores to the same baseline.
    let base_version = ds.version().version;
    let path = ds.uri().to_string();
    let total = rt.block_on(ds.count_rows(None)).unwrap() as i64;

    c.bench_function(name, |b| {
        b.iter(|| {
            rt.block_on(async {
                // Restore to the base version so the bench measures a single
                // merge_insert against the same dataset shape every time.
                let bench_ds = Dataset::open(&path).await.unwrap();
                let mut bench_ds = bench_ds.checkout_version(base_version).await.unwrap();
                bench_ds.restore().await.unwrap();
                let arc = Arc::new(bench_ds);
                // base_existing in the indexed range, base_new beyond all data so it's an insert.
                one_merge_insert(arc, 100, total + 1_000_000).await;
            })
        })
    });
}

fn bench_merge_insert(c: &mut Criterion) {
    bench_one_shape(c, "merge_insert/clean", build_clean);
    bench_one_shape(
        c,
        "merge_insert/with_new_rows_only",
        build_with_new_rows_only,
    );
    bench_one_shape(
        c,
        "merge_insert/with_deletions_only",
        build_with_deletions_only,
    );
    // The shape that exercises the AllowList(Full) - BlockList(Partial) path.
    bench_one_shape(
        c,
        "merge_insert/with_new_rows_and_deletions",
        build_with_new_rows_and_deletions,
    );
}

#[cfg(target_os = "linux")]
criterion_group!(
    name=benches;
    config = Criterion::default().significance_level(0.1).sample_size(10)
        .with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
    targets = bench_merge_insert);

#[cfg(not(target_os = "linux"))]
criterion_group!(
    name=benches;
    config = Criterion::default().significance_level(0.1).sample_size(10);
    targets = bench_merge_insert);

criterion_main!(benches);

rust/lance/src/index/prefilter.rs

Lines changed: 28 additions & 12 deletions
@@ -241,12 +241,20 @@ impl DatasetPreFilter {
         // outside the index bitmap. This can happen when a fragment's data was
         // modified but the index was not rewritten (e.g. after DataReplacement
         // or partial merge_insert).
-        let needs_allow_list = if restrict_to_fragments {
+        //
+        // We materialize the set of non-index fragments here so the slow path
+        // below can fold them into the deletion mask as Full blocks instead of
+        // computing AllowList(Full) - BlockList(Partial), which forces
+        // RoaringBitmap::full() per fragment in RowAddrTreeMap::sub_assign and
+        // is the dominant cost on every merge_insert call.
+        let non_index_frags: Option<RoaringBitmap> = if restrict_to_fragments {
             let dataset_frag_ids: RoaringBitmap = frag_map.keys().copied().collect();
-            !dataset_frag_ids.is_subset(&fragments)
+            let outside = &dataset_frag_ids - &fragments;
+            (!outside.is_empty()).then_some(outside)
         } else {
-            false
+            None
         };
+        let needs_allow_list = non_index_frags.is_some();
         for frag_id in fragments.iter() {
             let frag = frag_map.get(&frag_id);
             if let Some(frag) = frag {
@@ -270,21 +278,29 @@ impl DatasetPreFilter {
             }
             Some(async move { Ok(Arc::new(RowAddrMask::from_allowed(allow_list))) }.boxed())
         } else {
-            // There are deletions/missing frags. Build the deletion mask and
-            // optionally combine it with the fragment allow-list.
+            // There are deletions/missing frags. Build the deletion mask and,
+            // if needed, fold the non-index fragments into it as Full blocks.
+            // Equivalent to BlockList(deletions) | BlockList(non_index) — but
+            // expressed via insert_fragment so we never materialize a
+            // RoaringBitmap::full() per fragment.
             let fut =
                 Self::do_create_deletion_mask(dataset, missing_frags, frags_with_deletion_files);
-            if needs_allow_list {
+            if let Some(non_index_frags) = non_index_frags {
                 Some(
                     async move {
                         let deletion_mask = fut.await?;
-                        let mut allow_list = RowAddrTreeMap::new();
-                        for frag_id in fragments.iter() {
-                            allow_list.insert_fragment(frag_id);
+                        let mut combined = match &*deletion_mask {
+                            RowAddrMask::BlockList(b) => b.clone(),
+                            RowAddrMask::AllowList(_) => {
+                                // do_create_deletion_mask only returns BlockList; this is
+                                // defensive and should be unreachable.
+                                return Ok(deletion_mask);
+                            }
+                        };
+                        for frag_id in non_index_frags.iter() {
+                            combined.insert_fragment(frag_id);
                         }
-                        Ok(Arc::new(
-                            (*deletion_mask).clone() & RowAddrMask::from_allowed(allow_list),
-                        ))
+                        Ok(Arc::new(RowAddrMask::from_block(combined)))
                     }
                     .boxed(),
                 )
