Skip to content

Commit e592a0b

Browse files
Benchmark coordinate columns for spatial filters. (#40)
1 parent cbb4165 commit e592a0b

7 files changed

Lines changed: 249 additions & 15 deletions

File tree

Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,13 @@ test = false
4949
name = "bbox_local"
5050
harness = false
5151
test = false
52+
53+
[[bench]]
54+
name = "bbox_columns_local"
55+
harness = false
56+
test = false
57+
58+
[[bench]]
59+
name = "bbox_columns_s3"
60+
harness = false
61+
test = false

benches/bbox_columns_local.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
mod common;
2+
3+
use common::{
4+
ArraysToGenerate, BBOX_COLUMNS_SQL, generate_icechunk_store_local, run_bench,
5+
run_memory_profile,
6+
};
7+
use criterion::{Criterion, criterion_group, criterion_main};
8+
use datafusion::prelude::SessionContext;
9+
use std::sync::Arc;
10+
use tokio::runtime::Runtime;
11+
use zarr_datafusion_search::table_provider::ZarrTableProvider;
12+
13+
fn bbox_columns_bench_local(c: &mut Criterion) {
14+
let rt = Runtime::new().unwrap();
15+
let (session, _temp_dir) =
16+
generate_icechunk_store_local(&rt, ArraysToGenerate::BboxColumns).unwrap();
17+
let table_provider = Arc::new(
18+
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
19+
.unwrap(),
20+
);
21+
22+
let ctx = SessionContext::new();
23+
geodatafusion::register(&ctx);
24+
ctx.register_table("zarr_data", table_provider).unwrap();
25+
26+
run_memory_profile(&rt, &ctx, BBOX_COLUMNS_SQL);
27+
run_bench(
28+
c,
29+
&rt,
30+
&ctx,
31+
"bbox_columns_bench_local",
32+
"bbox_columns_bench_local",
33+
BBOX_COLUMNS_SQL,
34+
);
35+
}
36+
37+
criterion_group!(benches_local, bbox_columns_bench_local);
38+
criterion_main!(benches_local);

benches/bbox_columns_s3.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
mod common;
2+
3+
use common::{BBOX_COLUMNS_SQL, run_bench};
4+
use criterion::{Criterion, criterion_group, criterion_main};
5+
use datafusion::prelude::SessionContext;
6+
use icechunk::{ObjectStorage, Repository, repository::VersionInfo};
7+
use std::collections::HashMap;
8+
use std::sync::Arc;
9+
use tokio::runtime::Runtime;
10+
use zarr_datafusion_search::table_provider::ZarrTableProvider;
11+
12+
fn bbox_columns_bench_s3(c: &mut Criterion) {
13+
let bucket = "zarr-datafusion-search".to_string();
14+
let prefix = "".to_string();
15+
16+
let rt = Runtime::new().unwrap();
17+
let storage = rt
18+
.block_on(ObjectStorage::new_s3(
19+
bucket,
20+
Some(prefix),
21+
None, // credentials - uses default AWS credential chain
22+
None, // config - uses default S3 options
23+
))
24+
.unwrap();
25+
let repo = rt
26+
.block_on(Repository::open_or_create(
27+
None,
28+
Arc::new(storage),
29+
HashMap::new(),
30+
))
31+
.unwrap();
32+
let session = rt
33+
.block_on(repo.readonly_session(&VersionInfo::BranchTipRef("main".to_string())))
34+
.unwrap();
35+
36+
let table_provider = Arc::new(
37+
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
38+
.unwrap(),
39+
);
40+
41+
let ctx = SessionContext::new();
42+
ctx.register_table("zarr_data", table_provider).unwrap();
43+
run_bench(
44+
c,
45+
&rt,
46+
&ctx,
47+
"bbox_columns_bench_s3",
48+
"bbox_columns_bench_s3",
49+
BBOX_COLUMNS_SQL,
50+
);
51+
}
52+
53+
criterion_group!(benches_s3, bbox_columns_bench_s3);
54+
criterion_main!(benches_s3);

benches/bbox_local.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ use zarr_datafusion_search::table_provider::ZarrTableProvider;
1111

1212
fn bbox_bench_local(c: &mut Criterion) {
1313
let rt = Runtime::new().unwrap();
14-
let (session, _temp_dir) =
15-
generate_icechunk_store_local(&rt, ArraysToGenerate::BboxOnly).unwrap();
14+
let (session, _temp_dir) = generate_icechunk_store_local(&rt, ArraysToGenerate::Bbox).unwrap();
1615
let table_provider = Arc::new(
1716
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
1817
.unwrap(),

benches/common/mod.rs

Lines changed: 115 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use zarrs::metadata_ext::data_type::NumpyTimeUnit;
2121
use zarrs_icechunk::AsyncIcechunkStore;
2222

2323
mod sentinel2_geometry;
24-
use sentinel2_geometry::generate_wkb_polygons;
24+
use sentinel2_geometry::{generate_bbox_columns, generate_wkb_polygons};
2525

2626
#[global_allocator]
2727
static ALLOC: dhat::Alloc = dhat::Alloc;
@@ -32,9 +32,10 @@ const CHUNK_SIZE: u64 = 1_000_000; // 1M elements per chunk
3232

3333
#[derive(Debug, Clone, Copy)]
3434
pub enum ArraysToGenerate {
35-
DatetimeOnly,
36-
BboxOnly,
37-
Both,
35+
Datetime,
36+
Bbox,
37+
BboxColumns,
38+
All,
3839
}
3940

4041
// This generates:
@@ -84,10 +85,7 @@ fn generate_icechunk_store(
8485
let array_shape = vec![date_data.len() as u64];
8586
let chunk_shape = vec![CHUNK_SIZE];
8687

87-
if matches!(
88-
arrays,
89-
ArraysToGenerate::DatetimeOnly | ArraysToGenerate::Both
90-
) {
88+
if matches!(arrays, ArraysToGenerate::Datetime | ArraysToGenerate::All) {
9189
let date_blosc_codec: Arc<dyn zarrs::array::codec::BytesToBytesCodecTraits> = Arc::new(
9290
BloscCodec::new(
9391
BloscCompressor::Zstd,
@@ -119,7 +117,7 @@ fn generate_icechunk_store(
119117
))?;
120118
}
121119

122-
if matches!(arrays, ArraysToGenerate::BboxOnly | ArraysToGenerate::Both) {
120+
if matches!(arrays, ArraysToGenerate::Bbox) {
123121
let bbox_data = generate_wkb_polygons(array_shape[0] as usize);
124122

125123
let bbox_blosc_codec: Arc<dyn zarrs::array::codec::BytesToBytesCodecTraits> = Arc::new(
@@ -150,6 +148,88 @@ fn generate_icechunk_store(
150148
))?;
151149
}
152150

151+
if matches!(
152+
arrays,
153+
ArraysToGenerate::BboxColumns | ArraysToGenerate::All
154+
) {
155+
let (xmin, xmax, ymin, ymax) = generate_bbox_columns(array_shape[0] as usize);
156+
157+
let f64_blosc_codec: Arc<dyn zarrs::array::codec::BytesToBytesCodecTraits> = Arc::new(
158+
BloscCodec::new(
159+
BloscCompressor::Zstd,
160+
BloscCompressionLevel::try_from(9).unwrap(),
161+
None,
162+
BloscShuffleMode::NoShuffle,
163+
None,
164+
)
165+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?,
166+
);
167+
168+
// Create and store xmin array
169+
let xmin_array = ArrayBuilder::new(
170+
array_shape.clone(),
171+
chunk_shape.clone(),
172+
DataType::Float64,
173+
FillValue::from(0.0f64),
174+
)
175+
.bytes_to_bytes_codecs(vec![f64_blosc_codec.clone()])
176+
.build(store.clone(), "/meta/xmin")?;
177+
178+
rt.block_on(xmin_array.async_store_metadata())?;
179+
rt.block_on(xmin_array.async_store_array_subset_elements(
180+
&ArraySubset::new_with_shape(array_shape.clone()),
181+
&xmin,
182+
))?;
183+
184+
// Create and store xmax array
185+
let xmax_array = ArrayBuilder::new(
186+
array_shape.clone(),
187+
chunk_shape.clone(),
188+
DataType::Float64,
189+
FillValue::from(0.0f64),
190+
)
191+
.bytes_to_bytes_codecs(vec![f64_blosc_codec.clone()])
192+
.build(store.clone(), "/meta/xmax")?;
193+
194+
rt.block_on(xmax_array.async_store_metadata())?;
195+
rt.block_on(xmax_array.async_store_array_subset_elements(
196+
&ArraySubset::new_with_shape(array_shape.clone()),
197+
&xmax,
198+
))?;
199+
200+
// Create and store ymin array
201+
let ymin_array = ArrayBuilder::new(
202+
array_shape.clone(),
203+
chunk_shape.clone(),
204+
DataType::Float64,
205+
FillValue::from(0.0f64),
206+
)
207+
.bytes_to_bytes_codecs(vec![f64_blosc_codec.clone()])
208+
.build(store.clone(), "/meta/ymin")?;
209+
210+
rt.block_on(ymin_array.async_store_metadata())?;
211+
rt.block_on(ymin_array.async_store_array_subset_elements(
212+
&ArraySubset::new_with_shape(array_shape.clone()),
213+
&ymin,
214+
))?;
215+
216+
// Create and store ymax array
217+
let ymax_array = ArrayBuilder::new(
218+
array_shape.clone(),
219+
chunk_shape.clone(),
220+
DataType::Float64,
221+
FillValue::from(0.0f64),
222+
)
223+
.bytes_to_bytes_codecs(vec![f64_blosc_codec])
224+
.build(store.clone(), "/meta/ymax")?;
225+
226+
rt.block_on(ymax_array.async_store_metadata())?;
227+
rt.block_on(ymax_array.async_store_array_subset_elements(
228+
&ArraySubset::new_with_shape(array_shape.clone()),
229+
&ymax,
230+
))?;
231+
}
232+
153233
rt.block_on(async {
154234
store
155235
.session()
@@ -176,6 +256,21 @@ pub fn generate_icechunk_store_local(
176256
Ok((session, temp_dir))
177257
}
178258

259+
pub fn generate_icechunk_store_s3(
260+
rt: &Runtime,
261+
bucket: String,
262+
prefix: String,
263+
) -> Result<Session, Box<dyn std::error::Error>> {
264+
let storage = rt.block_on(ObjectStorage::new_s3(
265+
bucket,
266+
Some(prefix),
267+
None, // credentials - uses default AWS credential chain
268+
None, // config - uses default S3 options
269+
))?;
270+
let session = generate_icechunk_store(rt, Arc::new(storage), ArraysToGenerate::All)?;
271+
Ok(session)
272+
}
273+
179274
pub fn run_bench(
180275
c: &mut Criterion,
181276
rt: &Runtime,
@@ -194,7 +289,10 @@ pub fn run_bench(
194289
group.bench_function(bench_name, |b| {
195290
b.to_async(rt).iter(|| async {
196291
let df = ctx.sql(black_box(sql)).await.unwrap();
197-
df.collect().await.unwrap()
292+
let batches = df.collect().await.unwrap();
293+
// let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
294+
// println!("Query returned {} rows", row_count);
295+
batches
198296
});
199297
});
200298

@@ -218,11 +316,16 @@ pub fn run_memory_profile(rt: &Runtime, ctx: &SessionContext, sql: &str) {
218316

219317
pub static DATETIME_SQL: &str = "\
220318
SELECT date FROM zarr_data WHERE \
221-
date < CAST('2025-10-11' AS DATE) \
222-
and date > CAST('2025-09-01' AS DATE)\
319+
date < CAST('2025-01-01' AS DATE) \
320+
and date > CAST('2024-12-25' AS DATE)\
223321
";
224322

225323
pub static BBOX_SQL: &str = "\
226324
SELECT bbox FROM zarr_data \
227325
WHERE ST_Intersects(bbox, ST_GeomFromText('POLYGON((0 -7, 0 7, 5 7, 5 -7, 0 -7))')) \
228326
";
327+
328+
/// SQL for the bbox-columns benchmarks: selects the four coordinate columns
/// from `zarr_data` and keeps rows whose [xmin, xmax] range overlaps [0, 5]
/// and whose [ymin, ymax] range overlaps [-7, 7].
pub static BBOX_COLUMNS_SQL: &str = "\
329+
SELECT xmin, xmax, ymin, ymax FROM zarr_data \
330+
WHERE xmin <= 5 AND xmax >= 0 AND ymin <= 7 AND ymax >= -7 \
331+
";

benches/common/sentinel2_geometry.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,33 @@ pub fn generate_wkb_polygons(n: usize) -> Vec<Vec<u8>> {
115115

116116
out
117117
}
118+
119+
/// Generate `n` sets of bounding box coordinates (xmin, xmax, ymin, ymax)
120+
/// from randomly sampled orbital tiles.
121+
///
122+
/// Returns a tuple of four vectors:
123+
/// - xmin: minimum longitude values
124+
/// - xmax: maximum longitude values
125+
/// - ymin: minimum latitude values
126+
/// - ymax: maximum latitude values
127+
pub fn generate_bbox_columns(n: usize) -> (Vec<f64>, Vec<f64>, Vec<f64>, Vec<f64>) {
128+
let tiles = build_tile_table();
129+
let n_tiles = tiles.len();
130+
assert!(n_tiles > 0);
131+
132+
let mut rng = rand::thread_rng();
133+
let mut xmin = Vec::with_capacity(n);
134+
let mut xmax = Vec::with_capacity(n);
135+
let mut ymin = Vec::with_capacity(n);
136+
let mut ymax = Vec::with_capacity(n);
137+
138+
for _ in 0..n {
139+
let tile = &tiles[rng.gen_range(0..n_tiles)];
140+
xmin.push(tile.min().x);
141+
xmax.push(tile.max().x);
142+
ymin.push(tile.min().y);
143+
ymax.push(tile.max().y);
144+
}
145+
146+
(xmin, xmax, ymin, ymax)
147+
}

benches/datetime_local.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use zarr_datafusion_search::table_provider::ZarrTableProvider;
1212
fn datetime_bench_local(c: &mut Criterion) {
1313
let rt = Runtime::new().unwrap();
1414
let (session, _temp_dir) =
15-
generate_icechunk_store_local(&rt, ArraysToGenerate::DatetimeOnly).unwrap();
15+
generate_icechunk_store_local(&rt, ArraysToGenerate::Datetime).unwrap();
1616
let table_provider = Arc::new(
1717
rt.block_on(ZarrTableProvider::new_icechunk(session, "/meta"))
1818
.unwrap(),

0 commit comments

Comments
 (0)