Skip to content

Commit 70de37e

Browse files
authored
feat: trim fragment reuse index after remapping (#3911)
Closes #3836
1 parent 8b0bcb6 commit 70de37e

4 files changed

Lines changed: 241 additions & 3 deletions

File tree

rust/lance-index/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ impl IndexType {
182182
Self::LabelList => 0,
183183
Self::Inverted => 0,
184184
Self::NGram => 0,
185+
Self::FragmentReuse => 0,
185186

186187
// for now all vector indices are built by the same builder,
187188
// so they share the same version.
@@ -191,8 +192,6 @@ impl IndexType {
191192
| Self::IvfPq
192193
| Self::IvfHnswSq
193194
| Self::IvfHnswPq => 0,
194-
195-
Self::FragmentReuse => 0,
196195
}
197196
}
198197
}

rust/lance/src/dataset/index.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright The Lance Authors
33

4+
pub mod frag_reuse;
5+
46
use std::collections::{HashMap, HashSet};
57
use std::sync::Arc;
68

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
use crate::dataset::transaction::{Operation, Transaction};
5+
use crate::index::frag_reuse::{build_frag_reuse_index_metadata, load_frag_reuse_index_details};
6+
use crate::Dataset;
7+
use lance_core::Error;
8+
use lance_index::frag_reuse::{FragReuseIndexDetails, FragReuseVersion, FRAG_REUSE_INDEX_NAME};
9+
use lance_index::DatasetIndexExt;
10+
use lance_table::format::Index;
11+
use log::warn;
12+
use roaring::RoaringBitmap;
13+
use snafu::location;
14+
15+
/// Cleanup a fragment reuse index based on the current condition of the indices.
16+
/// If all the indices currently available are already caught up to as a specific reuse version,
17+
/// all older reuse versions (inclusive) can be cleaned up.
18+
///
19+
/// An index is considered caught up against a specific reuse version if
20+
/// 1. the index is created after or at the same dataset version as the reuse version
21+
/// 2. there is no old fragment in the version that is covered by the index and can be remapped.
22+
/// If an index's fragment bitmap is missing, we will consider it as caught up.
23+
/// Otherwise, we will never be able to clean up the reuse version.
24+
///
25+
/// Note that there could be a race condition that an index is being added during the cleanup,
26+
/// This will make that specific index not efficient until the next reindex,
27+
/// but it will not cause any correctness problem.
28+
pub async fn cleanup_frag_reuse_index(dataset: &mut Dataset) -> lance_core::Result<()> {
29+
let indices = dataset.load_indices().await?;
30+
let frag_reuse_index_meta = match indices.iter().find(|idx| idx.name == FRAG_REUSE_INDEX_NAME) {
31+
None => return Ok(()),
32+
Some(idx) => idx,
33+
};
34+
35+
let frag_reuse_details = load_frag_reuse_index_details(dataset, frag_reuse_index_meta)
36+
.await
37+
.unwrap();
38+
39+
let mut retained_versions = Vec::new();
40+
let mut fragment_bitmaps = RoaringBitmap::new();
41+
for version in frag_reuse_details.versions.iter() {
42+
let check_results = indices
43+
.iter()
44+
.map(|idx| is_index_remap_caught_up(version, idx))
45+
.collect::<Vec<_>>();
46+
47+
if check_results
48+
.iter()
49+
.any(|r| matches!(r, Err(Error::InvalidInput { .. })))
50+
{
51+
// If the check fails, the reuse version is likely corrupted, do not retain it.
52+
continue;
53+
}
54+
55+
if !check_results.into_iter().all(|r| r.unwrap()) {
56+
fragment_bitmaps.extend(
57+
version
58+
.new_frags
59+
.iter()
60+
.map(|id| *id as u32)
61+
.collect::<Vec<_>>(),
62+
);
63+
retained_versions.push(version.clone());
64+
}
65+
}
66+
67+
// Return early if there is nothing to cleanup
68+
if retained_versions.len() == frag_reuse_details.versions.len() {
69+
return Ok(());
70+
}
71+
72+
let frag_reuse_index_details = FragReuseIndexDetails {
73+
versions: retained_versions,
74+
};
75+
76+
let new_index_meta =
77+
build_frag_reuse_index_metadata(dataset, frag_reuse_index_details, fragment_bitmaps)
78+
.await?;
79+
80+
let transaction = Transaction::new(
81+
dataset.manifest.version,
82+
Operation::CreateIndex {
83+
new_indices: vec![new_index_meta],
84+
removed_indices: vec![frag_reuse_index_meta.clone()],
85+
},
86+
None,
87+
None,
88+
);
89+
90+
dataset
91+
.apply_commit(transaction, &Default::default(), &Default::default())
92+
.await?;
93+
94+
Ok(())
95+
}
96+
97+
fn is_index_remap_caught_up(
98+
frag_reuse_version: &FragReuseVersion,
99+
index_meta: &Index,
100+
) -> lance_core::Result<bool> {
101+
if index_meta.name == FRAG_REUSE_INDEX_NAME {
102+
return Ok(true);
103+
}
104+
105+
if index_meta.dataset_version < frag_reuse_version.dataset_version {
106+
return Ok(false);
107+
}
108+
109+
match index_meta.fragment_bitmap.clone() {
110+
Some(index_frag_bitmap) => {
111+
let mut old_frag_in_index = 0;
112+
for old_frag in frag_reuse_version.old_frags.iter() {
113+
if index_frag_bitmap.contains(old_frag.id as u32) {
114+
old_frag_in_index += 1;
115+
}
116+
}
117+
118+
if old_frag_in_index == 0 {
119+
Ok(true)
120+
} else {
121+
if old_frag_in_index != frag_reuse_version.old_frags.len() {
122+
// This should never happen because we always commit a full rewrite group
123+
// and we always reindex either the entire group or nothing.
124+
// We use invalid input to be consistent with
125+
// dataset::transaction::recalculate_fragment_bitmap
126+
return Err(Error::invalid_input(
127+
format!("The compaction plan included a rewrite group that was a split of indexed and non-indexed data: {:?}",
128+
frag_reuse_version.old_frags.iter().map(|frag| frag.id).collect::<Vec<_>>()),
129+
location!()));
130+
}
131+
Ok(false)
132+
}
133+
}
134+
None => {
135+
warn!(
136+
"Index {} ({}) missing fragment bitmap, cannot determine if it is caught up with the fragment reuse version, consider retraining the index",
137+
index_meta.name, index_meta.uuid
138+
);
139+
Ok(true)
140+
}
141+
}
142+
}
143+
144+
#[cfg(test)]
145+
mod tests {
146+
use super::*;
147+
use crate::dataset::optimize::{compact_files, remapping, CompactionOptions};
148+
use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
149+
use all_asserts::{assert_false, assert_true};
150+
use arrow_array::types::{Float32Type, Int32Type};
151+
use lance_datagen::Dimension;
152+
use lance_index::scalar::ScalarIndexParams;
153+
use lance_index::IndexType;
154+
155+
#[tokio::test]
156+
async fn test_cleanup_frag_reuse_index() {
157+
let mut dataset = lance_datagen::gen()
158+
.col(
159+
"vec",
160+
lance_datagen::array::rand_vec::<Float32Type>(Dimension::from(128)),
161+
)
162+
.col("i", lance_datagen::array::step::<Int32Type>())
163+
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
164+
.await
165+
.unwrap();
166+
167+
// Create an index to be remapped
168+
let index_name = Some("scalar".into());
169+
dataset
170+
.create_index(
171+
&["i"],
172+
IndexType::Scalar,
173+
index_name.clone(),
174+
&ScalarIndexParams::default(),
175+
false,
176+
)
177+
.await
178+
.unwrap();
179+
180+
// Compact and check index not caught up
181+
compact_files(
182+
&mut dataset,
183+
CompactionOptions {
184+
target_rows_per_fragment: 2_000,
185+
defer_index_remap: true,
186+
..Default::default()
187+
},
188+
None,
189+
)
190+
.await
191+
.unwrap();
192+
let indices_after_compact = dataset.load_indices().await.unwrap();
193+
let frag_reuse_index_meta = indices_after_compact
194+
.iter()
195+
.find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
196+
.expect("Fragment reuse index must exist");
197+
let frag_reuse_details = load_frag_reuse_index_details(&dataset, frag_reuse_index_meta)
198+
.await
199+
.unwrap();
200+
assert_eq!(frag_reuse_details.versions.len(), 1);
201+
let indices = dataset.load_indices().await.unwrap();
202+
let scalar_index = indices.iter().find(|idx| idx.name == "scalar").unwrap();
203+
assert_false!(
204+
is_index_remap_caught_up(&frag_reuse_details.versions[0], scalar_index).unwrap()
205+
);
206+
207+
// Remap and check index is caught up
208+
remapping::remap_column_index(&mut dataset, &["i"], index_name.clone())
209+
.await
210+
.unwrap();
211+
let indices = dataset.load_indices().await.unwrap();
212+
let scalar_index = indices.iter().find(|idx| idx.name == "scalar").unwrap();
213+
assert_true!(
214+
is_index_remap_caught_up(&frag_reuse_details.versions[0], scalar_index).unwrap()
215+
);
216+
217+
// Cleanup frag reuse index and check there is no reuse version
218+
cleanup_frag_reuse_index(&mut dataset).await.unwrap();
219+
let indices_after_cleanup = dataset.load_indices().await.unwrap();
220+
let frag_reuse_index_meta = indices_after_cleanup
221+
.iter()
222+
.find(|idx| idx.name == FRAG_REUSE_INDEX_NAME)
223+
.expect("Fragment reuse index must exist");
224+
let frag_reuse_details = load_frag_reuse_index_details(&dataset, frag_reuse_index_meta)
225+
.await
226+
.unwrap();
227+
assert_eq!(frag_reuse_details.versions.len(), 0);
228+
}
229+
}

rust/lance/src/index/frag_reuse.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ pub async fn build_new_frag_reuse_index(
106106
new_fragment_bitmap: RoaringBitmap,
107107
changed_row_addrs: RoaringTreemap,
108108
) -> lance_core::Result<Index> {
109-
let index_id = uuid::Uuid::new_v4();
110109
let mut serialized = Vec::with_capacity(changed_row_addrs.serialized_size());
111110
changed_row_addrs.serialize_into(&mut serialized)?;
112111

@@ -136,6 +135,15 @@ pub async fn build_new_frag_reuse_index(
136135
}
137136
};
138137

138+
build_frag_reuse_index_metadata(dataset, new_index_details, new_fragment_bitmap).await
139+
}
140+
141+
pub(crate) async fn build_frag_reuse_index_metadata(
142+
dataset: &mut Dataset,
143+
new_index_details: FragReuseIndexDetails,
144+
new_fragment_bitmap: RoaringBitmap,
145+
) -> lance_core::Result<Index> {
146+
let index_id = uuid::Uuid::new_v4();
139147
let new_index_details_proto = InlineContent::from(&new_index_details);
140148
let proto = if new_index_details_proto.encoded_len() > 204800 {
141149
let file_path = dataset

0 commit comments

Comments
 (0)