Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion obstore/python/obstore/_get.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,14 @@ def get_ranges(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Return the bytes stored at the specified location in the given byte ranges.

To improve performance this will:

- Transparently combine ranges less than 1MB apart into a single underlying request
- Transparently combine ranges less than `coalesce` bytes apart into a single
underlying request (defaults to 1MB)
- Make multiple `fetch` requests in parallel (up to maximum of 10)

Args:
Expand All @@ -382,6 +384,7 @@ def get_ranges(
starts: A sequence of `int` where each offset starts.
ends: A sequence of `int` where each offset ends (exclusive). Either `ends` or `lengths` must be non-None.
lengths: A sequence of `int` with the number of bytes of each byte range. Either `ends` or `lengths` must be non-None.
coalesce: Maximum distance in bytes between ranges that will be coalesced into a single request. Defaults to 1MB (1048576 bytes). Set to `0` to disable coalescing.
Comment thread
maxrjones marked this conversation as resolved.
Outdated

Returns:
A sequence of `Bytes`, one for each range. This `Bytes` object implements the
Expand All @@ -397,6 +400,7 @@ async def get_ranges_async(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Call `get_ranges` asynchronously.

Expand Down
4 changes: 4 additions & 0 deletions obstore/python/obstore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def get_ranges(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Return the bytes stored at the specified location in the given byte ranges.

Expand All @@ -239,6 +240,7 @@ def get_ranges(
starts=starts,
ends=ends,
lengths=lengths,
coalesce=coalesce,
)

async def get_ranges_async(
Expand All @@ -248,6 +250,7 @@ async def get_ranges_async(
starts: Sequence[int],
ends: Sequence[int] | None = None,
lengths: Sequence[int] | None = None,
coalesce: int = 1024 * 1024,
) -> list[Bytes]:
"""Call `get_ranges` asynchronously.

Expand All @@ -259,6 +262,7 @@ async def get_ranges_async(
starts=starts,
ends=ends,
lengths=lengths,
coalesce=coalesce,
)

def head(self, path: str) -> ObjectMeta:
Expand Down
41 changes: 24 additions & 17 deletions obstore/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, Fuse};
use futures::StreamExt;
use object_store::{
Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore, ObjectStoreExt,
coalesce_ranges, Attributes, GetOptions, GetRange, GetResult, ObjectMeta, ObjectStore,
ObjectStoreExt, OBJECT_STORE_COALESCE_DEFAULT,
};
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration, PyValueError};
use pyo3::prelude::*;
Expand Down Expand Up @@ -430,45 +431,51 @@ fn params_to_range(
}
}

async fn _get_ranges(
store: PyObjectStore,
path: PyPath,
ranges: &[Range<u64>],
coalesce: u64,
) -> PyObjectStoreResult<Vec<PyBytes>> {
let out = coalesce_ranges(
ranges,
|range| store.as_ref().get_range(path.as_ref(), range),
coalesce,
)
.await?;
Ok(out.into_iter().map(|buf| buf.into()).collect())
}

#[pyfunction]
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None))]
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None, coalesce=OBJECT_STORE_COALESCE_DEFAULT))]
pub(crate) fn get_ranges(
py: Python,
store: PyObjectStore,
path: PyPath,
starts: Vec<u64>,
ends: Option<Vec<u64>>,
lengths: Option<Vec<u64>>,
) -> PyObjectStoreResult<Vec<pyo3_bytes::PyBytes>> {
coalesce: u64,
) -> PyObjectStoreResult<Vec<PyBytes>> {
let runtime = get_runtime();
let ranges = params_to_ranges(starts, ends, lengths)?;
py.detach(|| {
let out = runtime.block_on(store.as_ref().get_ranges(path.as_ref(), &ranges))?;
Ok::<_, PyObjectStoreError>(out.into_iter().map(|buf| buf.into()).collect())
})
py.detach(|| runtime.block_on(_get_ranges(store, path, &ranges, coalesce)))
}

#[pyfunction]
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None))]
#[pyo3(signature = (store, path, *, starts, ends=None, lengths=None, coalesce=OBJECT_STORE_COALESCE_DEFAULT))]
pub(crate) fn get_ranges_async(
py: Python,
store: PyObjectStore,
path: PyPath,
starts: Vec<u64>,
ends: Option<Vec<u64>>,
lengths: Option<Vec<u64>>,
coalesce: u64,
) -> PyResult<Bound<PyAny>> {
let ranges = params_to_ranges(starts, ends, lengths)?;
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let out = store
.as_ref()
.get_ranges(path.as_ref(), &ranges)
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok(out
.into_iter()
.map(pyo3_bytes::PyBytes::new)
.collect::<Vec<_>>())
Ok(_get_ranges(store, path, &ranges, coalesce).await?)
})
}

Expand Down
55 changes: 55 additions & 0 deletions tests/test_get.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import pytest

from obstore.store import MemoryStore
Expand Down Expand Up @@ -136,6 +138,59 @@
assert memoryview(buffer) == data[start:end]


COALESCE_CASES = [
# (starts, ends, coalesce) — close ranges
([5, 10, 15, 20], [15, 20, 25, 30], 0),
([5, 10, 15, 20], [15, 20, 25, 30], 1024 * 1024),
# widely-spaced ranges
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 0),
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 500),
([0, 1000, 2000, 3000], [10, 1010, 2010, 3010], 2000),
]


@pytest.mark.parametrize(("starts", "ends", "coalesce"), COALESCE_CASES)
def test_get_ranges_with_coalesce(
starts: list[int],
ends: list[int],
coalesce: int | None,
Comment thread
maxrjones marked this conversation as resolved.
Outdated
):
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 100
path = "big-data.txt"

store.put(path, data)

buffers = store.get_ranges(path, starts=starts, ends=ends, coalesce=coalesce)

Check failure on line 165 in tests/test_get.py

View workflow job for this annotation

GitHub Actions / Build and test Python (3.12)

Argument of type "int | None" cannot be assigned to parameter "coalesce" of type "int" in function "get_ranges"   Type "int | None" is not assignable to type "int"     "None" is not assignable to "int" (reportArgumentType)
for start, end, buffer in zip(starts, ends, buffers):
assert memoryview(buffer) == data[start:end]


@pytest.mark.asyncio
@pytest.mark.parametrize(("starts", "ends", "coalesce"), COALESCE_CASES)
async def test_get_ranges_async_with_coalesce(
starts: list[int],
ends: list[int],
coalesce: int | None,
Comment thread
maxrjones marked this conversation as resolved.
Outdated
):
store = MemoryStore()

data = b"the quick brown fox jumps over the lazy dog," * 100
path = "big-data.txt"

await store.put_async(path, data)

buffers = await store.get_ranges_async(
path,
starts=starts,
ends=ends,
coalesce=coalesce,

Check failure on line 188 in tests/test_get.py

View workflow job for this annotation

GitHub Actions / Build and test Python (3.12)

Argument of type "int | None" cannot be assigned to parameter "coalesce" of type "int" in function "get_ranges_async"   Type "int | None" is not assignable to type "int"     "None" is not assignable to "int" (reportArgumentType)
)
for start, end, buffer in zip(starts, ends, buffers):
assert memoryview(buffer) == data[start:end]


def test_get_range_invalid_range():
store = MemoryStore()

Expand Down
Loading