Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion obstore/python/obstore/_get.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,14 @@ def get_ranges(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Return the bytes stored at the specified location in the given byte ranges.

To improve performance this will:

- Transparently combine ranges less than 1MB apart into a single underlying request
- Transparently combine ranges less than `coalesce` bytes apart into a single
underlying request (defaults to 1 MiB)
- Make multiple `fetch` requests in parallel (up to maximum of 10)

Args:
Expand All @@ -382,6 +384,7 @@ def get_ranges(
starts: A sequence of `int` where each offset starts.
ends: A sequence of `int` where each offset ends (exclusive). Either `ends` or `lengths` must be non-None.
lengths: A sequence of `int` with the number of bytes of each byte range. Either `ends` or `lengths` must be non-None.
coalesce: Maximum distance in bytes between ranges that will be coalesced into a single request. Defaults to 1MiB. Set to `0` to coalesce only ranges that overlap or are directly adjacent.

Returns:
A sequence of `Bytes`, one for each range. This `Bytes` object implements the
Expand All @@ -397,6 +400,7 @@ async def get_ranges_async(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Call `get_ranges` asynchronously.

Expand Down
4 changes: 4 additions & 0 deletions obstore/python/obstore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def get_ranges(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Return the bytes stored at the specified location in the given byte ranges.

Expand All @@ -239,6 +240,7 @@ def get_ranges(
starts=starts,
ends=ends,
lengths=lengths,
coalesce=coalesce,
)

async def get_ranges_async(
Expand All @@ -248,6 +250,7 @@ async def get_ranges_async(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Call `get_ranges` asynchronously.

Expand All @@ -259,6 +262,7 @@ async def get_ranges_async(
starts=starts,
ends=ends,
lengths=lengths,
coalesce=coalesce,
)

def head(self, path: str) -> ObjectMeta:
Expand Down
41 changes: 24 additions & 17 deletions obstore/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, Fuse};
use futures::StreamExt;
use object_store::{
Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore, ObjectStoreExt,
coalesce_ranges, Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore,
ObjectStoreExt, OBJECT_STORE_COALESCE_DEFAULT,
};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -430,45 +431,51 @@ fn params_to_range(
}
}

/// Fetch `ranges` of the object at `path` from `store`, handing `coalesce` to
/// `coalesce_ranges` as its merge threshold.
///
/// Every request issued by `coalesce_ranges` goes through `get_range` on the
/// wrapped store. The buffers it yields are converted with `Into` into
/// `PyBytes`, and a fetch failure is propagated to the caller via `?`.
async fn _get_ranges(
    store: PyObjectStore,
    path: PyPath,
    ranges: &[Range<u64>],
    coalesce: u64,
) -> PyObjectStoreResult<Vec<PyBytes>> {
    // One ranged read against the wrapped store; invoked by `coalesce_ranges`.
    let fetch_one = |byte_range: Range<u64>| store.as_ref().get_range(path.as_ref(), byte_range);
    let fetched = coalesce_ranges(ranges, fetch_one, coalesce).await?;
    let buffers: Vec<PyBytes> = fetched.into_iter().map(Into::into).collect();
    Ok(buffers)
}

// Blocking Python entry point for ranged reads: turns `starts`/`ends`/`lengths`
// into ranges with `params_to_ranges`, then drives the fetch to completion on
// the runtime returned by `get_runtime()` inside `py.detach`.
//
// NOTE(review): this span is a rendered diff whose +/- markers were stripped, so
// pre-change and post-change lines sit back to back (two `signature` attributes,
// two return types, two `py.detach` bodies). Presumably the second of each pair
// is the post-change version — confirm against the real file before editing.
#[pyfunction]
// Signature without the `coalesce` keyword.
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None))]
// Signature with keyword-only `coalesce`, defaulting to `OBJECT_STORE_COALESCE_DEFAULT`.
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None, coalesce=OBJECT_STORE_COALESCE_DEFAULT))]
pub(crate) fn get_ranges(
py: Python,
store: PyObjectStore,
path: PyPath,
starts: Vec<u64>,
ends: Option<Vec<u64>>,
lengths: Option<Vec<u64>>,
) -> PyObjectStoreResult<Vec<pyo3_bytes::PyBytes>> {
// Threshold forwarded unchanged to `_get_ranges`.
coalesce: u64,
) -> PyObjectStoreResult<Vec<PyBytes>> {
let runtime = get_runtime();
// Invalid range parameters surface here, before any fetch is attempted.
let ranges = params_to_ranges(starts, ends, lengths)?;
// Variant calling the store's own `get_ranges`, which takes no `coalesce` argument.
py.detach(|| {
let out = runtime.block_on(store.as_ref().get_ranges(path.as_ref(), &ranges))?;
Ok::<_, PyObjectStoreError>(out.into_iter().map(|buf| buf.into()).collect())
})
// Variant delegating to `_get_ranges`, which receives the caller's `coalesce`.
py.detach(|| runtime.block_on(_get_ranges(store, path, &ranges, coalesce)))
}

