Skip to content

Commit 63d6370

Browse files
feat: User-configurable coalesce range (#627)
* feat: User-configurable coalesce range * Future proof docs * Lint * Specify default in Python * Create helper function * Update obstore/python/obstore/_get.pyi Co-authored-by: Kyle Barron <kylebarron2@gmail.com> * Apply suggestions from code review Co-authored-by: Kyle Barron <kylebarron2@gmail.com> --------- Co-authored-by: Kyle Barron <kylebarron2@gmail.com>
1 parent 344f56f commit 63d6370

4 files changed

Lines changed: 88 additions & 18 deletions

File tree

obstore/python/obstore/_get.pyi

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,12 +366,14 @@ def get_ranges(
366366
starts: Sequence[int],
367367
ends: Sequence[int] | None = None,
368368
lengths: Sequence[int] | None = None,
369+
coalesce: int = 1024 * 1024,
369370
) -> list[Bytes]:
370371
"""Return the bytes stored at the specified location in the given byte ranges.
371372
372373
To improve performance this will:
373374
374-
- Transparently combine ranges less than 1MB apart into a single underlying request
375+
- Transparently combine ranges less than `coalesce` bytes apart into a single
376+
underlying request (defaults to 1MB)
375377
- Make multiple `fetch` requests in parallel (up to maximum of 10)
376378
377379
Args:
@@ -382,6 +384,7 @@ def get_ranges(
382384
starts: A sequence of `int` where each offset starts.
383385
ends: A sequence of `int` where each offset ends (exclusive). Either `ends` or `lengths` must be non-None.
384386
lengths: A sequence of `int` with the number of bytes of each byte range. Either `ends` or `lengths` must be non-None.
387+
coalesce: Maximum distance in bytes between ranges that will be coalesced into a single request. Defaults to 1MiB. Set to `0` to disable coalescing.
385388
386389
Returns:
387390
A sequence of `Bytes`, one for each range. This `Bytes` object implements the
@@ -397,6 +400,7 @@ async def get_ranges_async(
397400
starts: Sequence[int],
398401
ends: Sequence[int] | None = None,
399402
lengths: Sequence[int] | None = None,
403+
coalesce: int = 1024 * 1024,
400404
) -> list[Bytes]:
401405
"""Call `get_ranges` asynchronously.
402406

obstore/python/obstore/store.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ def get_ranges(
228228
starts: Sequence[int],
229229
ends: Sequence[int] | None = None,
230230
lengths: Sequence[int] | None = None,
231+
coalesce: int = 1024 * 1024,
231232
) -> list[Bytes]:
232233
"""Return the bytes stored at the specified location in the given byte ranges.
233234
@@ -239,6 +240,7 @@ def get_ranges(
239240
starts=starts,
240241
ends=ends,
241242
lengths=lengths,
243+
coalesce=coalesce,
242244
)
243245

244246
async def get_ranges_async(
@@ -248,6 +250,7 @@ async def get_ranges_async(
248250
starts: Sequence[int],
249251
ends: Sequence[int] | None = None,
250252
lengths: Sequence[int] | None = None,
253+
coalesce: int = 1024 * 1024,
251254
) -> list[Bytes]:
252255
"""Call `get_ranges` asynchronously.
253256
@@ -259,6 +262,7 @@ async def get_ranges_async(
259262
starts=starts,
260263
ends=ends,
261264
lengths=lengths,
265+
coalesce=coalesce,
262266
)
263267

264268
def head(self, path: str) -> ObjectMeta:

obstore/src/get.rs

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use chrono::{DateTime, Utc};
77
use futures::stream::{BoxStream, Fuse};
88
use futures::StreamExt;
99
use object_store::{
10-
Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore, ObjectStoreExt,
10+
coalesce_ranges, Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore,
11+
ObjectStoreExt, OBJECT_STORE_COALESCE_DEFAULT,
1112
};
1213
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
1314
use pyo3::prelude::*;
@@ -430,45 +431,51 @@ fn params_to_range(
430431
}
431432
}
432433

