Skip to content

Commit 64d3ecb

Browse files
authored
feat!: move object store registry to the session, re-use stores (#3689)
BREAKING CHANGE: removes `object_store_registry` from `WriteParams` and `ReadParams`. The registry is now taken from the `Session`, which is already on those parameters. Also, most `ObjectStore` constructors now return `Arc<ObjectStore>`. Closes #3684 * Add a cache of in-use datasets within the registry. * Move `ObjectStoreRegistry` onto the `Session` object. Combined with the cache, this lets datasets using the same session share object stores, as long as they use the same parameters.
1 parent e415ddf commit 64d3ecb

34 files changed

Lines changed: 738 additions & 396 deletions

File tree

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ jobs:
173173
- nightly
174174
defaults:
175175
run:
176-
working-directory: ./rust/lance
176+
working-directory: ./rust
177177
steps:
178178
- uses: actions/checkout@v4
179179
- uses: Swatinem/rust-cache@v2
@@ -200,7 +200,7 @@ jobs:
200200
runs-on: windows-latest
201201
defaults:
202202
run:
203-
working-directory: rust/lance
203+
working-directory: rust
204204
steps:
205205
- uses: actions/checkout@v4
206206
- uses: Swatinem/rust-cache@v2

java/core/lance-jni/src/blocking_dataset.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ impl BlockingDataset {
116116
read_version: Option<u64>,
117117
storage_options: HashMap<String, String>,
118118
) -> Result<Self> {
119-
let object_store_registry = Arc::new(ObjectStoreRegistry::default());
120119
let inner = RT.block_on(Dataset::commit(
121120
uri,
122121
operation,
@@ -126,7 +125,7 @@ impl BlockingDataset {
126125
..Default::default()
127126
}),
128127
None,
129-
object_store_registry,
128+
Default::default(),
130129
false, // TODO: support enable_v2_manifest_paths
131130
))?;
132131
Ok(Self { inner })

java/core/lance-jni/src/file_reader.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ fn inner_open<'local>(env: &mut JNIEnv<'local>, file_uri: JString) -> Result<JOb
8989

