python/src/dataset.rs (2 additions & 6 deletions)

@@ -2536,18 +2536,14 @@ impl Dataset {
 
     /// Get a snapshot of current IO statistics without resetting counters
     fn io_stats_snapshot(&self) -> PyResult<IoStats> {
-        let object_store = rt()
-            .block_on(None, self.ds.object_store(None))?
-            .infer_error()?;
+        let object_store = self.ds.object_store(None).infer_error()?;
         let stats = object_store.io_stats_snapshot();
         Ok(IoStats::from_lance(stats))
     }
 
     /// Get incremental IO statistics for this dataset
     fn io_stats_incremental(&self) -> PyResult<IoStats> {
-        let object_store = rt()
-            .block_on(None, self.ds.object_store(None))?
-            .infer_error()?;
+        let object_store = self.ds.object_store(None).infer_error()?;
         let stats = object_store.io_stats_incremental();
         Ok(IoStats::from_lance(stats))
     }
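Note: the rt().block_on dance disappears here because Dataset::object_store is synchronous after this PR (see rust/lance/src/dataset.rs below). A minimal self-contained sketch of the before/after calling convention; Dataset, ObjectStore, and object_store_async are stand-ins for illustration, not the lance or lance-python definitions:

    // Sketch only: models why a sync accessor lets the PyO3 methods drop block_on.
    // Assumes tokio = { version = "1", features = ["rt"] } for the "before" case.
    use std::sync::Arc;

    struct ObjectStore;
    struct Dataset {
        store: Arc<ObjectStore>,
    }

    impl Dataset {
        // Old shape: async, so a sync binding method needed a runtime to poll it.
        async fn object_store_async(&self) -> Arc<ObjectStore> {
            self.store.clone()
        }
        // New shape: a plain accessor; no executor is involved at all.
        fn object_store(&self) -> Arc<ObjectStore> {
            self.store.clone()
        }
    }

    fn main() {
        let ds = Dataset { store: Arc::new(ObjectStore) };
        // Before: enter the runtime just to resolve an already-known store.
        let rt = tokio::runtime::Runtime::new().unwrap();
        let _old = rt.block_on(ds.object_store_async());
        // After: a direct call, usable from sync contexts like the PyO3 methods above.
        let _new = ds.object_store();
    }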
python/src/indices.rs (1 addition & 1 deletion)

@@ -526,7 +526,7 @@ async fn do_load_shuffled_vectors(
 
     let mut ds = dataset.ds.as_ref().clone();
     let index_dir = ds.indices_dir().child(index_id.to_string());
-    let object_store = ds.object_store(None).await.infer_error()?;
+    let object_store = ds.object_store(None).infer_error()?;
     let files = list_index_files_with_sizes(object_store.as_ref(), &index_dir)
         .await
         .infer_error()?;
rust/lance/benches/take.rs (1 addition & 1 deletion)

@@ -226,7 +226,7 @@ fn file_reader_take(
 
 async fn create_file_reader(dataset: &Dataset, file_path: &Path) -> FileReader {
     // Create file reader v2.
-    let object_store = dataset.object_store(None).await.unwrap();
+    let object_store = dataset.object_store(None).unwrap();
     let scheduler = ScanScheduler::new(object_store, SchedulerConfig::new(2 * 1024 * 1024 * 1024));
     let file = scheduler
         .open_file(file_path, &CachedFileSize::unknown())
rust/lance/src/dataset.rs (66 additions & 39 deletions)

@@ -186,6 +186,8 @@ pub struct Dataset {
     pub(crate) store_params: Option<Box<ObjectStoreParams>>,
     /// Optional runtime-only object store parameters keyed by base path URI.
     pub(crate) base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
+    /// Pre-initialized object stores for additional base paths, keyed by base id.
+    pub(crate) base_object_stores: Arc<HashMap<u32, Arc<ObjectStore>>>,
 }
 
 impl std::fmt::Debug for Dataset {

@@ -592,6 +594,7 @@ impl Dataset {
             self.store_params.as_deref().cloned(),
             self.base_store_params.clone(),
         )
+        .await
     }
 
     pub(crate) async fn load_manifest(
@@ -699,7 +702,7 @@ impl Dataset {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn checkout_manifest(
+    async fn checkout_manifest(
         object_store: Arc<ObjectStore>,
         base_path: Path,
         uri: String,

@@ -723,6 +726,13 @@ impl Dataset {
         let metadata_cache = Arc::new(session.metadata_cache.for_dataset(&uri));
         let index_cache = Arc::new(session.index_cache.for_dataset(&uri));
         let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
+        let base_object_stores = Self::init_base_object_stores(
+            &session,
+            &manifest,
+            store_params.as_ref(),
+            base_store_params.as_ref(),
+        )
+        .await?;
         Ok(Self {
             object_store,
             base: base_path,

@@ -738,6 +748,7 @@ impl Dataset {
             file_reader_options,
             store_params: store_params.map(Box::new),
             base_store_params,
+            base_object_stores: Arc::new(base_object_stores),
         })
     }
 
@@ -1720,6 +1731,18 @@ impl Dataset {
                 .collect(),
             )
         });
+        cloned.base_object_stores = Arc::new(
+            self.base_object_stores
+                .iter()
+                .map(|(base_id, store)| {
+                    let mut wrapped = store.as_ref().clone();
+                    for wrapper in &wrappers {
+                        wrapped.inner = wrapper.wrap(&wrapped.store_prefix, wrapped.inner.clone());
+                    }
+                    (*base_id, Arc::new(wrapped))
+                })
+                .collect(),
+        );
         cloned
     }
 
@@ -1742,21 +1765,15 @@ impl Dataset {
         store_params
     }
 
-    fn store_params_for_base(
-        &self,
+    fn build_store_params_for_base(
+        store_params: Option<&ObjectStoreParams>,
+        base_store_params: Option<&Arc<HashMap<String, ObjectStoreParams>>>,
         base_path: Option<&lance_table::format::BasePath>,
     ) -> ObjectStoreParams {
         // Base-specific bindings are exact ObjectStoreParams keyed by
         // `BasePath.path`. If a base has no explicit binding then reads fall back
         // to the dataset-level default store params.
         base_path
-            .and_then(|base_path| {
-                self.base_store_params
-                    .as_ref()
-                    .and_then(|params| params.get(&base_path.path))
-            })
+            .and_then(|base_path| base_store_params.and_then(|params| params.get(&base_path.path)))
             .cloned()
-            .unwrap_or_else(|| self.store_params.as_deref().cloned().unwrap_or_default())
+            .unwrap_or_else(|| store_params.cloned().unwrap_or_default())
     }
 
     /// Returns the initial storage options used when opening this dataset, if any.
@@ -1860,7 +1877,7 @@ impl Dataset {
         let data_dir = self.data_file_dir_for_base(base_id)?;
         let filepath = data_dir.child(path);
 
-        let object_store = self.object_store(base_id).await?;
+        let object_store = self.object_store(base_id)?;
 
         // Get file size
         let file_size = object_store.size(&filepath).await?;
@@ -1966,52 +1983,60 @@ impl Dataset {
         }
     }
 
-    async fn base_object_store(&self, base_id: u32) -> Result<Arc<ObjectStore>> {
-        let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
-            Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
-        })?;
-        let store_params = self.store_params_for_base(Some(base_path));
-
-        let (store, _) = ObjectStore::from_uri_and_params(
-            self.session.store_registry(),
-            &base_path.path,
-            &store_params,
-        )
-        .await?;
-
-        Ok(store)
+    pub(crate) async fn init_base_object_stores(
+        session: &Session,
+        manifest: &Manifest,
+        store_params: Option<&ObjectStoreParams>,
+        base_store_params: Option<&Arc<HashMap<String, ObjectStoreParams>>>,
+    ) -> Result<HashMap<u32, Arc<ObjectStore>>> {
+        let mut stores = HashMap::new();
+        for (base_id, base_path) in &manifest.base_paths {
+            let params =
+                Self::build_store_params_for_base(store_params, base_store_params, Some(base_path));
+            let (store, _) = ObjectStore::from_uri_and_params(
+                session.store_registry(),
+                &base_path.path,
+                &params,
+            )
+            .await?;
+            stores.insert(*base_id, store);
+        }
+        Ok(stores)
     }
 
     /// Resolve the object store for the primary dataset or an additional base.
     ///
     /// Pass `None` to get the primary dataset object store. Pass `Some(base_id)`
    /// when resolving a file whose metadata references an additional base.
-    pub async fn object_store(&self, base_id: Option<u32>) -> Result<Arc<ObjectStore>> {
+    pub fn object_store(&self, base_id: Option<u32>) -> Result<Arc<ObjectStore>> {
         match base_id {
-            Some(base_id) => self.base_object_store(base_id).await,
+            Some(base_id) => self
+                .base_object_stores
+                .get(&base_id)
+                .cloned()
+                .ok_or_else(|| {
+                    Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
+                }),
             None => Ok(self.object_store.clone()),
         }
     }
 
-    pub(crate) async fn object_store_for_data_file(
+    pub(crate) fn object_store_for_data_file(
         &self,
         data_file: &DataFile,
     ) -> Result<Arc<ObjectStore>> {
-        self.object_store(data_file.base_id).await
+        self.object_store(data_file.base_id)
    }
 
-    pub(crate) async fn object_store_for_deletion(
+    pub(crate) fn object_store_for_deletion(
         &self,
         deletion_file: &DeletionFile,
     ) -> Result<Arc<ObjectStore>> {
-        self.object_store(deletion_file.base_id).await
+        self.object_store(deletion_file.base_id)
    }
 
-    pub(crate) async fn object_store_for_index(
-        &self,
-        index: &IndexMetadata,
-    ) -> Result<Arc<ObjectStore>> {
-        self.object_store(index.base_id).await
+    pub(crate) fn object_store_for_index(&self, index: &IndexMetadata) -> Result<Arc<ObjectStore>> {
+        self.object_store(index.base_id)
     }
 
     pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result<Path> {
@@ -2829,7 +2854,8 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
             dataset.file_reader_options.clone(),
             dataset.store_params.as_deref().cloned(),
             dataset.base_store_params.clone(),
-        )?;
+        )
+        .await?;
         let loaded =
             Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
                 Error::internal(format!(

@@ -2862,6 +2888,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
                 dataset.store_params.as_deref().cloned(),
                 dataset.base_store_params.clone(),
             )
+            .await
         } else {
             // If we didn't get the latest manifest, we can still return the dataset
             // with the current manifest.
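The shape of the core change above: checkout_manifest now builds one object store per manifest base path up front (init_base_object_stores), and object_store becomes a synchronous HashMap lookup that clones an Arc. A minimal runnable model of the lookup semantics; Store and the String error type are stand-ins, not the lance API:

    use std::collections::HashMap;
    use std::sync::Arc;

    struct Store;

    struct Dataset {
        primary: Arc<Store>,
        // Pre-initialized at checkout, mirroring base_object_stores in the diff.
        base_object_stores: HashMap<u32, Arc<Store>>,
    }

    impl Dataset {
        // Synchronous: just a map lookup plus an Arc clone, no I/O.
        fn object_store(&self, base_id: Option<u32>) -> Result<Arc<Store>, String> {
            match base_id {
                None => Ok(self.primary.clone()),
                Some(id) => self
                    .base_object_stores
                    .get(&id)
                    .cloned()
                    .ok_or_else(|| format!("Dataset base path with ID {} not found", id)),
            }
        }
    }

    fn main() {
        let ds = Dataset {
            primary: Arc::new(Store),
            base_object_stores: HashMap::from([(1, Arc::new(Store))]),
        };
        assert!(ds.object_store(None).is_ok());
        assert!(ds.object_store(Some(1)).is_ok()); // repeated calls return the same Arc
        assert!(ds.object_store(Some(2)).is_err()); // unknown base ids still error
    }

A side effect of this design is that store construction errors now surface at checkout time rather than on first access, and repeated lookups for the same base return the same Arc, which is what the Arc::ptr_eq assertions in dataset_io.rs below verify.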
rust/lance/src/dataset/blob.rs (2 additions & 2 deletions)

@@ -1946,7 +1946,7 @@ async fn collect_blob_entries_v2(
     let object_store = if let Some(store) = store_cache.get(&base_id) {
         store.clone()
     } else {
-        let store = dataset.object_store(Some(base_id)).await?;
+        let store = dataset.object_store(Some(base_id))?;
         store_cache.insert(base_id, store.clone());
         store
     };

@@ -2047,7 +2047,7 @@ async fn resolve_blob_read_location(
         if let Some(store) = store_cache.get(&base_id) {
             store.clone()
         } else {
-            let store = dataset.object_store(Some(base_id)).await?;
+            let store = dataset.object_store(Some(base_id))?;
             store_cache.insert(base_id, store.clone());
             store
         }
rust/lance/src/dataset/builder.rs (1 addition & 0 deletions)

@@ -811,5 +811,6 @@ impl DatasetBuilder {
             store_params,
             base_store_params,
         )
+        .await
     }
 }
rust/lance/src/dataset/fragment.rs (2 additions & 2 deletions)

@@ -932,7 +932,7 @@ impl FileFragment {
             .dataset
             .data_file_dir(data_file)?
             .child(data_file.path.as_str());
-        let object_store = self.dataset.object_store_for_data_file(data_file).await?;
+        let object_store = self.dataset.object_store_for_data_file(data_file)?;
         let field_id_offset = Self::get_field_id_offset(data_file);
         let reader = PreviousFileReader::try_new_with_fragment_id(
             &object_store,

@@ -964,7 +964,7 @@ impl FileFragment {
         let (store_scheduler, reader_priority) = if let Some(base_id) = data_file.base_id {
             // TODO: make object stores for non-default bases reuse the same scan scheduler
             // currently we always create a new one
-            let object_store = self.dataset.object_store(Some(base_id)).await?;
+            let object_store = self.dataset.object_store(Some(base_id))?;
             let config = SchedulerConfig::max_bandwidth(&object_store);
             (
                 ScanScheduler::new(object_store, config),
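The TODO retained in this hunk (reuse one scan scheduler per non-default base rather than constructing a new one per data file) could plausibly follow the same pre-initialization pattern this PR applies to object stores. A hypothetical sketch of such a cache; this is not code from this PR, and Scheduler is a stand-in for the real ScanScheduler:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    struct Scheduler;

    #[derive(Default)]
    struct SchedulerCache {
        by_base: Mutex<HashMap<u32, Arc<Scheduler>>>,
    }

    impl SchedulerCache {
        // Build a scheduler once per base id and hand out clones of the Arc.
        fn get_or_create(&self, base_id: u32) -> Arc<Scheduler> {
            let mut map = self.by_base.lock().unwrap();
            map.entry(base_id)
                .or_insert_with(|| Arc::new(Scheduler))
                .clone()
        }
    }

    fn main() {
        let cache = SchedulerCache::default();
        let a = cache.get_or_create(1);
        let b = cache.get_or_create(1);
        assert!(Arc::ptr_eq(&a, &b)); // reused, matching the intent of the TODO
    }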
rust/lance/src/dataset/index.rs (1 addition & 1 deletion)

@@ -165,7 +165,7 @@ impl LanceIndexStoreExt for LanceIndexStore {
             .child(index.uuid.to_string());
         let cache = dataset.metadata_cache.file_metadata_cache(&index_dir);
         let format_version = dataset_format_version(dataset);
-        let object_store = dataset.object_store_for_index(index).await?;
+        let object_store = dataset.object_store_for_index(index)?;
         let store =
             Self::with_format_version(object_store, index_dir, Arc::new(cache), format_version);
         Ok(store.with_file_sizes(index.file_size_map()))
rust/lance/src/dataset/mem_wal/api.rs (1 addition & 1 deletion)

@@ -147,7 +147,7 @@ impl DatasetMemWalExt for Dataset {
 
     async fn list_mem_wal_latest_shard_ids(&self) -> Result<Vec<Uuid>> {
         let prefix = super::util::mem_wal_path(&self.branch_location().path);
-        let object_store = self.object_store(None).await?;
+        let object_store = self.object_store(None)?;
         let list_result = object_store
             .inner
             .list_with_delimiter(Some(&prefix))
rust/lance/src/dataset/optimize.rs (1 addition & 1 deletion)

@@ -502,7 +502,7 @@ async fn can_use_binary_copy_impl(
 
     // check file global buffer
     let object_store = match data_file.base_id {
-        Some(base_id) => dataset.object_store(Some(base_id)).await?,
+        Some(base_id) => dataset.object_store(Some(base_id))?,
         None => dataset.object_store.clone(),
     };
     let full_path = dataset
rust/lance/src/dataset/optimize/binary_copy.rs (1 addition & 1 deletion)

@@ -250,7 +250,7 @@ pub async fn rewrite_files_binary_copy(
     for frag in fragments.iter() {
         for df in frag.files.iter() {
             let object_store = if let Some(base_id) = df.base_id {
-                dataset.object_store(Some(base_id)).await?
+                dataset.object_store(Some(base_id))?
             } else {
                 dataset.object_store.clone()
            };
rust/lance/src/dataset/tests/dataset_io.rs (4 additions & 4 deletions)

@@ -220,7 +220,7 @@ async fn test_with_object_store_wrappers_wraps_base_store_params() {
     let request_tracker = Arc::new(IOTracker::default());
     let wrapped = dataset
         .with_object_store_wrappers(vec![request_tracker.clone() as Arc<dyn WrappingObjectStore>]);
-    let base_store = wrapped.object_store(Some(1)).await.unwrap();
+    let base_store = wrapped.object_store(Some(1)).unwrap();
     let base_location = base
         .extract_path(wrapped.session().store_registry())
         .unwrap()

@@ -460,9 +460,9 @@ async fn test_object_store_uses_runtime_base_store_params() {
         .await
         .unwrap();
 
-    let store_a = dataset.object_store(Some(1)).await.unwrap();
-    let store_a_again = dataset.object_store(Some(1)).await.unwrap();
-    let store_b = dataset.object_store(Some(2)).await.unwrap();
+    let store_a = dataset.object_store(Some(1)).unwrap();
+    let store_a_again = dataset.object_store(Some(1)).unwrap();
+    let store_b = dataset.object_store(Some(2)).unwrap();
 
     assert!(Arc::ptr_eq(&store_a, &store_a_again));
     assert!(!Arc::ptr_eq(&store_a, &store_b));
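The first test exercises the re-wrapping added to with_object_store_wrappers in dataset.rs: each pre-initialized base store is cloned and every wrapper is folded over its inner store. A minimal model of that fold; the string-based Wrapper trait here is a stand-in for WrappingObjectStore, purely to show the iteration shape:

    use std::collections::HashMap;
    use std::sync::Arc;

    trait Wrapper {
        fn wrap(&self, inner: Arc<str>) -> Arc<str>;
    }

    struct Tracer;
    impl Wrapper for Tracer {
        fn wrap(&self, inner: Arc<str>) -> Arc<str> {
            Arc::from(format!("traced({})", inner).as_str())
        }
    }

    fn main() {
        let bases: HashMap<u32, Arc<str>> = HashMap::from([(1, Arc::from("s3://bucket/base1"))]);
        let wrappers: Vec<Box<dyn Wrapper>> = vec![Box::new(Tracer)];
        // Mirror the diff: clone each store, fold every wrapper over it, re-key by id.
        let wrapped: HashMap<u32, Arc<str>> = bases
            .iter()
            .map(|(id, store)| {
                let mut inner = store.clone();
                for w in &wrappers {
                    inner = w.wrap(inner);
                }
                (*id, inner)
            })
            .collect();
        assert_eq!(&*wrapped[&1], "traced(s3://bucket/base1)");
    }

The second test then pins down the caching behavior itself: two lookups for base 1 must return the same Arc, while base 2 must resolve to a distinct store.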