Skip to content

Commit accef66

Browse files
authored
feat: add planned blob reads with source-level coalescing (#6352)
This PR improves blob I/O in two complementary ways: `BlobFile` instances that resolve to the same physical object now share a lazy `BlobSource` and can opportunistically coalesce concurrent reads before handing them to Lance's existing scheduler, and datasets now expose a planned `read_blobs` API for materializing blob payloads directly. It also adds explicit cursor-preserving range reads for `BlobFile` across Rust, Python, and Java, with end-to-end Python coverage for the new API and the edge cases it uncovered. This keeps the optimization aligned with Lance's existing scheduler model while giving callers a higher-level path for sequential and batched blob access. ## Python example ```python import lance dataset = lance.dataset("/path/to/dataset") blobs = dataset.read_blobs( "images", indices=[0, 4, 8], io_buffer_size=8 * 1024 * 1024, preserve_order=True, ) for row_address, payload in blobs: print(row_address, len(payload)) ```
1 parent 0aedfa4 commit accef66

11 files changed

Lines changed: 1846 additions & 210 deletions

File tree

java/lance-jni/src/blocking_blob.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,41 @@ fn inner_blob_read_up_to<'local>(
177177
Ok(arr)
178178
}
179179

180+
/// JNI entry point for `BlobFile.nativeReadRange`.
///
/// Reads `len` bytes starting at blob-local `offset` and returns them as a
/// Java `byte[]`. On failure, `ok_or_throw_with_return!` raises a Java
/// exception and a default (null) byte array is returned to satisfy the JNI
/// signature; the Java caller sees the exception, not the return value.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_BlobFile_nativeReadRange<'local>(
    mut env: JNIEnv<'local>,
    jblob: JObject,
    offset: jlong,
    len: jint,
) -> jbyteArray {
    ok_or_throw_with_return!(
        env,
        inner_blob_read_range(&mut env, jblob, offset, len).map(|arr| arr.into_raw()),
        JByteArray::default().into_raw()
    )
}
193+
194+
fn inner_blob_read_range<'local>(
195+
env: &mut JNIEnv<'local>,
196+
jblob: JObject,
197+
offset: jlong,
198+
len: jint,
199+
) -> Result<JByteArray<'local>> {
200+
let end = (offset as u64)
201+
.checked_add(len as u64)
202+
.ok_or_else(|| lance_core::Error::invalid_input("offset + len overflowed".to_string()))?;
203+
let bytes = {
204+
let blob = unsafe { env.get_rust_field::<_, _, BlockingBlobFile>(jblob, NATIVE_BLOB) }?;
205+
RT.block_on(blob.inner.read_range(offset as u64..end))?
206+
};
207+
let arr = env.new_byte_array(bytes.len() as jint)?;
208+
let u8_slice: &[u8] = bytes.as_ref();
209+
let i8_slice: &[i8] = unsafe { transmute(u8_slice) };
210+
211+
env.set_byte_array_region(&arr, 0, i8_slice)?;
212+
Ok(arr)
213+
}
214+
180215
#[unsafe(no_mangle)]
181216
pub extern "system" fn Java_org_lance_BlobFile_nativeSeek(
182217
mut env: JNIEnv,

java/src/main/java/org/lance/BlobFile.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ public byte[] readUpTo(int len) throws IOException {
5959
return nativeReadUpTo(len);
6060
}
6161

62+
/** Read a blob-local range without changing the current cursor. */
63+
public byte[] readRange(long offset, int len) throws IOException {
64+
if (offset < 0) throw new IllegalArgumentException("offset must be non-negative");
65+
if (len < 0) throw new IllegalArgumentException("len must be non-negative");
66+
return nativeReadRange(offset, len);
67+
}
68+
6269
/** Seek to a new cursor position. */
6370
public void seek(long newCursor) throws IOException {
6471
if (newCursor < 0) throw new IllegalArgumentException("newCursor must be non-negative");

python/python/lance/blob.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,10 @@ def size(self) -> int:
282282
def readall(self) -> bytes:
283283
return self.inner.readall()
284284

285+
def read_range(self, offset: int, length: int) -> bytes:
    """Read a blob-local byte range without changing the current cursor.

    Parameters
    ----------
    offset : int
        Byte offset within the blob at which to start reading.
    length : int
        Number of bytes to read.

    Returns
    -------
    bytes
        The bytes in the requested range.

    Raises
    ------
    ValueError
        If ``offset`` or ``length`` is negative.
    """
    # Validate eagerly for a clear error message, matching the Java
    # binding's readRange contract (BlobFile.readRange).
    if offset < 0:
        raise ValueError("offset must be non-negative")
    if length < 0:
        raise ValueError("length must be non-negative")
    return self.inner.read_range(offset, length)
288+
285289
def readinto(self, b: bytearray) -> int:
286290
return self.inner.read_into(b)
287291

python/python/lance/dataset.py

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,23 @@ def _is_null_blob_description(description: Any) -> bool:
227227
return False
228228

229229

230+
def _resolve_blob_selection(
231+
ids: Optional[Union[List[int], pa.Array]],
232+
addresses: Optional[Union[List[int], pa.Array]],
233+
indices: Optional[Union[List[int], pa.Array]],
234+
) -> Tuple[str, Union[List[int], pa.Array]]:
235+
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
236+
raise ValueError("Exactly one of ids, indices, or addresses must be specified")
237+
238+
if ids is not None:
239+
return "ids", ids
240+
if addresses is not None:
241+
return "addresses", addresses
242+
if indices is not None:
243+
return "indices", indices
244+
raise ValueError("Either ids, addresses, or indices must be specified")
245+
246+
230247
class MergeInsertBuilder(_MergeInsertBuilder):
231248
def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
232249
"""Executes the merge insert operation
@@ -1910,21 +1927,77 @@ def take_blobs(
19101927
-------
19111928
blob_files : List[BlobFile]
19121929
"""
1913-
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
1914-
raise ValueError(
1915-
"Exactly one of ids, indices, or addresses must be specified"
1916-
)
1930+
selection_kind, selection_values = _resolve_blob_selection(
1931+
ids, addresses, indices
1932+
)
19171933

1918-
if ids is not None:
1919-
lance_blob_files = self._ds.take_blobs(ids, blob_column)
1920-
elif addresses is not None:
1921-
lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column)
1922-
elif indices is not None:
1923-
lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column)
1934+
if selection_kind == "ids":
1935+
lance_blob_files = self._ds.take_blobs(selection_values, blob_column)
1936+
elif selection_kind == "addresses":
1937+
lance_blob_files = self._ds.take_blobs_by_addresses(
1938+
selection_values, blob_column
1939+
)
19241940
else:
1925-
raise ValueError("Either ids, addresses, or indices must be specified")
1941+
lance_blob_files = self._ds.take_blobs_by_indices(
1942+
selection_values, blob_column
1943+
)
19261944
return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files]
19271945

