Skip to content

Commit 3886ec3

Browse files
committed
test: cover python read blobs api
1 parent 2b46f57 commit 3886ec3

7 files changed

Lines changed: 1004 additions & 98 deletions

File tree

python/python/lance/dataset.py

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,23 @@ def _is_null_blob_description(description: Any) -> bool:
227227
return False
228228

229229

230+
def _resolve_blob_selection(
231+
ids: Optional[Union[List[int], pa.Array]],
232+
addresses: Optional[Union[List[int], pa.Array]],
233+
indices: Optional[Union[List[int], pa.Array]],
234+
) -> Tuple[str, Union[List[int], pa.Array]]:
235+
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
236+
raise ValueError("Exactly one of ids, indices, or addresses must be specified")
237+
238+
if ids is not None:
239+
return "ids", ids
240+
if addresses is not None:
241+
return "addresses", addresses
242+
if indices is not None:
243+
return "indices", indices
244+
raise ValueError("Either ids, addresses, or indices must be specified")
245+
246+
230247
class MergeInsertBuilder(_MergeInsertBuilder):
231248
def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
232249
"""Executes the merge insert operation
@@ -1910,21 +1927,82 @@ def take_blobs(
19101927
-------
19111928
blob_files : List[BlobFile]
19121929
"""
1913-
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
1914-
raise ValueError(
1915-
"Exactly one of ids, indices, or addresses must be specified"
1916-
)
1930+
selection_kind, selection_values = _resolve_blob_selection(ids, addresses, indices)
19171931

1918-
if ids is not None:
1919-
lance_blob_files = self._ds.take_blobs(ids, blob_column)
1920-
elif addresses is not None:
1921-
lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column)
1922-
elif indices is not None:
1923-
lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column)
1932+
if selection_kind == "ids":
1933+
lance_blob_files = self._ds.take_blobs(selection_values, blob_column)
1934+
elif selection_kind == "addresses":
1935+
lance_blob_files = self._ds.take_blobs_by_addresses(
1936+
selection_values, blob_column
1937+
)
19241938
else:
1925-
raise ValueError("Either ids, addresses, or indices must be specified")
1939+
lance_blob_files = self._ds.take_blobs_by_indices(
1940+
selection_values, blob_column
1941+
)
19261942
return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files]
19271943

1944+
def read_blobs(
    self,
    blob_column: str,
    ids: Optional[Union[List[int], pa.Array]] = None,
    addresses: Optional[Union[List[int], pa.Array]] = None,
    indices: Optional[Union[List[int], pa.Array]] = None,
    *,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]:
    """
    Read blobs directly into memory using Lance's planned blob reader.

    In contrast to :py:meth:`take_blobs`, which hands back file-like
    :py:class:`lance.BlobFile` handles for random access, this method plans
    and executes batched reads and returns fully materialized blob payloads.

    Exactly one of ids, addresses, or indices must be specified.

    Parameters
    ----------
    blob_column : str
        The name of the blob column to read.
    ids : Integer Array or array-like
        Row IDs to read in the dataset.
    addresses : Integer Array or array-like
        The (unstable) row addresses to read in the dataset.
    indices : Integer Array or array-like
        The offset / indices of the row in the dataset.
    target_request_bytes : int, optional
        Target maximum size of each merged object-store read.
    max_gap_bytes : int, optional
        Maximum gap allowed between neighboring blob ranges when merging.
    max_concurrency : int, optional
        Maximum number of merged blob read tasks to execute concurrently.
    preserve_order : bool, optional
        If False, Lance may reorder reads by physical layout to reduce object
        store requests.

    Returns
    -------
    blobs : List[Tuple[int, bytes]]
        A list of ``(row_address, blob_bytes)`` pairs.
    """
    kind, values = _resolve_blob_selection(ids, addresses, indices)

    # Planner options are forwarded as-is; None means "use the native default".
    planner_options = dict(
        target_request_bytes=target_request_bytes,
        max_gap_bytes=max_gap_bytes,
        max_concurrency=max_concurrency,
        preserve_order=preserve_order,
    )
    # Dispatch to the native reader matching the selector kind.
    readers = {
        "ids": self._ds.read_blobs,
        "addresses": self._ds.read_blobs_by_addresses,
        "indices": self._ds.read_blobs_by_indices,
    }
    return readers[kind](values, blob_column, **planner_options)
