Skip to content

Commit e5ceacb

Browse files
authored
refactor: use exact base-scoped store bindings (#6422)
This changes per-base runtime configuration to use exact `ObjectStoreParams` bindings keyed by `BasePath.path` instead of per-base storage option overrides. Dataset-level and write-level store params now act only as fallbacks, while reads, target-base writes, and external blob resolution all consult the same base-scoped binding model. This keeps provider-specific runtime state out of the manifest and follows the direction in discussion #6307 to keep `BasePath` focused on identity.
1 parent 690a7c2 commit e5ceacb

6 files changed

Lines changed: 350 additions & 41 deletions

File tree

rust/lance/src/dataset.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ pub struct Dataset {
181181
/// Object store parameters used when opening this dataset.
182182
/// These are used when creating object stores for additional base paths.
183183
pub(crate) store_params: Option<Box<ObjectStoreParams>>,
184+
/// Optional runtime-only object store parameters keyed by base path URI.
185+
pub(crate) base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
184186
}
185187

186188
impl std::fmt::Debug for Dataset {
@@ -190,6 +192,7 @@ impl std::fmt::Debug for Dataset {
190192
.field("base", &self.base)
191193
.field("version", &self.manifest.version)
192194
.field("cache_num_items", &self.session.approx_num_items())
195+
.field("base_store_params", &self.base_store_params.is_some())
193196
.finish()
194197
}
195198
}
@@ -584,6 +587,7 @@ impl Dataset {
584587
self.commit_handler.clone(),
585588
self.file_reader_options.clone(),
586589
self.store_params.as_deref().cloned(),
590+
self.base_store_params.clone(),
587591
)
588592
}
589593

@@ -702,6 +706,7 @@ impl Dataset {
702706
commit_handler: Arc<dyn CommitHandler>,
703707
file_reader_options: Option<FileReaderOptions>,
704708
store_params: Option<ObjectStoreParams>,
709+
base_store_params: Option<Arc<HashMap<String, ObjectStoreParams>>>,
705710
) -> Result<Self> {
706711
let refs = Refs::new(
707712
object_store.clone(),
@@ -729,6 +734,7 @@ impl Dataset {
729734
index_cache,
730735
file_reader_options,
731736
store_params: store_params.map(Box::new),
737+
base_store_params,
732738
})
733739
}
734740

@@ -1638,6 +1644,23 @@ impl Dataset {
16381644
cloned
16391645
}
16401646

1647+
fn store_params_for_base(
1648+
&self,
1649+
base_path: Option<&lance_table::format::BasePath>,
1650+
) -> ObjectStoreParams {
1651+
// Base-specific bindings are exact ObjectStoreParams keyed by
1652+
// `BasePath.path`. If a base has no explicit binding then reads fall back
1653+
// to the dataset-level default store params.
1654+
base_path
1655+
.and_then(|base_path| {
1656+
self.base_store_params
1657+
.as_ref()
1658+
.and_then(|params| params.get(&base_path.path))
1659+
})
1660+
.cloned()
1661+
.unwrap_or_else(|| self.store_params.as_deref().cloned().unwrap_or_default())
1662+
}
1663+
16411664
/// Returns the initial storage options used when opening this dataset, if any.
16421665
///
16431666
/// This returns the static initial options without triggering any refresh.
@@ -1848,11 +1871,12 @@ impl Dataset {
18481871
let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
18491872
Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
18501873
})?;
1874+
let store_params = self.store_params_for_base(Some(base_path));
18511875

18521876
let (store, _) = ObjectStore::from_uri_and_params(
18531877
self.session.store_registry(),
18541878
&base_path.path,
1855-
&self.store_params.as_deref().cloned().unwrap_or_default(),
1879+
&store_params,
18561880
)
18571881
.await?;
18581882