1946+
def read_blobs(
    self,
    blob_column: str,
    ids: Optional[Union[List[int], pa.Array]] = None,
    addresses: Optional[Union[List[int], pa.Array]] = None,
    indices: Optional[Union[List[int], pa.Array]] = None,
    *,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]:
    """
    Read blobs directly into memory using Lance's planned blob reader.

    Unlike :py:meth:`take_blobs`, which returns file-like
    :py:class:`lance.BlobFile` handles for random access, this API plans and
    executes batched reads and returns materialized blob payloads.

    Exactly one of ids, addresses, or indices must be specified.

    Parameters
    ----------
    blob_column : str
        The name of the blob column to read.
    ids : Integer Array or array-like
        Row IDs to read in the dataset.
    addresses : Integer Array or array-like
        The (unstable) row addresses to read in the dataset.
    indices : Integer Array or array-like
        The offset / indices of the row in the dataset.
    io_buffer_size : int, optional
        Override the scheduler I/O buffer size used while materializing blobs.
    preserve_order : bool, optional
        If True, returned rows follow the requested selection order.

    Returns
    -------
    blobs : List[Tuple[int, bytes]]
        A list of ``(row_address, blob_bytes)`` pairs.
    """
    selection_kind, selection_values = _resolve_blob_selection(
        ids, addresses, indices
    )

    planner_kwargs = {
        "io_buffer_size": io_buffer_size,
        "preserve_order": preserve_order,
    }
    # _resolve_blob_selection guarantees selection_kind is one of these keys,
    # so a plain dict dispatch is safe.
    native_readers = {
        "ids": self._ds.read_blobs,
        "addresses": self._ds.read_blobs_by_addresses,
        "indices": self._ds.read_blobs_by_indices,
    }
    return native_readers[selection_kind](
        selection_values, blob_column, **planner_kwargs
    )
