Skip to content

Commit 6c18cd0

Browse files
committed
feat: open_reader accepts optional size to skip HEAD request
1 parent 996978a commit 6c18cd0

3 files changed

Lines changed: 126 additions & 15 deletions

File tree

obstore/python/obstore/_buffered.pyi

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def open_reader(
2121
path: str,
2222
*,
2323
buffer_size: int = 1024 * 1024,
24+
size: int | None = None,
2425
) -> ReadableFile:
2526
"""Open a readable file object from the specified location.
2627
@@ -30,6 +31,11 @@ def open_reader(
3031
3132
Keyword Args:
3233
buffer_size: The minimum number of bytes to read in a single request. Up to `buffer_size` bytes will be buffered in memory.
34+
size: Optional byte size of the object. When provided, skips the HEAD request used to fetch the file size. Useful for callers that already know the size from external metadata.
35+
36+
The caller is responsible for accuracy: a value larger than the actual file surfaces as a read-time range error, a value smaller causes silent truncation.
37+
38+
When `size` is provided, the resulting reader's `meta` attribute omits `last_modified` (since it was not fetched). Callers that need that field should call `open_reader` without `size`. Defaults to `None`.
3339
3440
Returns:
3541
ReadableFile
@@ -41,6 +47,7 @@ async def open_reader_async(
4147
path: str,
4248
*,
4349
buffer_size: int = 1024 * 1024,
50+
size: int | None = None,
4451
) -> AsyncReadableFile:
4552
"""Call `open_reader` asynchronously, returning a readable file object with asynchronous operations.
4653

obstore/src/buffered.rs

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ use std::io::SeekFrom;
22
use std::sync::Arc;
33

44
use bytes::Bytes;
5+
use chrono::{DateTime, Utc};
6+
use indexmap::IndexMap;
57
use object_store::buffered::{BufReader, BufWriter};
68
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
79
use pyo3::exceptions::{PyIOError, PyStopAsyncIteration, PyStopIteration};
810
use pyo3::prelude::*;
9-
use pyo3::types::PyString;
11+
use pyo3::types::{PyDict, PyString};
1012
use pyo3::{intern, IntoPyObjectExt};
1113
use pyo3_async_runtimes::tokio::{future_into_py, get_runtime};
1214
use pyo3_bytes::PyBytes;
@@ -15,62 +17,79 @@ use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Line
1517
use tokio::sync::Mutex;
1618

1719
use crate::attributes::PyAttributes;
18-
use crate::list::PyObjectMeta;
1920
use crate::tags::PyTagSet;
2021

2122
#[pyfunction]
22-
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024))]
23+
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024, size=None))]
2324
pub(crate) fn open_reader(
2425
py: Python,
2526
store: PyObjectStore,
2627
path: PyPath,
2728
buffer_size: usize,
29+
size: Option<u64>,
2830
) -> PyObjectStoreResult<PyReadableFile> {
2931
let store = store.into_inner();
3032
let runtime = get_runtime();
31-
let (reader, meta) = py.detach(|| runtime.block_on(create_reader(store, path, buffer_size)))?;
32-
Ok(PyReadableFile::new(reader, meta, false))
33+
let was_hinted = size.is_some();
34+
let (reader, meta) =
35+
py.detach(|| runtime.block_on(create_reader(store, path, buffer_size, size)))?;
36+
Ok(PyReadableFile::new(reader, meta, was_hinted, false))
3337
}
3438

3539
#[pyfunction]
36-
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024))]
40+
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024, size=None))]
3741
pub(crate) fn open_reader_async(
3842
py: Python,
3943
store: PyObjectStore,
4044
path: PyPath,
4145
buffer_size: usize,
46+
size: Option<u64>,
4247
) -> PyResult<Bound<PyAny>> {
4348
let store = store.into_inner();
49+
let was_hinted = size.is_some();
4450
future_into_py(py, async move {
45-
let (reader, meta) = create_reader(store, path, buffer_size).await?;
46-
Ok(PyReadableFile::new(reader, meta, true))
51+
let (reader, meta) = create_reader(store, path, buffer_size, size).await?;
52+
Ok(PyReadableFile::new(reader, meta, was_hinted, true))
4753
})
4854
}
4955

5056
async fn create_reader(
5157
store: Arc<dyn ObjectStore>,
5258
path: PyPath,
5359
capacity: usize,
60+
size: Option<u64>,
5461
) -> PyObjectStoreResult<(BufReader, ObjectMeta)> {
55-
let meta = store
56-
.head(path.as_ref())
57-
.await
58-
.map_err(PyObjectStoreError::ObjectStoreError)?;
62+
let meta = match size {
63+
Some(size) => ObjectMeta {
64+
location: path.as_ref().clone(),
65+
last_modified: DateTime::<Utc>::from_timestamp(0, 0)
66+
.expect("unix epoch is a valid DateTime"),
67+
size,
68+
e_tag: None,
69+
version: None,
70+
},
71+
None => store
72+
.head(path.as_ref())
73+
.await
74+
.map_err(PyObjectStoreError::ObjectStoreError)?,
75+
};
5976
Ok((BufReader::with_capacity(store, &meta, capacity), meta))
6077
}
6178

