Skip to content

Commit 0f8e779

Browse files
committed
test: cover python read blobs api
1 parent d44b441 commit 0f8e779

File tree

7 files changed

+1005
-98
lines changed

7 files changed

+1005
-98
lines changed

python/python/lance/dataset.py

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,23 @@
9393
LANCE_COMMIT_MESSAGE_KEY = "__lance_commit_message"
9494

9595

96+
def _resolve_blob_selection(
97+
ids: Optional[Union[List[int], pa.Array]],
98+
addresses: Optional[Union[List[int], pa.Array]],
99+
indices: Optional[Union[List[int], pa.Array]],
100+
) -> Tuple[str, Union[List[int], pa.Array]]:
101+
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
102+
raise ValueError("Exactly one of ids, indices, or addresses must be specified")
103+
104+
if ids is not None:
105+
return "ids", ids
106+
if addresses is not None:
107+
return "addresses", addresses
108+
if indices is not None:
109+
return "indices", indices
110+
raise ValueError("Either ids, addresses, or indices must be specified")
111+
112+
96113
class MergeInsertBuilder(_MergeInsertBuilder):
97114
def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
98115
"""Executes the merge insert operation
@@ -1684,21 +1701,82 @@ def take_blobs(
16841701
-------
16851702
blob_files : List[BlobFile]
16861703
"""
1687-
if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1:
1688-
raise ValueError(
1689-
"Exactly one of ids, indices, or addresses must be specified"
1690-
)
1704+
selection_kind, selection_values = _resolve_blob_selection(ids, addresses, indices)
16911705

1692-
if ids is not None:
1693-
lance_blob_files = self._ds.take_blobs(ids, blob_column)
1694-
elif addresses is not None:
1695-
lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column)
1696-
elif indices is not None:
1697-
lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column)
1706+
if selection_kind == "ids":
1707+
lance_blob_files = self._ds.take_blobs(selection_values, blob_column)
1708+
elif selection_kind == "addresses":
1709+
lance_blob_files = self._ds.take_blobs_by_addresses(
1710+
selection_values, blob_column
1711+
)
16981712
else:
1699-
raise ValueError("Either ids, addresses, or indices must be specified")
1713+
lance_blob_files = self._ds.take_blobs_by_indices(
1714+
selection_values, blob_column
1715+
)
17001716
return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files]
17011717

1718+
def read_blobs(
    self,
    blob_column: str,
    ids: Optional[Union[List[int], pa.Array]] = None,
    addresses: Optional[Union[List[int], pa.Array]] = None,
    indices: Optional[Union[List[int], pa.Array]] = None,
    *,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]:
    """
    Materialize blob payloads in memory via Lance's planned blob reader.

    Whereas :py:meth:`take_blobs` hands back file-like
    :py:class:`lance.BlobFile` handles for random access, this method plans
    and executes batched reads and returns the blob bytes themselves.

    Exactly one of ids, addresses, or indices must be specified.

    Parameters
    ----------
    blob_column : str
        The name of the blob column to read.
    ids : Integer Array or array-like
        Row IDs to read in the dataset.
    addresses : Integer Array or array-like
        The (unstable) row addresses to read in the dataset.
    indices : Integer Array or array-like
        The offset / indices of the row in the dataset.
    target_request_bytes : int, optional
        Target maximum size of each merged object-store read.
    max_gap_bytes : int, optional
        Maximum gap allowed between neighboring blob ranges when merging.
    max_concurrency : int, optional
        Maximum number of merged blob read tasks to execute concurrently.
    preserve_order : bool, optional
        If False, Lance may reorder reads by physical layout to reduce object
        store requests.

    Returns
    -------
    blobs : List[Tuple[int, bytes]]
        A list of ``(row_address, blob_bytes)`` pairs.
    """
    selection_kind, selection_values = _resolve_blob_selection(
        ids, addresses, indices
    )

    planner_options = {
        "target_request_bytes": target_request_bytes,
        "max_gap_bytes": max_gap_bytes,
        "max_concurrency": max_concurrency,
        "preserve_order": preserve_order,
    }
    # Dispatch to the native reader that matches the selector kind.
    readers = {
        "ids": self._ds.read_blobs,
        "addresses": self._ds.read_blobs_by_addresses,
        "indices": self._ds.read_blobs_by_indices,
    }
    return readers[selection_kind](selection_values, blob_column, **planner_options)