2000+
19282001
def head(self, num_rows, **kwargs):
19292002
"""
19302003
Load the first N rows of the dataset.

python/python/lance/lance/__init__.pyi

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,27 @@ class _Dataset:
288288
row_indices: List[int],
289289
blob_column: str,
290290
) -> List[LanceBlobFile]: ...
291+
# Planned blob reads. Each variant selects rows by a different key kind
# (stable row ids, physical row addresses, or dataset offsets) and returns
# materialized (row_address, blob_bytes) pairs. io_buffer_size overrides the
# scheduler I/O buffer; preserve_order keeps results in request order.
def read_blobs(
    self,
    row_ids: List[int],
    blob_column: str,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
def read_blobs_by_addresses(
    self,
    row_addresses: List[int],
    blob_column: str,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
def read_blobs_by_indices(
    self,
    row_indices: List[int],
    blob_column: str,
    io_buffer_size: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
291312
def take_scan(
292313
self,
293314
row_slices: Iterable[Tuple[int, int]],

python/python/tests/test_blob.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,32 @@
1616
lance_dataset_module = importlib.import_module("lance.dataset")
1717

1818

19+
def _blob_row_ids(dataset):
    """Return the stable row id of every row in ``dataset`` as a list."""
    id_table = dataset.to_table(columns=[], with_row_id=True)
    return id_table.column("_rowid").to_pylist()
21+
22+
23+
def _blob_row_addresses(dataset):
    """Return the physical row address of every row in ``dataset`` as a list."""
    addr_table = dataset.to_table(columns=["idx"], with_row_address=True)
    return addr_table.column("_rowaddr").to_pylist()
29+
30+
31+
def _out_of_order_blob_selection(dataset_with_blobs, selection_kind):
    """Build an out-of-order selector (row 4, then row 0) and its expectation.

    Returns a ``(selection_values, expected)`` pair where ``expected`` is the
    list of ``(row_address, blob_bytes)`` tuples ``read_blobs`` should yield
    for the fixture dataset.
    """
    addresses = _blob_row_addresses(dataset_with_blobs)
    expected = [(addresses[4], b"quux"), (addresses[0], b"foo")]

    if selection_kind == "ids":
        # Fetch the row-id list once instead of materializing the table twice.
        row_ids = _blob_row_ids(dataset_with_blobs)
        return [row_ids[4], row_ids[0]], expected
    if selection_kind == "addresses":
        return [addresses[4], addresses[0]], expected
    return [4, 0], expected
44+
1945
def test_blob_read_from_binary():
2046
values = [b"foo", b"bar", b"baz"]
2147
data = pa.table(
@@ -251,6 +277,134 @@ def test_blob_by_indices(tmp_path, dataset_with_blobs):
251277
assert f1.read() == f2.read()
252278

253279

280+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        ("ids", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("addresses", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("indices", [0, 4], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
    ],
)
def test_read_blobs(dataset_with_blobs, selection_kind, selection_values, expected):
    # Exercise each selector kind through the planned reader, with explicit
    # planner options to cover the full keyword surface.
    result = dataset_with_blobs.read_blobs(
        "blobs",
        io_buffer_size=1024,
        preserve_order=True,
        **{selection_kind: selection_values},
    )

    assert result == expected
299+
300+
301+
def test_read_blobs_requires_single_selector(dataset_with_blobs):
    # Supplying two selectors at once is rejected before any I/O happens.
    expected_msg = "Exactly one of ids, indices, or addresses must be specified"
    with pytest.raises(ValueError, match=expected_msg):
        dataset_with_blobs.read_blobs("blobs", ids=[0], indices=[0])


def test_read_blobs_requires_selector(dataset_with_blobs):
    # Supplying no selector at all is rejected with the same message.
    expected_msg = "Exactly one of ids, indices, or addresses must be specified"
    with pytest.raises(ValueError, match=expected_msg):
        dataset_with_blobs.read_blobs("blobs")


def test_read_blobs_rejects_non_blob_column(dataset_with_blobs):
    # A regular (non-blob) column cannot be read through the blob API.
    with pytest.raises(ValueError, match="not a blob column"):
        dataset_with_blobs.read_blobs("idx", indices=[0])
318+
319+
320+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        (
            "ids",
            pa.array([0, (1 << 32) + 1], type=pa.uint64()),
            [(0, b"foo"), ((1 << 32) + 1, b"quux")],
        ),
        (
            "addresses",
            pa.array([0, (1 << 32) + 1], type=pa.uint64()),
            [(0, b"foo"), ((1 << 32) + 1, b"quux")],
        ),
        (
            "indices",
            pa.array([0, 4], type=pa.uint64()),
            [(0, b"foo"), ((1 << 32) + 1, b"quux")],
        ),
    ],
)
def test_read_blobs_accepts_arrow_array_selectors(
    dataset_with_blobs, selection_kind, selection_values, expected
):
    # Selector values may be pyarrow arrays instead of Python lists.
    result = dataset_with_blobs.read_blobs(
        "blobs", **{selection_kind: selection_values}
    )

    assert result == expected
348+
349+
350+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values"),
    [
        ("ids", []),
        ("addresses", []),
        ("indices", []),
        ("ids", pa.array([], type=pa.uint64())),
        ("addresses", pa.array([], type=pa.uint64())),
        ("indices", pa.array([], type=pa.uint64())),
    ],
)
def test_read_blobs_accepts_empty_selection(
    dataset_with_blobs, selection_kind, selection_values
):
    # An empty selector (list or arrow array) is valid and yields no blobs.
    result = dataset_with_blobs.read_blobs(
        "blobs", **{selection_kind: selection_values}
    )

    assert result == []
367+
368+
369+
@pytest.mark.parametrize(
    ("planner_kwargs", "error_message"),
    [
        ({"io_buffer_size": 0}, "io_buffer_size must be greater than 0"),
    ],
)
def test_read_blobs_rejects_invalid_planner_options(
    dataset_with_blobs, planner_kwargs, error_message
):
    # Planner options are validated up front, before any read is planned.
    with pytest.raises(ValueError, match=error_message):
        dataset_with_blobs.read_blobs("blobs", indices=[0], **planner_kwargs)
380+
381+
382+
@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
383+
def test_read_blobs_preserves_input_order(dataset_with_blobs, selection_kind):
384+
selection_values, expected = _out_of_order_blob_selection(
385+
dataset_with_blobs, selection_kind
386+
)
387+
kwargs = {selection_kind: selection_values}
388+
389+
blobs = dataset_with_blobs.read_blobs("blobs", **kwargs, preserve_order=True)
390+
391+
assert blobs == expected
392+
393+
394+
@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
395+
def test_read_blobs_without_preserve_order_returns_same_rows(
396+
dataset_with_blobs, selection_kind
397+
):
398+
selection_values, expected = _out_of_order_blob_selection(
399+
dataset_with_blobs, selection_kind
400+
)
401+
kwargs = {selection_kind: selection_values}
402+
403+
blobs = dataset_with_blobs.read_blobs("blobs", **kwargs, preserve_order=False)
404+
405+
assert sorted(blobs) == sorted(expected)
406+
407+
254408
def test_blob_file_seek(tmp_path, dataset_with_blobs):
255409
row_ids = (
256410
dataset_with_blobs.to_table(columns=[], with_row_id=True)
@@ -466,6 +620,12 @@ def test_blob_extension_write_external_slice(tmp_path):
466620
with blob_file as f:
467621
assert f.read() == expected
468622

623+
assert ds.read_blobs("blob", indices=[0, 1, 2]) == [
624+
(0, b"alpha"),
625+
(1, b"bravo"),
626+
(2, b"charlie"),
627+
]
628+
469629

470630
def test_blob_extension_write_external_slice_ingest(tmp_path):
471631
tar_path = tmp_path / "container.tar"
@@ -548,6 +708,8 @@ def test_blob_extension_take_blobs_multi_base(payload, is_dataset_root, tmp_path
548708
with blobs[0] as f:
549709
assert f.read() == payload
550710

711+
assert ds.read_blobs("blob", indices=[0]) == [(0, payload)]
712+
551713

552714
@pytest.fixture
553715
def dataset_for_pandas_blob_tests(tmp_path):

0 commit comments

Comments
 (0)