@@ -2673,6 +2697,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
26732697
dataset.commit_handler.clone(),
26742698
dataset.file_reader_options.clone(),
26752699
dataset.store_params.as_deref().cloned(),
2700+
dataset.base_store_params.clone(),
26762701
)?;
26772702
let loaded =
26782703
Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
@@ -2704,6 +2729,7 @@ pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'
27042729
dataset.commit_handler.clone(),
27052730
dataset.file_reader_options.clone(),
27062731
dataset.store_params.as_deref().cloned(),
2732+
dataset.base_store_params.clone(),
27072733
)
27082734
} else {
27092735
// If we didn't get the latest manifest, we can still return the dataset

rust/lance/src/dataset/blob.rs

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,39 +45,37 @@ pub(super) struct ExternalBaseCandidate {
4545
pub base_id: u32,
4646
pub store_prefix: String,
4747
pub base_path: Path,
48+
pub store_params: ObjectStoreParams,
4849
}
4950

5051
#[derive(Debug)]
5152
pub(super) struct ExternalBaseResolver {
5253
candidates: Vec<ExternalBaseCandidate>,
5354
store_registry: Arc<ObjectStoreRegistry>,
54-
store_params: ObjectStoreParams,
5555
}
5656

5757
impl ExternalBaseResolver {
5858
pub(super) fn new(
5959
candidates: Vec<ExternalBaseCandidate>,
6060
store_registry: Arc<ObjectStoreRegistry>,
61-
store_params: ObjectStoreParams,
6261
) -> Self {
6362
Self {
6463
candidates,
6564
store_registry,
66-
store_params,
6765
}
6866
}
6967

7068
pub(crate) async fn resolve_external_uri(
7169
&self,
7270
uri: &str,
7371
) -> Result<Option<ResolvedExternalBase>> {
74-
let uri_store_prefix = self
75-
.store_registry
76-
.calculate_object_store_prefix(uri, self.store_params.storage_options())?;
7772
let uri_path = ObjectStore::extract_path_from_uri(self.store_registry.clone(), uri)?;
7873

7974
let mut best_match: Option<(usize, ResolvedExternalBase)> = None;
8075
for candidate in &self.candidates {
76+
let uri_store_prefix = self
77+
.store_registry
78+
.calculate_object_store_prefix(uri, candidate.store_params.storage_options())?;
8179
if candidate.store_prefix != uri_store_prefix {
8280
continue;
8381
}
@@ -1325,6 +1323,7 @@ fn data_file_key_from_path(path: &str) -> &str {
13251323

13261324
#[cfg(test)]
13271325
mod tests {
1326+
use std::collections::HashMap;
13281327
use std::sync::Arc;
13291328

13301329
use arrow::{
@@ -1342,7 +1341,9 @@ mod tests {
13421341
ARROW_EXT_NAME_KEY, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, BLOB_V2_EXT_NAME, DataTypeExt,
13431342
};
13441343
use lance_core::datatypes::BlobKind;
1345-
use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
1344+
use lance_io::object_store::{
1345+
ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
1346+
};
13461347
use lance_io::stream::RecordBatchStream;
13471348
use lance_table::format::BasePath;
13481349
use object_store::{
@@ -1358,7 +1359,7 @@ mod tests {
13581359
use lance_datagen::{BatchCount, RowCount, array};
13591360
use lance_file::version::LanceFileVersion;
13601361

1361-
use super::{BlobFile, data_file_key_from_path};
1362+
use super::{BlobFile, ExternalBaseCandidate, ExternalBaseResolver, data_file_key_from_path};
13621363
use crate::{
13631364
Dataset,
13641365
blob::{BlobArrayBuilder, blob_field},
@@ -1378,9 +1379,86 @@ mod tests {
13781379
expected: Vec<u8>,
13791380
}
13801381

1382+
#[cfg(feature = "azure")]
1383+
fn azure_store_params(account_name: &str) -> ObjectStoreParams {
1384+
ObjectStoreParams {
1385+
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1386+
HashMap::from([
1387+
("account_name".to_string(), account_name.to_string()),
1388+
("account_key".to_string(), "dGVzdA==".to_string()),
1389+
]),
1390+
))),
1391+
..Default::default()
1392+
}
1393+
}
1394+
13811395
#[derive(Debug)]
13821396
struct RejectEmptyRangeObjectStore;
13831397

1398+
#[cfg(feature = "azure")]
1399+
#[tokio::test]
1400+
async fn test_external_base_resolver_uses_candidate_store_params() {
1401+
let store_registry = Arc::new(ObjectStoreRegistry::default());
1402+
let base_a = BasePath::new(
1403+
1,
1404+
"az://container/path-a".to_string(),
1405+
Some("base-a".to_string()),
1406+
false,
1407+
);
1408+
let base_b = BasePath::new(
1409+
2,
1410+
"az://container/path-b".to_string(),
1411+
Some("base-b".to_string()),
1412+
false,
1413+
);
1414+
1415+
let base_a_params = azure_store_params("account-a");
1416+
let base_b_params = azure_store_params("account-b");
1417+
1418+
let (store_a, extracted_a) =
1419+
ObjectStore::from_uri_and_params(store_registry.clone(), &base_a.path, &base_a_params)
1420+
.await
1421+
.unwrap();
1422+
let (store_b, extracted_b) =
1423+
ObjectStore::from_uri_and_params(store_registry.clone(), &base_b.path, &base_b_params)
1424+
.await
1425+
.unwrap();
1426+
1427+
let resolver = ExternalBaseResolver::new(
1428+
vec![
1429+
ExternalBaseCandidate {
1430+
base_id: base_a.id,
1431+
store_prefix: store_a.store_prefix.clone(),
1432+
base_path: extracted_a,
1433+
store_params: base_a_params,
1434+
},
1435+
ExternalBaseCandidate {
1436+
base_id: base_b.id,
1437+
store_prefix: store_b.store_prefix.clone(),
1438+
base_path: extracted_b,
1439+
store_params: base_b_params,
1440+
},
1441+
],
1442+
store_registry,
1443+
);
1444+
1445+
let resolved_a = resolver
1446+
.resolve_external_uri("az://container/path-a/file.bin")
1447+
.await
1448+
.unwrap()
1449+
.unwrap();
1450+
let resolved_b = resolver
1451+
.resolve_external_uri("az://container/path-b/file.bin")
1452+
.await
1453+
.unwrap()
1454+
.unwrap();
1455+
1456+
assert_eq!(resolved_a.base_id, 1);
1457+
assert_eq!(resolved_a.relative_path, "file.bin");
1458+
assert_eq!(resolved_b.base_id, 2);
1459+
assert_eq!(resolved_b.relative_path, "file.bin");
1460+
}
1461+
13841462
impl std::fmt::Display for RejectEmptyRangeObjectStore {
13851463
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
13861464
write!(f, "RejectEmptyRangeObjectStore")

0 commit comments

Comments
 (0)