1779+
17021780
def head(self, num_rows, **kwargs):
17031781
"""
17041782
Load the first N rows of the dataset.

python/python/lance/lance/__init__.pyi

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,33 @@ class _Dataset:
295295
row_indices: List[int],
296296
blob_column: str,
297297
) -> List[LanceBlobFile]: ...
298+
# Planned in-memory blob read keyed by stable row id.
# Returns (row_address, blob_bytes) pairs; the optional planner knobs mirror
# LanceDataset.read_blobs.
def read_blobs(
    self,
    row_ids: List[int],
    blob_column: str,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
307+
# Planned in-memory blob read keyed by (unstable) row address.
# Returns (row_address, blob_bytes) pairs; same planner knobs as read_blobs.
def read_blobs_by_addresses(
    self,
    row_addresses: List[int],
    blob_column: str,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
316+
# Planned in-memory blob read keyed by row offset within the dataset.
# Returns (row_address, blob_bytes) pairs; same planner knobs as read_blobs.
def read_blobs_by_indices(
    self,
    row_indices: List[int],
    blob_column: str,
    target_request_bytes: Optional[int] = None,
    max_gap_bytes: Optional[int] = None,
    max_concurrency: Optional[int] = None,
    preserve_order: Optional[bool] = None,
) -> List[Tuple[int, bytes]]: ...
298325
def take_scan(
299326
self,
300327
row_slices: Iterable[Tuple[int, int]],

python/python/tests/test_blob.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,29 @@
1313
from lance import Blob, BlobColumn, DatasetBasePath
1414

1515

16+
def _blob_row_ids(dataset):
17+
return dataset.to_table(columns=[], with_row_id=True).column("_rowid").to_pylist()
18+
19+
20+
def _blob_row_addresses(dataset):
21+
return (
22+
dataset.to_table(columns=["idx"], with_row_address=True)
23+
.column("_rowaddr")
24+
.to_pylist()
25+
)
26+
27+
28+
def _out_of_order_blob_selection(dataset_with_blobs, selection_kind):
    """Build an out-of-order selection (row 4, then row 0) and its expectation.

    Parameters
    ----------
    dataset_with_blobs
        The blob-dataset fixture to select from.
    selection_kind : str
        One of ``"ids"``, ``"addresses"`` or ``"indices"``.

    Returns
    -------
    tuple
        ``(selection_values, expected)`` where ``expected`` is the list of
        ``(row_address, payload)`` pairs read_blobs should return when input
        order is preserved.
    """
    addresses = _blob_row_addresses(dataset_with_blobs)
    expected = [(addresses[4], b"quux"), (addresses[0], b"foo")]

    if selection_kind == "ids":
        # Fetch the id list once instead of scanning the dataset twice.
        row_ids = _blob_row_ids(dataset_with_blobs)
        return [row_ids[4], row_ids[0]], expected
    if selection_kind == "addresses":
        return [addresses[4], addresses[0]], expected
    return [4, 0], expected
37+
38+
1639
def test_blob_read_from_binary():
1740
values = [b"foo", b"bar", b"baz"]
1841
data = pa.table(
@@ -248,6 +271,125 @@ def test_blob_by_indices(tmp_path, dataset_with_blobs):
248271
assert f1.read() == f2.read()
249272

250273

274+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        # NOTE(review): (1 << 32) + 1 looks like "row 1 of the second
        # fragment", and the same value is used for both ids and addresses —
        # presumably the fixture's row ids and row addresses coincide here;
        # confirm against the dataset_with_blobs fixture.
        ("ids", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("addresses", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("indices", [0, 4], [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
    ],
)
def test_read_blobs(dataset_with_blobs, selection_kind, selection_values, expected):
    """read_blobs returns (row_address, bytes) pairs for every selector kind."""
    kwargs = {selection_kind: selection_values}

    # Pass every planner knob explicitly to exercise the full keyword surface.
    blobs = dataset_with_blobs.read_blobs(
        "blobs",
        **kwargs,
        target_request_bytes=1024,
        max_gap_bytes=64,
        max_concurrency=2,
        preserve_order=True,
    )

    assert blobs == expected
295+
296+
297+
def test_read_blobs_requires_single_selector(dataset_with_blobs):
    """Passing more than one selector (ids and indices) is rejected."""
    with pytest.raises(
        ValueError, match="Exactly one of ids, indices, or addresses must be specified"
    ):
        dataset_with_blobs.read_blobs("blobs", ids=[0], indices=[0])
302+
303+
304+
def test_read_blobs_requires_selector(dataset_with_blobs):
    """Calling read_blobs with no selector at all is rejected."""
    with pytest.raises(
        ValueError, match="Exactly one of ids, indices, or addresses must be specified"
    ):
        dataset_with_blobs.read_blobs("blobs")
309+
310+
311+
def test_read_blobs_rejects_non_blob_column(dataset_with_blobs):
    """Reading a regular (non-blob) column raises a descriptive error."""
    with pytest.raises(ValueError, match="not a blob column"):
        dataset_with_blobs.read_blobs("idx", indices=[0])
314+
315+
316+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values", "expected"),
    [
        ("ids", pa.array([0, (1 << 32) + 1], type=pa.uint64()), [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("addresses", pa.array([0, (1 << 32) + 1], type=pa.uint64()), [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
        ("indices", pa.array([0, 4], type=pa.uint64()), [(0, b"foo"), ((1 << 32) + 1, b"quux")]),
    ],
)
def test_read_blobs_accepts_arrow_array_selectors(
    dataset_with_blobs, selection_kind, selection_values, expected
):
    """Selectors may be pyarrow arrays, not just Python lists."""
    kwargs = {selection_kind: selection_values}

    blobs = dataset_with_blobs.read_blobs("blobs", **kwargs)

    assert blobs == expected
332+
333+
334+
@pytest.mark.parametrize(
    ("selection_kind", "selection_values"),
    [
        # Both native-list and arrow-array empty selectors must be accepted.
        ("ids", []),
        ("addresses", []),
        ("indices", []),
        ("ids", pa.array([], type=pa.uint64())),
        ("addresses", pa.array([], type=pa.uint64())),
        ("indices", pa.array([], type=pa.uint64())),
    ],
)
def test_read_blobs_accepts_empty_selection(
    dataset_with_blobs, selection_kind, selection_values
):
    """An empty selector yields an empty result rather than an error."""
    kwargs = {selection_kind: selection_values}

    assert dataset_with_blobs.read_blobs("blobs", **kwargs) == []
351+
352+
353+
@pytest.mark.parametrize(
    ("planner_kwargs", "error_message"),
    [
        # NOTE(review): these messages presumably come from planner-side
        # validation (native code) — keep in sync with that implementation.
        ({"target_request_bytes": 0}, "target_request_bytes must be greater than 0"),
        ({"max_concurrency": 0}, "max_concurrency must be greater than 0"),
    ],
)
def test_read_blobs_rejects_invalid_planner_options(
    dataset_with_blobs, planner_kwargs, error_message
):
    """Zero-valued planner knobs are rejected with a descriptive ValueError."""
    with pytest.raises(ValueError, match=error_message):
        dataset_with_blobs.read_blobs("blobs", indices=[0], **planner_kwargs)
365+
366+
367+
@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
def test_read_blobs_preserves_input_order(dataset_with_blobs, selection_kind):
    """With preserve_order=True, results follow the selector order exactly."""
    # Selection deliberately lists row 4 before row 0 to make reordering visible.
    selection_values, expected = _out_of_order_blob_selection(
        dataset_with_blobs, selection_kind
    )
    kwargs = {selection_kind: selection_values}

    blobs = dataset_with_blobs.read_blobs("blobs", **kwargs, preserve_order=True)

    assert blobs == expected
377+
378+
379+
@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"])
def test_read_blobs_without_preserve_order_returns_same_rows(
    dataset_with_blobs, selection_kind
):
    """preserve_order=False may reorder results but must return the same rows."""
    selection_values, expected = _out_of_order_blob_selection(
        dataset_with_blobs, selection_kind
    )
    kwargs = {selection_kind: selection_values}

    blobs = dataset_with_blobs.read_blobs("blobs", **kwargs, preserve_order=False)

    # Compare as multisets: result ordering is an implementation detail here.
    assert sorted(blobs) == sorted(expected)
391+
392+
251393
def test_blob_file_seek(tmp_path, dataset_with_blobs):
252394
row_ids = (
253395
dataset_with_blobs.to_table(columns=[], with_row_id=True)
@@ -422,6 +564,12 @@ def test_blob_extension_write_external_slice(tmp_path):
422564
with blob_file as f:
423565
assert f.read() == expected
424566

567+
assert ds.read_blobs("blob", indices=[0, 1, 2]) == [
568+
(0, b"alpha"),
569+
(1, b"bravo"),
570+
(2, b"charlie"),
571+
]
572+
425573

426574
@pytest.mark.parametrize(
427575
("payload", "is_dataset_root"),
@@ -460,3 +608,5 @@ def test_blob_extension_take_blobs_multi_base(payload, is_dataset_root, tmp_path
460608
assert len(blobs) == 1
461609
with blobs[0] as f:
462610
assert f.read() == payload
611+
612+
assert ds.read_blobs("blob", indices=[0]) == [(0, payload)]

0 commit comments

Comments
 (0)