// Async Python entry point for ranged reads: builds the ranges eagerly with
// `params_to_ranges`, then wraps the fetch in a future handed to
// `pyo3_async_runtimes::tokio::future_into_py`.
//
// NOTE(review): this span is a rendered diff whose +/- markers were stripped, so
// pre-change and post-change lines sit back to back (two `signature` attributes,
// two bodies inside the `async move` block). Presumably the second of each pair
// is the post-change version — confirm against the real file before editing.
#[pyfunction]
// Signature without the `coalesce` keyword.
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None))]
// Signature with keyword-only `coalesce`, defaulting to `OBJECT_STORE_COALESCE_DEFAULT`.
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None, coalesce=OBJECT_STORE_COALESCE_DEFAULT))]
pub(crate) fn get_ranges_async(
py: Python,
store: PyObjectStore,
path: PyPath,
starts: Vec<u64>,
ends: Option<Vec<u64>>,
lengths: Option<Vec<u64>>,
coalesce: u64,
) -> PyResult<Bound<PyAny>> {
// Runs synchronously, so invalid range parameters fail before a future is created.
let ranges = params_to_ranges(starts, ends, lengths)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
// Variant calling the store's own `get_ranges`, which takes no `coalesce` argument,
// and wrapping each buffer with `pyo3_bytes::PyBytes::new`.
let out = store
.as_ref()
.get_ranges(path.as_ref(), &ranges)
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(out
.into_iter()
.map(pyo3_bytes::PyBytes::new)
.collect::<Vec<_>>())
// Variant delegating to `_get_ranges`, which receives the caller's `coalesce`.
Ok(_get_ranges(store, path, &ranges, coalesce).await?)
})
}

Expand Down
55 changes: 55 additions & 0 deletions tests/test_get.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import pytest

from obstore.store import MemoryStore
Expand Down Expand Up @@ -136,6 +138,59 @@ def test_get_ranges():
assert memoryview(buffer) == data[start:end]


# Parametrization table for the coalesce tests: each entry is a
# `(starts, ends, coalesce)` tuple passed to `pytest.mark.parametrize`.
COALESCE_CASES = [
# (starts, ends, coalesce) — close ranges
# Each 10-byte range overlaps the next one by 5 bytes; tried with a zero
# threshold and with 1024 * 1024.
([5, 10, 15, 20], [15, 20, 25, 30], 0),
([5, 10, 15, 20], [15, 20, 25, 30], 1024 * 1024),
# widely-spaced ranges
# 10-byte ranges separated by 990-byte gaps; thresholds of 0, below the gap
# (500) and above the gap (2000).
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 0),
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 500),
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 2000),
]


@pytest.mark.parametrize(("starts", "ends", "coalesce"), COALESCE_CASES)
def test_get_ranges_with_coalesce(
    starts: list[int],
    ends: list[int],
    coalesce: int,
):
    """Check that `get_ranges` returns the requested slices for each `coalesce` value.

    Stores a 4400-byte payload in a fresh `MemoryStore`, requests the
    parametrized ranges and compares every returned buffer with the matching
    slice of the payload.
    """
    store = MemoryStore()

    data = b"the quick brown fox jumps over the lazy dog," * 100
    path = "big-data.txt"

    store.put(path, data)

    buffers = store.get_ranges(path, starts=starts, ends=ends, coalesce=coalesce)
    # `zip` stops at its shortest input, so check the count first; otherwise a
    # short (or empty) result would let the loop below pass without comparing
    # every requested range.
    assert len(buffers) == len(starts)
    for start, end, buffer in zip(starts, ends, buffers):
        assert memoryview(buffer) == data[start:end]


@pytest.mark.asyncio
@pytest.mark.parametrize(("starts", "ends", "coalesce"), COALESCE_CASES)
async def test_get_ranges_async_with_coalesce(
    starts: list[int],
    ends: list[int],
    coalesce: int,
):
    """Check that `get_ranges_async` returns the requested slices for each `coalesce` value.

    Stores a 4400-byte payload in a fresh `MemoryStore` via `put_async`,
    awaits the parametrized ranges and compares every returned buffer with the
    matching slice of the payload.
    """
    store = MemoryStore()

    data = b"the quick brown fox jumps over the lazy dog," * 100
    path = "big-data.txt"

    await store.put_async(path, data)

    buffers = await store.get_ranges_async(
        path,
        starts=starts,
        ends=ends,
        coalesce=coalesce,
    )
    # `zip` stops at its shortest input, so check the count first; otherwise a
    # short (or empty) result would let the loop below pass without comparing
    # every requested range.
    assert len(buffers) == len(starts)
    for start, end, buffer in zip(starts, ends, buffers):
        assert memoryview(buffer) == data[start:end]


def test_get_range_invalid_range():
store = MemoryStore()

Expand Down