2005+
19282006
def head(self, num_rows, **kwargs):
19292007
"""
19302008
Load the first N rows of the dataset.

python/python/lance/lance/__init__.pyi

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,33 @@ class _Dataset:
288288
row_indices: List[int],
289289
blob_column: str,
290290
) -> List[LanceBlobFile]: ...
291+
# Read blob payloads for the given stable row IDs; returns
# (row_address, blob_bytes) pairs (see LanceDataset.read_blobs).
def read_blobs(
    self,
    row_ids: List[int],
    blob_column: str,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
# Same as read_blobs, but rows are selected by (unstable) row address.
def read_blobs_by_addresses(
    self,
    row_addresses: List[int],
    blob_column: str,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
# Same as read_blobs, but rows are selected by offset within the dataset.
def read_blobs_by_indices(
    self,
    row_indices: List[int],
    blob_column: str,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
291318
def take_scan(
292319
self,
293320
row_slices: Iterable[Tuple[int, int]],

python/python/tests/test_blob.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,29 @@
1616
lance_dataset_module = importlib.import_module("lance.dataset")
1717

1818

19+
def _blob_row_ids(dataset):
    """Return the stable row IDs of every row in *dataset* as a Python list."""
    id_table = dataset.to_table(columns=[], with_row_id=True)
    return id_table.column("_rowid").to_pylist()
21+
22+
23+
def _blob_row_addresses(dataset):
    """Return the (unstable) row addresses of every row in *dataset* as a list."""
    addr_table = dataset.to_table(columns=["idx"], with_row_address=True)
    return addr_table.column("_rowaddr").to_pylist()
29+
30+
31+
def _out_of_order_blob_selection(dataset_with_blobs, selection_kind):
    """Build an out-of-order selection (row 4 then row 0) for *selection_kind*.

    Returns a ``(selection_values, expected)`` pair where ``expected`` is the
    list of ``(row_address, payload)`` results in request order.
    """
    addresses = _blob_row_addresses(dataset_with_blobs)
    expected = [(addresses[4], b"quux"), (addresses[0], b"foo")]

    if selection_kind == "ids":
        # Scan the dataset for row IDs once, not once per subscript.
        row_ids = _blob_row_ids(dataset_with_blobs)
        return [row_ids[4], row_ids[0]], expected
    if selection_kind == "addresses":
        return [addresses[4], addresses[0]], expected
    return [4, 0], expected
40+
41+
1942
def test_blob_read_from_binary():
2043
values = [b"foo", b"bar", b"baz"]
2144
data = pa.table(
@@ -251,6 +274,125 @@ def test_blob_by_indices(tmp_path, dataset_with_blobs):
251274
assert f1.read() == f2.read()
252275

253276

277+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        ("ids", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("addresses", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("indices", [0, 4], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
    ],
)
def test_read_blobs(dataset_with_blobs, selection_kind, selection_values, expected):
    # Exercise every selector kind together with explicit planner options.
    selector = {selection_kind: selection_values}

    result = dataset_with_blobs.read_blobs(
        "blobs",
        target_request_bytes=1024,
        max_gap_bytes=64,
        max_concurrency=2,
        preserve_order=True,
        **selector,
    )

    assert result == expected
298+
299+
300+
def test_read_blobs_requires_single_selector(dataset_with_blobs):
    # Supplying two selector kinds at once must be rejected.
    expected_error = "Exactly one of ids, indices, or addresses must be specified"
    with pytest.raises(ValueError, match=expected_error):
        dataset_with_blobs.read_blobs("blobs", ids=[0], indices=[0])
305+
306+
307+
def test_read_blobs_requires_selector(dataset_with_blobs):
    # Omitting all selector kinds must be rejected with the same error.
    expected_error = "Exactly one of ids, indices, or addresses must be specified"
    with pytest.raises(ValueError, match=expected_error):
        dataset_with_blobs.read_blobs("blobs")
312+
313+
314+
def test_read_blobs_rejects_non_blob_column(dataset_with_blobs):
    # "idx" is an ordinary column in the fixture, so read_blobs must refuse it.
    with pytest.raises(ValueError, match="not a blob column"):
        dataset_with_blobs.read_blobs("idx", indices=[0])
317+
318+
319+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        ("ids", pa.array([0, (1 << 32) + 1], type=pa.uint64()), [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("addresses", pa.array([0, (1 << 32) + 1], type=pa.uint64()), [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("indices", pa.array([0, 4], type=pa.uint64()), [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
    ],
)
def test_read_blobs_accepts_arrow_array_selectors(
    dataset_with_blobs, selection_kind, selection_values, expected
):
    # pa.Array selectors must behave exactly like plain Python lists.
    result = dataset_with_blobs.read_blobs("blobs", **{selection_kind: selection_values})

    assert result == expected
335+
336+
337+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values"),
    [
        ("ids", []),
        ("addresses", []),
        ("indices", []),
        ("ids", pa.array([], type=pa.uint64())),
        ("addresses", pa.array([], type=pa.uint64())),
        ("indices", pa.array([], type=pa.uint64())),
    ],
)
def test_read_blobs_accepts_empty_selection(
    dataset_with_blobs, selection_kind, selection_values
):
    # An empty selector (list or pa.Array) is valid and yields no blobs.
    blobs = dataset_with_blobs.read_blobs("blobs", **{selection_kind: selection_values})

    assert blobs == []
354+
355+
356+
@pytest.mark.parametrize(
    ("planner_kwargs", "error_message"),
    [
        ({"target_request_bytes": 0}, "target_request_bytes must be greater than 0"),
        ({"max_concurrency": 0}, "max_concurrency must be greater than 0"),
    ],
)
def test_read_blobs_rejects_invalid_planner_options(
    dataset_with_blobs, planner_kwargs, error_message
):
    # Zero is invalid for size/concurrency planner knobs and must raise.
    with pytest.raises(ValueError, match=error_message):
        dataset_with_blobs.read_blobs("blobs", indices=[0], **planner_kwargs)
368+
369+
370+
@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
def test_read_blobs_preserves_input_order(dataset_with_blobs, selection_kind):
    # With preserve_order=True the results must follow request order exactly.
    selection_values, expected = _out_of_order_blob_selection(
        dataset_with_blobs, selection_kind
    )

    blobs = dataset_with_blobs.read_blobs(
        "blobs", preserve_order=True, **{selection_kind: selection_values}
    )

    assert blobs == expected
380+
381+
382+
@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
def test_read_blobs_without_preserve_order_returns_same_rows(
    dataset_with_blobs, selection_kind
):
    # Without order preservation the same rows come back, order unspecified.
    selection_values, expected = _out_of_order_blob_selection(
        dataset_with_blobs, selection_kind
    )

    blobs = dataset_with_blobs.read_blobs(
        "blobs", preserve_order=False, **{selection_kind: selection_values}
    )

    assert sorted(blobs) == sorted(expected)
394+
395+
254396
def test_blob_file_seek(tmp_path, dataset_with_blobs):
255397
row_ids = (
256398
dataset_with_blobs.to_table(columns=[], with_row_id=True)
@@ -466,6 +608,12 @@ def test_blob_extension_write_external_slice(tmp_path):
466608
with blob_file as f:
467609
assert f.read() == expected
468610

611+
assert ds.read_blobs("blob", indices=[0, 1, 2]) == [
612+
(0, b"alpha"),
613+
(1, b"bravo"),
614+
(2, b"charlie"),
615+
]
616+
469617

470618
def test_blob_extension_write_external_slice_ingest(tmp_path):
471619
tar_path = tmp_path / "container.tar"
@@ -548,6 +696,7 @@ def test_blob_extension_take_blobs_multi_base(payload, is_dataset_root, tmp_path
548696
with blobs[0] as f:
549697
assert f.read() == payload
550698

699+
assert ds.read_blobs("blob", indices=[0]) == [(0, payload)]
551700

552701
@pytest.fixture
553702
def dataset_for_pandas_blob_tests(tmp_path):

0 commit comments

Comments
 (0)