9090
let reader = RT.block_on(async move {
9191
let (obj_store, path) = ObjectStore::from_uri(&file_uri_str).await?;
92-
let obj_store = Arc::new(obj_store);
9392
let config = SchedulerConfig::max_bandwidth(&obj_store);
9493
let scan_scheduler = ScanScheduler::new(obj_store, config);
9594

python/src/dataset.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2196,11 +2196,7 @@ impl PyFullTextQuery {
21962196

21972197
#[staticmethod]
21982198
#[pyo3(signature = (positive, negative,negative_boost=None))]
2199-
fn boost_query(
2200-
positive: PyFullTextQuery,
2201-
negative: PyFullTextQuery,
2202-
negative_boost: Option<f32>,
2203-
) -> PyResult<Self> {
2199+
fn boost_query(positive: Self, negative: Self, negative_boost: Option<f32>) -> PyResult<Self> {
22042200
Ok(Self {
22052201
inner: BoostQuery::new(positive.inner, negative.inner, negative_boost).into(),
22062202
})

python/src/file.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ fn path_to_parent(path: &Path) -> PyResult<(Path, String)> {
333333

334334
pub async fn object_store_from_uri_or_path_no_options(
335335
uri_or_path: impl AsRef<str>,
336-
) -> PyResult<(ObjectStore, Path)> {
336+
) -> PyResult<(Arc<ObjectStore>, Path)> {
337337
object_store_from_uri_or_path(uri_or_path, None).await
338338
}
339339

@@ -344,7 +344,7 @@ pub async fn object_store_from_uri_or_path_no_options(
344344
pub async fn object_store_from_uri_or_path(
345345
uri_or_path: impl AsRef<str>,
346346
storage_options: Option<HashMap<String, String>>,
347-
) -> PyResult<(ObjectStore, Path)> {
347+
) -> PyResult<(Arc<ObjectStore>, Path)> {
348348
if let Ok(mut url) = Url::parse(uri_or_path.as_ref()) {
349349
if url.scheme().len() > 1 {
350350
let path = object_store::path::Path::parse(url.path()).map_err(|e| {
@@ -376,7 +376,7 @@ pub async fn object_store_from_uri_or_path(
376376
let path = Path::parse(uri_or_path.as_ref()).map_err(|e| {
377377
PyIOError::new_err(format!("Invalid path `{}`: {}", uri_or_path.as_ref(), e))
378378
})?;
379-
let object_store = ObjectStore::local();
379+
let object_store = Arc::new(ObjectStore::local());
380380
Ok((object_store, path))
381381
}
382382

@@ -393,7 +393,7 @@ impl LanceFileReader {
393393
let (object_store, path) =
394394
object_store_from_uri_or_path(uri_or_path, storage_options).await?;
395395
let scheduler = ScanScheduler::new(
396-
Arc::new(object_store),
396+
object_store,
397397
SchedulerConfig {
398398
io_buffer_size_bytes: 2 * 1024 * 1024 * 1024,
399399
},

python/src/indices.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ pub fn transform_vectors(
242242
)?
243243
}
244244

245+
#[allow(deprecated)]
245246
async fn do_shuffle_transformed_vectors(
246247
unsorted_filenames: Vec<String>,
247248
dir_path: &str,

rust/lance-file/benches/reader.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ fn bench_reader(c: &mut Criterion) {
3333

3434
let tempdir = tempfile::tempdir().unwrap();
3535
let test_path = tempdir.path();
36-
let (object_store, base_path) =
37-
ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
36+
let (object_store, base_path) = rt
37+
.block_on(ObjectStore::from_uri(
38+
test_path.as_os_str().to_str().unwrap(),
39+
))
40+
.unwrap();
41+
3842
let file_path = base_path.child("foo.lance");
3943
let object_writer = rt.block_on(object_store.create(&file_path)).unwrap();
4044

@@ -59,7 +63,7 @@ fn bench_reader(c: &mut Criterion) {
5963
let data = &data;
6064
rt.block_on(async move {
6165
let store_scheduler = ScanScheduler::new(
62-
Arc::new(object_store.clone()),
66+
object_store.clone(),
6367
SchedulerConfig::default_for_testing(),
6468
);
6569
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
@@ -125,8 +129,11 @@ fn bench_random_access(c: &mut Criterion) {
125129

126130
let tempdir = tempfile::tempdir().unwrap();
127131
let test_path = tempdir.path();
128-
let (object_store, base_path) =
129-
ObjectStore::from_path(test_path.as_os_str().to_str().unwrap()).unwrap();
132+
let (object_store, base_path) = rt
133+
.block_on(ObjectStore::from_uri(
134+
test_path.as_os_str().to_str().unwrap(),
135+
))
136+
.unwrap();
130137
let file_path = base_path.child("foo.lance");
131138
let object_writer = rt.block_on(object_store.create(&file_path)).unwrap();
132139

@@ -150,10 +157,8 @@ fn bench_random_access(c: &mut Criterion) {
150157
let object_store = &object_store;
151158
let file_path = &file_path;
152159
let reader = rt.block_on(async move {
153-
let store_scheduler = ScanScheduler::new(
154-
Arc::new(object_store.clone()),
155-
SchedulerConfig::default_for_testing(),
156-
);
160+
let store_scheduler =
161+
ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
157162
let scheduler = store_scheduler.open_file(file_path).await.unwrap();
158163
Arc::new(
159164
FileReader::try_open(

rust/lance-file/src/reader.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ mod tests {
766766
};
767767
use arrow_array::{BooleanArray, Int32Array};
768768
use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema};
769+
use lance_io::object_store::ObjectStoreParams;
769770

770771
#[tokio::test]
771772
async fn test_take() {
@@ -1364,8 +1365,17 @@ mod tests {
13641365

13651366
#[tokio::test]
13661367
async fn test_take_boolean_beyond_chunk() {
1367-
let mut store = ObjectStore::memory();
1368-
store.set_block_size(256);
1368+
let store = ObjectStore::from_uri_and_params(
1369+
Arc::new(Default::default()),
1370+
"memory://",
1371+
&ObjectStoreParams {
1372+
block_size: Some(256),
1373+
..Default::default()
1374+
},
1375+
)
1376+
.await
1377+
.unwrap()
1378+
.0;
13691379
let path = Path::from("/take_bools");
13701380

13711381
let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(

rust/lance-index/benches/inverted.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ fn bench_inverted(c: &mut Criterion) {
3434
let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap();
3535
let store = rt.block_on(async {
3636
Arc::new(LanceIndexStore::new(
37-
ObjectStore::local(),
37+
Arc::new(ObjectStore::local()),
3838
index_dir,
3939
FileMetadataCache::no_cache(),
4040
))

rust/lance-index/benches/ngram.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fn bench_ngram(c: &mut Criterion) {
3131
let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap();
3232
let store = rt.block_on(async {
3333
Arc::new(LanceIndexStore::new(
34-
ObjectStore::local(),
34+
Arc::new(ObjectStore::local()),
3535
index_dir,
3636
FileMetadataCache::no_cache(),
3737
))

0 commit comments

Comments
 (0)