434+
async fn _get_ranges(
435+
store: PyObjectStore,
436+
path: PyPath,
437+
ranges: &[Range<u64>],
438+
coalesce: u64,
439+
) -> PyObjectStoreResult<Vec<PyBytes>> {
440+
let out = coalesce_ranges(
441+
ranges,
442+
|range| store.as_ref().get_range(path.as_ref(), range),
443+
coalesce,
444+
)
445+
.await?;
446+
Ok(out.into_iter().map(|buf| buf.into()).collect())
447+
}
448+
433449
#[pyfunction]
434-
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None))]
450+
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None, coalesce=OBJECT_STORE_COALESCE_DEFAULT))]
435451
pub(crate) fn get_ranges(
436452
py: Python,
437453
store: PyObjectStore,
438454
path: PyPath,
439455
starts: Vec<u64>,
440456
ends: Option<Vec<u64>>,
441457
lengths: Option<Vec<u64>>,
442-
) -> PyObjectStoreResult<Vec<pyo3_bytes::PyBytes>> {
458+
coalesce: u64,
459+
) -> PyObjectStoreResult<Vec<PyBytes>> {
443460
let runtime = get_runtime();
444461
let ranges = params_to_ranges(starts, ends, lengths)?;
445-
py.detach(|| {
446-
let out = runtime.block_on(store.as_ref().get_ranges(path.as_ref(), &ranges))?;
447-
Ok::<_, PyObjectStoreError>(out.into_iter().map(|buf| buf.into()).collect())
448-
})
462+
py.detach(|| runtime.block_on(_get_ranges(store, path, &ranges, coalesce)))
449463
}
450464

451465
#[pyfunction]
452-
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None))]
466+
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None, coalesce=OBJECT_STORE_COALESCE_DEFAULT))]
453467
pub(crate) fn get_ranges_async(
454468
py: Python,
455469
store: PyObjectStore,
456470
path: PyPath,
457471
starts: Vec<u64>,
458472
ends: Option<Vec<u64>>,
459473
lengths: Option<Vec<u64>>,
474+
coalesce: u64,
460475
) -> PyResult<Bound<PyAny>> {
461476
let ranges = params_to_ranges(starts, ends, lengths)?;
462477
pyo3_async_runtimes::tokio::future_into_py(py, async move {
463-
let out = store
464-
.as_ref()
465-
.get_ranges(path.as_ref(), &ranges)
466-
.await
467-
.map_err(PyObjectStoreError::ObjectStoreError)?;
468-
Ok(out
469-
.into_iter()
470-
.map(pyo3_bytes::PyBytes::new)
471-
.collect::<Vec<_>>())
478+
Ok(_get_ranges(store, path, &ranges, coalesce).await?)
472479
})
473480
}
474481

tests/test_get.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import pytest
24

35
from obstore.store import MemoryStore
@@ -136,6 +138,59 @@ def test_get_ranges():
136138
assert memoryview(buffer) == data[start:end]
137139

138140

141+
COALESCE_CASES = [
142+
# (starts, ends, coalesce) — close ranges
143+
([5, 10, 15, 20], [15, 20, 25, 30], 0),
144+
([5, 10, 15, 20], [15, 20, 25, 30], 1024 * 1024),
145+
# widely-spaced ranges
146+
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 0),
147+
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 500),
148+
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 2000),
149+
]
150+
151+
152+
@pytest.mark.parametrize(("starts", "ends", "coalesce"), COALESCE_CASES)
153+
def test_get_ranges_with_coalesce(
154+
starts: list[int],
155+
ends: list[int],
156+
coalesce: int,
157+
):
158+
store = MemoryStore()
159+
160+
data = b"the quick brown fox jumps over the lazy dog," * 100
161+
path = "big-data.txt"
162+
163+
store.put(path, data)
164+
165+
buffers = store.get_ranges(path, starts=starts, ends=ends, coalesce=coalesce)
166+
for start, end, buffer in zip(starts, ends, buffers):
167+
assert memoryview(buffer) == data[start:end]
168+
169+
170+
@pytest.mark.asyncio
171+
@pytest.mark.parametrize(("starts", "ends", "coalesce"), COALESCE_CASES)
172+
async def test_get_ranges_async_with_coalesce(
173+
starts: list[int],
174+
ends: list[int],
175+
coalesce: int,
176+
):
177+
store = MemoryStore()
178+
179+
data = b"the quick brown fox jumps over the lazy dog," * 100
180+
path = "big-data.txt"
181+
182+
await store.put_async(path, data)
183+
184+
buffers = await store.get_ranges_async(
185+
path,
186+
starts=starts,
187+
ends=ends,
188+
coalesce=coalesce,
189+
)
190+
for start, end, buffer in zip(starts, ends, buffers):
191+
assert memoryview(buffer) == data[start:end]
192+
193+
139194
def test_get_range_invalid_range():
140195
store = MemoryStore()
141196

0 commit comments

Comments
 (0)