Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 66 additions & 39 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ pub struct Dataset {
pub(crate) store_params: Option<Box<ObjectStoreParams>>,
/// Optional runtime-only object store parameters keyed by base path URI.
pub(crate) base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
/// Pre-initialized object stores for additional base paths, keyed by base id.
pub(crate) base_object_stores: Arc<HashMap<u32, Arc<ObjectStore>>>,
}

impl std::fmt::Debug for Dataset {
Expand Down Expand Up @@ -592,6 +594,7 @@ impl Dataset {
self.store_params.as_deref().cloned(),
self.base_store_params.clone(),
)
.await
}

pub(crate) async fn load_manifest(
Expand Down Expand Up @@ -699,7 +702,7 @@ impl Dataset {
}

#[allow(clippy::too_many_arguments)]
fn checkout_manifest(
async fn checkout_manifest(
object_store: Arc<ObjectStore>,
base_path: Path,
uri: String,
Expand All @@ -723,6 +726,13 @@ impl Dataset {
let metadata_cache = Arc::new(session.metadata_cache.for_dataset(&uri));
let index_cache = Arc::new(session.index_cache.for_dataset(&uri));
let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
let base_object_stores = Self::init_base_object_stores(
&session,
&manifest,
store_params.as_ref(),
base_store_params.as_ref(),
)
.await?;
Ok(Self {
object_store,
base: base_path,
Expand All @@ -738,6 +748,7 @@ impl Dataset {
file_reader_options,
store_params: store_params.map(Box::new),
base_store_params,
base_object_stores: Arc::new(base_object_stores),
})
}

Expand Down Expand Up @@ -1720,6 +1731,18 @@ impl Dataset {
.collect(),
)
});
cloned.base_object_stores = Arc::new(
self.base_object_stores
.iter()
.map(|(base_id, store)| {
let mut wrapped = store.as_ref().clone();
for wrapper in &wrappers {
wrapped.inner = wrapper.wrap(&wrapped.store_prefix, wrapped.inner.clone());
}
(*base_id, Arc::new(wrapped))
})
.collect(),
);
cloned
}

Expand All @@ -1742,21 +1765,15 @@ impl Dataset {
store_params
}

fn store_params_for_base(
&self,
fn build_store_params_for_base(
store_params: Option<&ObjectStoreParams>,
base_store_params: Option<&Arc<HashMap<String, ObjectStoreParams>>>,
base_path: Option<&lance_table::format::BasePath>,
) -> ObjectStoreParams {
// Base-specific bindings are exact ObjectStoreParams keyed by
// `BasePath.path`. If a base has no explicit binding then reads fall back
// to the dataset-level default store params.
base_path
.and_then(|base_path| {
self.base_store_params
.as_ref()
.and_then(|params| params.get(&base_path.path))
})
.and_then(|base_path| base_store_params.and_then(|params| params.get(&base_path.path)))
.cloned()
.unwrap_or_else(|| self.store_params.as_deref().cloned().unwrap_or_default())
.unwrap_or_else(|| store_params.cloned().unwrap_or_default())
}

/// Returns the initial storage options used when opening this dataset, if any.
Expand Down Expand Up @@ -1860,7 +1877,7 @@ impl Dataset {
let data_dir = self.data_file_dir_for_base(base_id)?;
let filepath = data_dir.child(path);

let object_store = self.object_store(base_id).await?;
let object_store = self.object_store(base_id)?;

// Get file size
let file_size = object_store.size(&filepath).await?;
Expand Down Expand Up @@ -1966,52 +1983,60 @@ impl Dataset {
}
}

async fn base_object_store(&self, base_id: u32) -> Result<Arc<ObjectStore>> {
let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
})?;
let store_params = self.store_params_for_base(Some(base_path));

let (store, _) = ObjectStore::from_uri_and_params(
self.session.store_registry(),
&base_path.path,
&store_params,
)
.await?;

Ok(store)
/// Builds one object store per entry of `manifest.base_paths`, keyed by base id.
///
/// For every base path the effective parameters are resolved with
/// [`Self::build_store_params_for_base`], and the store is then created via
/// `ObjectStore::from_uri_and_params` using the session's store registry and
/// the base path's `path` as the URI.
///
/// Stores are created one after another; each creation is awaited before the
/// next base path is visited.
///
/// # Errors
///
/// Returns the first error produced by `ObjectStore::from_uri_and_params`;
/// base paths after the failing one are not visited.
pub(crate) async fn init_base_object_stores(
    session: &Session,
    manifest: &Manifest,
    store_params: Option<&ObjectStoreParams>,
    base_store_params: Option<&Arc<HashMap<String, ObjectStoreParams>>>,
) -> Result<HashMap<u32, Arc<ObjectStore>>> {
    let mut stores_by_base_id = HashMap::new();

    for (id, base) in manifest.base_paths.iter() {
        // Resolve the parameters that apply to this particular base path.
        let resolved_params =
            Self::build_store_params_for_base(store_params, base_store_params, Some(base));

        // Only the store itself is kept; the second tuple element is discarded.
        let base_store = ObjectStore::from_uri_and_params(
            session.store_registry(),
            &base.path,
            &resolved_params,
        )
        .await?
        .0;

        stores_by_base_id.insert(*id, base_store);
    }

    Ok(stores_by_base_id)
}

/// Resolve the object store for the primary dataset or an additional base.
///
/// Pass `None` to get the primary dataset object store. Pass `Some(base_id)`
/// when resolving a file whose metadata references an additional base.
pub async fn object_store(&self, base_id: Option<u32>) -> Result<Arc<ObjectStore>> {
pub fn object_store(&self, base_id: Option<u32>) -> Result<Arc<ObjectStore>> {
match base_id {
Some(base_id) => self.base_object_store(base_id).await,
Some(base_id) => self
.base_object_stores
.get(&base_id)
.cloned()
.ok_or_else(|| {
Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
}),
None => Ok(self.object_store.clone()),
}
}

pub(crate) async fn object_store_for_data_file(
pub(crate) fn object_store_for_data_file(
&self,
data_file: &DataFile,
) -> Result<Arc<ObjectStore>> {
self.object_store(data_file.base_id).await
self.object_store(data_file.base_id)
}

pub(crate) async fn object_store_for_deletion(
pub(crate) fn object_store_for_deletion(
&self,
deletion_file: &DeletionFile,
) -> Result<Arc<ObjectStore>> {
self.object_store(deletion_file.base_id).await
self.object_store(deletion_file.base_id)
}

pub(crate) async fn object_store_for_index(
&self,
index: &IndexMetadata,
) -> Result<Arc<ObjectStore>> {
self.object_store(index.base_id).await
pub(crate) fn object_store_for_index(&self, index: &IndexMetadata) -> Result<Arc<ObjectStore>> {
self.object_store(index.base_id)
}

pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result<Path> {
Expand Down Expand Up @@ -2829,7 +2854,8 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
dataset.file_reader_options.clone(),
dataset.store_params.as_deref().cloned(),
dataset.base_store_params.clone(),
)?;
)
.await?;
let loaded =
Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
Error::internal(format!(
Expand Down Expand Up @@ -2862,6 +2888,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
dataset.store_params.as_deref().cloned(),
dataset.base_store_params.clone(),
)
.await
} else {
// If we didn't get the latest manifest, we can still return the dataset
// with the current manifest.
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/dataset/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,7 @@ async fn collect_blob_entries_v2(
let object_store = if let Some(store) = store_cache.get(&base_id) {
store.clone()
} else {
let store = dataset.object_store(Some(base_id)).await?;
let store = dataset.object_store(Some(base_id))?;
store_cache.insert(base_id, store.clone());
store
};
Expand Down Expand Up @@ -2047,7 +2047,7 @@ async fn resolve_blob_read_location(
if let Some(store) = store_cache.get(&base_id) {
store.clone()
} else {
let store = dataset.object_store(Some(base_id)).await?;
let store = dataset.object_store(Some(base_id))?;
store_cache.insert(base_id, store.clone());
store
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,5 +811,6 @@ impl DatasetBuilder {
store_params,
base_store_params,
)
.await
}
}
4 changes: 2 additions & 2 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ impl FileFragment {
.dataset
.data_file_dir(data_file)?
.child(data_file.path.as_str());
let object_store = self.dataset.object_store_for_data_file(data_file).await?;
let object_store = self.dataset.object_store_for_data_file(data_file)?;
let field_id_offset = Self::get_field_id_offset(data_file);
let reader = PreviousFileReader::try_new_with_fragment_id(
&object_store,
Expand Down Expand Up @@ -964,7 +964,7 @@ impl FileFragment {
let (store_scheduler, reader_priority) = if let Some(base_id) = data_file.base_id {
// TODO: make object stores for non-default bases reuse the same scan scheduler
// currently we always create a new one
let object_store = self.dataset.object_store(Some(base_id)).await?;
let object_store = self.dataset.object_store(Some(base_id))?;
let config = SchedulerConfig::max_bandwidth(&object_store);
(
ScanScheduler::new(object_store, config),
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl LanceIndexStoreExt for LanceIndexStore {
.child(index.uuid.to_string());
let cache = dataset.metadata_cache.file_metadata_cache(&index_dir);
let format_version = dataset_format_version(dataset);
let object_store = dataset.object_store_for_index(index).await?;
let object_store = dataset.object_store_for_index(index)?;
let store =
Self::with_format_version(object_store, index_dir, Arc::new(cache), format_version);
Ok(store.with_file_sizes(index.file_size_map()))
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/mem_wal/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl DatasetMemWalExt for Dataset {

async fn list_mem_wal_latest_shard_ids(&self) -> Result<Vec<Uuid>> {
let prefix = super::util::mem_wal_path(&self.branch_location().path);
let object_store = self.object_store(None).await?;
let object_store = self.object_store(None)?;
let list_result = object_store
.inner
.list_with_delimiter(Some(&prefix))
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ async fn can_use_binary_copy_impl(

// check file global buffer
let object_store = match data_file.base_id {
Some(base_id) => dataset.object_store(Some(base_id)).await?,
Some(base_id) => dataset.object_store(Some(base_id))?,
None => dataset.object_store.clone(),
};
let full_path = dataset
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/optimize/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub async fn rewrite_files_binary_copy(
for frag in fragments.iter() {
for df in frag.files.iter() {
let object_store = if let Some(base_id) = df.base_id {
dataset.object_store(Some(base_id)).await?
dataset.object_store(Some(base_id))?
} else {
dataset.object_store.clone()
};
Expand Down
8 changes: 4 additions & 4 deletions rust/lance/src/dataset/tests/dataset_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async fn test_with_object_store_wrappers_wraps_base_store_params() {
let request_tracker = Arc::new(IOTracker::default());
let wrapped = dataset
.with_object_store_wrappers(vec![request_tracker.clone() as Arc<dyn WrappingObjectStore>]);
let base_store = wrapped.object_store(Some(1)).await.unwrap();
let base_store = wrapped.object_store(Some(1)).unwrap();
let base_location = base
.extract_path(wrapped.session().store_registry())
.unwrap()
Expand Down Expand Up @@ -460,9 +460,9 @@ async fn test_object_store_uses_runtime_base_store_params() {
.await
.unwrap();

let store_a = dataset.object_store(Some(1)).await.unwrap();
let store_a_again = dataset.object_store(Some(1)).await.unwrap();
let store_b = dataset.object_store(Some(2)).await.unwrap();
let store_a = dataset.object_store(Some(1)).unwrap();
let store_a_again = dataset.object_store(Some(1)).unwrap();
let store_b = dataset.object_store(Some(2)).unwrap();

assert!(Arc::ptr_eq(&store_a, &store_a_again));
assert!(!Arc::ptr_eq(&store_a, &store_b));
Expand Down
13 changes: 12 additions & 1 deletion rust/lance/src/dataset/write/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,21 @@ impl<'a> CommitBuilder<'a> {
},
);

let manifest = Arc::new(manifest);
let base_object_stores = Arc::new(
Dataset::init_base_object_stores(
&session,
&manifest,
self.store_params.as_ref(),
None,
)
.await?,
);
Ok(Dataset {
object_store,
base: base_path,
uri: uri.to_string(),
manifest: Arc::new(manifest),
manifest,
manifest_location,
session,
commit_handler,
Expand All @@ -405,6 +415,7 @@ impl<'a> CommitBuilder<'a> {
file_reader_options: None,
store_params: self.store_params.clone().map(Box::new),
base_store_params: None,
base_object_stores,
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ impl DatasetIndexInternalExt for Dataset {
// Fall back to file existence check for older indices without file metadata
let index_dir = self.indice_files_dir(&index_meta)?;
let index_file = index_dir.child(uuid).child(INDEX_FILE_NAME);
let object_store = self.object_store_for_index(&index_meta).await?;
let object_store = self.object_store_for_index(&index_meta)?;
object_store.exists(&index_file).await?
};

Expand Down Expand Up @@ -1712,7 +1712,7 @@ impl DatasetIndexInternalExt for Dataset {
.load_index(uuid)
.await?
.ok_or_else(|| Error::index(format!("Index with id {} does not exist", uuid)))?;
let object_store = self.object_store_for_index(&index_meta).await?;
let object_store = self.object_store_for_index(&index_meta)?;

// Check sized cache first (v2+ indices with serializable state).
let state_key = IvfIndexStateCacheKey::new(uuid, frag_reuse_uuid.as_ref());
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn metadata_is_vector_index(dataset: &Dataset, index: &IndexMetadata) -> R
let index_file = index_dir
.child(index.uuid.to_string())
.child(INDEX_FILE_NAME);
let object_store = dataset.object_store_for_index(index).await?;
let object_store = dataset.object_store_for_index(index)?;
object_store.exists(&index_file).await
}

Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ pub(crate) async fn infer_scalar_index_details(
let bitmap_page_lookup = index_dir.child(BITMAP_LOOKUP_NAME);
let inverted_list_lookup = index_dir.child(METADATA_FILE);
let legacy_inverted_list_lookup = index_dir.child(INVERT_LIST_FILE);
let object_store = dataset.object_store_for_index(index).await?;
let object_store = dataset.object_store_for_index(index)?;
let index_details = if let DataType::List(_) = col.data_type() {
prost_types::Any::from_msg(&LabelListIndexDetails::default()).unwrap()
} else if object_store.exists(&bitmap_page_lookup).await? {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,7 @@ pub(crate) async fn open_vector_index_v2(
.await?
.ok_or_else(|| Error::index(format!("Index with id {} does not exist", uuid)))?;
let index_dir = dataset.indice_files_dir(&index_meta)?;
let object_store = dataset.object_store_for_index(&index_meta).await?;
let object_store = dataset.object_store_for_index(&index_meta)?;

let index: Arc<dyn VectorIndex> = match index_metadata.index_type.as_str() {
"IVF_HNSW_PQ" => {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ async fn migrate_indices(dataset: &Dataset, indices: &mut [IndexMetadata]) -> Re
let index_dir = dataset
.indice_files_dir(index)?
.child(index.uuid.to_string());
let object_store = dataset.object_store_for_index(index).await?;
let object_store = dataset.object_store_for_index(index)?;
list_index_files_with_sizes(&object_store, &index_dir).await
}
.await;
Expand Down
Loading
Loading