6279
#[pyclass(name = "ReadableFile", frozen)]
6380
pub(crate) struct PyReadableFile {
6481
reader: Arc<Mutex<BufReader>>,
6582
meta: ObjectMeta,
83+
was_hinted: bool,
6684
r#async: bool,
6785
}
6886

6987
impl PyReadableFile {
70-
fn new(reader: BufReader, meta: ObjectMeta, r#async: bool) -> Self {
88+
fn new(reader: BufReader, meta: ObjectMeta, was_hinted: bool, r#async: bool) -> Self {
7189
Self {
7290
reader: Arc::new(Mutex::new(reader)),
7391
meta,
92+
was_hinted,
7493
r#async,
7594
}
7695
}
@@ -92,8 +111,19 @@ impl PyReadableFile {
92111
fn close(&self) {}
93112

94113
#[getter]
95-
fn meta(&self) -> PyObjectMeta {
96-
self.meta.clone().into()
114+
fn meta<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
115+
let mut dict = IndexMap::with_capacity(5);
116+
dict.insert("path", self.meta.location.as_ref().into_bound_py_any(py)?);
117+
if !self.was_hinted {
118+
dict.insert(
119+
"last_modified",
120+
self.meta.last_modified.into_bound_py_any(py)?,
121+
);
122+
}
123+
dict.insert("size", self.meta.size.into_bound_py_any(py)?);
124+
dict.insert("e_tag", self.meta.e_tag.clone().into_bound_py_any(py)?);
125+
dict.insert("version", self.meta.version.clone().into_bound_py_any(py)?);
126+
dict.into_pyobject(py)
97127
}
98128

99129
#[pyo3(signature = (size = None, /))]

tests/test_buffered.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,77 @@ async def test_read_past_eof_async():
112112
buf = BytesIO(data)
113113
expected = buf.read(20)
114114
assert memoryview(expected) == memoryview(buffer)
115+
116+
117+
def test_open_reader_size_hint_sync():
118+
store = MemoryStore()
119+
data = b"x" * 1000
120+
path = "sized.bin"
121+
obs.put(store, path, data)
122+
123+
file = obs.open_reader(store, path, size=len(data))
124+
assert file.size == len(data)
125+
assert memoryview(data) == memoryview(file.read())
126+
127+
128+
@pytest.mark.asyncio
129+
async def test_open_reader_size_hint_async():
130+
store = MemoryStore()
131+
data = b"x" * 1000
132+
path = "sized.bin"
133+
await obs.put_async(store, path, data)
134+
135+
file = await obs.open_reader_async(store, path, size=len(data))
136+
assert file.size == len(data)
137+
assert memoryview(data) == memoryview(await file.read())
138+
139+
140+
def test_open_reader_size_hint_larger_than_actual_errors_on_read():
141+
store = MemoryStore()
142+
data = b"x" * 1000
143+
path = "sized.bin"
144+
obs.put(store, path, data)
145+
146+
file = obs.open_reader(store, path, size=5000)
147+
assert file.size == 5000
148+
with pytest.raises(OSError, match="range"):
149+
file.read()
150+
151+
152+
def test_open_reader_size_hint_smaller_than_actual_truncates():
153+
store = MemoryStore()
154+
data = b"x" * 1000
155+
path = "sized.bin"
156+
obs.put(store, path, data)
157+
158+
file = obs.open_reader(store, path, size=500)
159+
assert file.size == 500
160+
buffer = file.read()
161+
assert memoryview(data[:500]) == memoryview(buffer)
162+
163+
164+
def test_open_reader_size_hint_zero_byte_file():
165+
store = MemoryStore()
166+
path = "empty.bin"
167+
obs.put(store, path, b"")
168+
169+
file = obs.open_reader(store, path, size=0)
170+
assert file.size == 0
171+
assert memoryview(b"") == memoryview(file.read())
172+
173+
174+
def test_open_reader_meta_last_modified_depends_on_size_hint():
175+
store = MemoryStore()
176+
data = b"x" * 1000
177+
path = "sized.bin"
178+
obs.put(store, path, data)
179+
180+
hinted = obs.open_reader(store, path, size=len(data))
181+
unhinted = obs.open_reader(store, path)
182+
183+
assert "last_modified" not in hinted.meta
184+
assert "last_modified" in unhinted.meta
185+
assert hinted.meta["size"] == len(data)
186+
assert unhinted.meta["size"] == len(data)
187+
assert hinted.meta["e_tag"] is None
188+
assert hinted.meta["path"] == path

0 commit comments

Comments
 (0)