Skip to content

Commit 772ae44

Browse files
Add support for geoindex rtrees. (#42)
* Add geoindex rtree generation to benchmarks. * Remove randomization from geometry generation. * Initial pass at spatial rtree utilization. * Remove unneeded column skipping and ensure projection columns are kept. * Refactor process_chunk_with_row_filter_async to reduce array scan. * Refactor geometry expression parsing. * Add geoindex option to test store. * Use explicit members of the ArraysToGenerate enum rather than all. * Include low level wkb benchmark that demonstrates slow vlen decoding. * Formatting fixes. * Bbox S3 benchmark. * Use geoindex utility to downcast bbox during rtree construction. * Make all imports top level. * Use MEGA_BYTE constant. * Use geoindex bbox casting utility for tests. * Move all imports to top level. * Refactor bespoke wkt parsing logic to use wkt crate. * Include test for invalid rtree index error handling.
1 parent 41f7b3e commit 772ae44

16 files changed

Lines changed: 1044 additions & 106 deletions

Cargo.lock

Lines changed: 32 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ arrow-schema = "57.0.0"
1010
async-trait = "0.1.89"
1111
datafusion = "52.0"
1212
futures = "0.3.31"
13+
geo-index = "0.2"
14+
geo-traits = "0.3"
1315
geodatafusion = "0.3"
16+
wkt = { version = "0.14", default-features = false }
1417
geoarrow-schema = "0.7.0"
1518
icechunk = "0.3.16"
1619
object_store = "0.12.4"
@@ -29,6 +32,7 @@ chrono = "0.4.44"
2932
criterion = { version = "0.5.1", features = ["async_tokio"] }
3033
dhat = "0.3"
3134
geo = "0.31"
35+
geo-index = "0.2"
3236
geoarrow-array = "0.7.0"
3337
rand = "0.8"
3438
tempfile = "3"
@@ -59,3 +63,10 @@ test = false
5963
name = "bbox_columns_s3"
6064
harness = false
6165
test = false
66+
67+
[[bench]]
68+
name = "bbox_zarrs_raw"
69+
harness = false
70+
test = false
71+
72+

benches/bbox_columns_local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use zarr_datafusion_search::table_provider::ZarrTableProvider;
1313
fn bbox_columns_bench_local(c: &mut Criterion) {
1414
let rt = Runtime::new().unwrap();
1515
let (session, _temp_dir) =
16-
generate_icechunk_store_local(&rt, ArraysToGenerate::BboxColumns).unwrap();
16+
generate_icechunk_store_local(&rt, &[ArraysToGenerate::BboxColumns]).unwrap();
1717
let table_provider = Arc::new(
1818
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
1919
.unwrap(),

benches/bbox_local.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,15 @@ use zarr_datafusion_search::table_provider::ZarrTableProvider;
1111

1212
fn bbox_bench_local(c: &mut Criterion) {
1313
let rt = Runtime::new().unwrap();
14-
let (session, _temp_dir) = generate_icechunk_store_local(&rt, ArraysToGenerate::Bbox).unwrap();
14+
let (session, _temp_dir) = generate_icechunk_store_local(
15+
&rt,
16+
&[
17+
ArraysToGenerate::Datetime,
18+
ArraysToGenerate::Bbox,
19+
ArraysToGenerate::RtreeIndex,
20+
],
21+
)
22+
.unwrap();
1523
let table_provider = Arc::new(
1624
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
1725
.unwrap(),

benches/bbox_s3.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
mod common;
2+
3+
use common::{BBOX_SQL, run_bench};
4+
use criterion::{Criterion, criterion_group, criterion_main};
5+
use datafusion::prelude::SessionContext;
6+
use icechunk::{ObjectStorage, Repository, repository::VersionInfo};
7+
use std::collections::HashMap;
8+
use std::sync::Arc;
9+
use tokio::runtime::Runtime;
10+
use zarr_datafusion_search::table_provider::ZarrTableProvider;
11+
12+
fn bbox_bench_s3(c: &mut Criterion) {
13+
let bucket = "zarr-datafusion-search".to_string();
14+
let prefix = "".to_string();
15+
16+
let rt = Runtime::new().unwrap();
17+
let storage = rt
18+
.block_on(ObjectStorage::new_s3(
19+
bucket,
20+
Some(prefix),
21+
None, // credentials - uses default AWS credential chain
22+
None, // config - uses default S3 options
23+
))
24+
.unwrap();
25+
let repo = rt
26+
.block_on(Repository::open_or_create(
27+
None,
28+
Arc::new(storage),
29+
HashMap::new(),
30+
))
31+
.unwrap();
32+
let session = rt
33+
.block_on(repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string())))
34+
.unwrap();
35+
36+
let table_provider = Arc::new(
37+
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
38+
.unwrap(),
39+
);
40+
41+
let ctx = SessionContext::new();
42+
ctx.register_table("zarr_data", table_provider).unwrap();
43+
run_bench(c, &rt, &ctx, "bbox_bench_s3", "bbox_bench_s3", BBOX_SQL);
44+
}
45+
46+
criterion_group!(benches_s3, bbox_bench_s3);
47+
criterion_main!(benches_s3);

benches/bbox_zarrs_raw.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
mod common;
2+
3+
use common::{ArraysToGenerate, generate_icechunk_store_local};
4+
use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
5+
use std::sync::Arc;
6+
use tokio::runtime::Runtime;
7+
use zarrs::array::Array;
8+
use zarrs_icechunk::AsyncIcechunkStore;
9+
10+
fn bbox_zarrs_raw_bench(c: &mut Criterion) {
11+
let rt = Runtime::new().unwrap();
12+
let (session, _temp_dir) =
13+
generate_icechunk_store_local(&rt, &[ArraysToGenerate::Bbox]).unwrap();
14+
let store = Arc::new(AsyncIcechunkStore::new(session));
15+
16+
// Criterion benchmark
17+
let mut group = c.benchmark_group("bbox_zarrs_raw");
18+
group.sample_size(10);
19+
group.sampling_mode(SamplingMode::Flat);
20+
group.warm_up_time(std::time::Duration::from_secs(1));
21+
group.measurement_time(std::time::Duration::from_secs(2));
22+
23+
group.bench_function("fetch_all_bbox_chunks", |b| {
24+
b.to_async(&rt).iter(|| async {
25+
let bbox_array = Array::async_open(store.clone(), "/meta/bbox")
26+
.await
27+
.unwrap();
28+
29+
let chunk_grid_shape = bbox_array.chunk_grid_shape();
30+
let num_chunks: u64 = chunk_grid_shape.iter().product();
31+
32+
let mut total_rows = 0usize;
33+
let max_rows = 3_000_000;
34+
35+
for chunk_idx in 0..num_chunks {
36+
if total_rows >= max_rows {
37+
break;
38+
}
39+
40+
let chunk_indices = vec![chunk_idx];
41+
let subset = bbox_array.chunk_subset_bounded(&chunk_indices).unwrap();
42+
43+
let data: Vec<Vec<u8>> = bbox_array
44+
.async_retrieve_array_subset_elements(&subset)
45+
.await
46+
.unwrap();
47+
48+
total_rows += data.len();
49+
}
50+
51+
total_rows.min(max_rows)
52+
});
53+
});
54+
55+
group.finish();
56+
}
57+
58+
criterion_group!(benches, bbox_zarrs_raw_bench);
59+
criterion_main!(benches);

0 commit comments

Comments
 (0)