diff --git a/java/lance-jni/src/blocking_blob.rs b/java/lance-jni/src/blocking_blob.rs index 4222e6b89d6..002fa817cf6 100755 --- a/java/lance-jni/src/blocking_blob.rs +++ b/java/lance-jni/src/blocking_blob.rs @@ -177,6 +177,41 @@ fn inner_blob_read_up_to<'local>( Ok(arr) } +#[unsafe(no_mangle)] +pub extern "system" fn Java_org_lance_BlobFile_nativeReadRange<'local>( + mut env: JNIEnv<'local>, + jblob: JObject, + offset: jlong, + len: jint, +) -> jbyteArray { + ok_or_throw_with_return!( + env, + inner_blob_read_range(&mut env, jblob, offset, len).map(|arr| arr.into_raw()), + JByteArray::default().into_raw() + ) +} + +fn inner_blob_read_range<'local>( + env: &mut JNIEnv<'local>, + jblob: JObject, + offset: jlong, + len: jint, +) -> Result> { + let end = (offset as u64) + .checked_add(len as u64) + .ok_or_else(|| lance_core::Error::invalid_input("offset + len overflowed".to_string()))?; + let bytes = { + let blob = unsafe { env.get_rust_field::<_, _, BlockingBlobFile>(jblob, NATIVE_BLOB) }?; + RT.block_on(blob.inner.read_range(offset as u64..end))? + }; + let arr = env.new_byte_array(bytes.len() as jint)?; + let u8_slice: &[u8] = bytes.as_ref(); + let i8_slice: &[i8] = unsafe { transmute(u8_slice) }; + + env.set_byte_array_region(&arr, 0, i8_slice)?; + Ok(arr) +} + #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_BlobFile_nativeSeek( mut env: JNIEnv, diff --git a/java/src/main/java/org/lance/BlobFile.java b/java/src/main/java/org/lance/BlobFile.java index 03c8408ab57..27af01fe73c 100755 --- a/java/src/main/java/org/lance/BlobFile.java +++ b/java/src/main/java/org/lance/BlobFile.java @@ -59,6 +59,13 @@ public byte[] readUpTo(int len) throws IOException { return nativeReadUpTo(len); } + /** Read a blob-local range without changing the current cursor. */ + public byte[] readRange(long offset, int len) throws IOException { + if (offset < 0) throw new IllegalArgumentException("offset must be non-negative"); + if (len < 0) throw new IllegalArgumentException("len must be non-negative"); + return nativeReadRange(offset, len); + } + /** Seek to a new cursor position. */ public void seek(long newCursor) throws IOException { if (newCursor < 0) throw new IllegalArgumentException("newCursor must be non-negative"); diff --git a/python/python/lance/blob.py b/python/python/lance/blob.py index 1a3f4e946fb..02adfdc95e3 100644 --- a/python/python/lance/blob.py +++ b/python/python/lance/blob.py @@ -282,6 +282,10 @@ def size(self) -> int: def readall(self) -> bytes: return self.inner.readall() + def read_range(self, offset: int, length: int) -> bytes: + """Read a blob-local byte range without changing the current cursor.""" + return self.inner.read_range(offset, length) + def readinto(self, b: bytearray) -> int: return self.inner.read_into(b) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 3c8668abb84..79641ab4438 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -227,6 +227,23 @@ def _is_null_blob_description(description: Any) -> bool: return False +def _resolve_blob_selection( + ids: Optional[Union[List[int], pa.Array]], + addresses: Optional[Union[List[int], pa.Array]], + indices: Optional[Union[List[int], pa.Array]], +) -> Tuple[str, Union[List[int], pa.Array]]: + if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1: + raise ValueError("Exactly one of ids, indices, or addresses must be specified") + + if ids is not None: + return "ids", ids + if addresses is not None: + return "addresses", addresses + if indices is not None: + return "indices", indices + raise ValueError("Either ids, addresses, or indices must be specified") + + class MergeInsertBuilder(_MergeInsertBuilder): def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None): """Executes the merge insert operation @@ -1910,21 +1927,77 @@ def take_blobs( ------- blob_files : List[BlobFile] """ - if sum([bool(v is not None) for v in [ids, addresses, indices]]) != 1: - raise ValueError( - "Exactly one of ids, indices, or addresses must be specified" - ) + selection_kind, selection_values = _resolve_blob_selection( + ids, addresses, indices + ) - if ids is not None: - lance_blob_files = self._ds.take_blobs(ids, blob_column) - elif addresses is not None: - lance_blob_files = self._ds.take_blobs_by_addresses(addresses, blob_column) - elif indices is not None: - lance_blob_files = self._ds.take_blobs_by_indices(indices, blob_column) + if selection_kind == "ids": + lance_blob_files = self._ds.take_blobs(selection_values, blob_column) + elif selection_kind == "addresses": + lance_blob_files = self._ds.take_blobs_by_addresses( + selection_values, blob_column + ) else: - raise ValueError("Either ids, addresses, or indices must be specified") + lance_blob_files = self._ds.take_blobs_by_indices( + selection_values, blob_column + ) return [BlobFile(lance_blob_file) for lance_blob_file in lance_blob_files] + def read_blobs( + self, + blob_column: str, + ids: Optional[Union[List[int], pa.Array]] = None, + addresses: Optional[Union[List[int], pa.Array]] = None, + indices: Optional[Union[List[int], pa.Array]] = None, + *, + io_buffer_size: Optional[int] = None, + preserve_order: Optional[bool] = None, + ) -> List[Tuple[int, bytes]]: + """ + Read blobs directly into memory using Lance's planned blob reader. + + Unlike :py:meth:`take_blobs`, which returns file-like :py:class:`lance.BlobFile` + handles for random access, this API plans and executes batched reads and + returns materialized blob payloads. + + Exactly one of ids, addresses, or indices must be specified. + + Parameters + ---------- + blob_column : str + The name of the blob column to read. + ids : Integer Array or array-like + Row IDs to read in the dataset. + addresses : Integer Array or array-like + The (unstable) row addresses to read in the dataset. + indices : Integer Array or array-like + The offset / indices of the row in the dataset. + io_buffer_size : int, optional + Override the scheduler I/O buffer size used while materializing blobs. + preserve_order : bool, optional + If True, returned rows follow the requested selection order. + + Returns + ------- + blobs : List[Tuple[int, bytes]] + A list of ``(row_address, blob_bytes)`` pairs. + """ + selection_kind, selection_values = _resolve_blob_selection( + ids, addresses, indices + ) + + kwargs = { + "io_buffer_size": io_buffer_size, + "preserve_order": preserve_order, + } + if selection_kind == "ids": + return self._ds.read_blobs(selection_values, blob_column, **kwargs) + if selection_kind == "addresses": + return self._ds.read_blobs_by_addresses( + selection_values, blob_column, **kwargs + ) + return self._ds.read_blobs_by_indices(selection_values, blob_column, **kwargs) + def head(self, num_rows, **kwargs): """ Load the first N rows of the dataset. diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 60398f9a3ae..82eaae3c97c 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -288,6 +288,27 @@ class _Dataset: row_indices: List[int], blob_column: str, ) -> List[LanceBlobFile]: ... + def read_blobs( + self, + row_ids: List[int], + blob_column: str, + io_buffer_size: Optional[int] = None, + preserve_order: Optional[bool] = None, + ) -> List[Tuple[int, bytes]]: ... + def read_blobs_by_addresses( + self, + row_addresses: List[int], + blob_column: str, + io_buffer_size: Optional[int] = None, + preserve_order: Optional[bool] = None, + ) -> List[Tuple[int, bytes]]: ... + def read_blobs_by_indices( + self, + row_indices: List[int], + blob_column: str, + io_buffer_size: Optional[int] = None, + preserve_order: Optional[bool] = None, + ) -> List[Tuple[int, bytes]]: ... def take_scan( self, row_slices: Iterable[Tuple[int, int]], diff --git a/python/python/tests/test_blob.py b/python/python/tests/test_blob.py index 01374a62b8e..c2baae79ce3 100644 --- a/python/python/tests/test_blob.py +++ b/python/python/tests/test_blob.py @@ -16,6 +16,32 @@ lance_dataset_module = importlib.import_module("lance.dataset") +def _blob_row_ids(dataset): + return dataset.to_table(columns=[], with_row_id=True).column("_rowid").to_pylist() + + +def _blob_row_addresses(dataset): + return ( + dataset.to_table(columns=["idx"], with_row_address=True) + .column("_rowaddr") + .to_pylist() + ) + + +def _out_of_order_blob_selection(dataset_with_blobs, selection_kind): + addresses = _blob_row_addresses(dataset_with_blobs) + expected = [(addresses[4], b"quux"), (addresses[0], b"foo")] + + if selection_kind == "ids": + return [ + _blob_row_ids(dataset_with_blobs)[4], + _blob_row_ids(dataset_with_blobs)[0], + ], expected + if selection_kind == "addresses": + return [addresses[4], addresses[0]], expected + return [4, 0], expected + + def test_blob_read_from_binary(): values = [b"foo", b"bar", b"baz"] data = pa.table( @@ -251,6 +277,134 @@ def test_blob_by_indices(tmp_path, dataset_with_blobs): assert f1.read() == f2.read() +@pytest.mark.parametrize( + ("selection_kind", "selection_values", "expected"), + [ + ("ids", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]), + ("addresses", [0, (1 << 32) + 1], [(0, b"foo"), ((1 << 32) + 1, b"quux")]), + ("indices", [0, 4], [(0, b"foo"), ((1 << 32) + 1, b"quux")]), + ], +) +def test_read_blobs(dataset_with_blobs, selection_kind, selection_values, expected): + kwargs = {selection_kind: selection_values} + + blobs = dataset_with_blobs.read_blobs( + "blobs", + **kwargs, + io_buffer_size=1024, + preserve_order=True, + ) + + assert blobs == expected + + +def test_read_blobs_requires_single_selector(dataset_with_blobs): + with pytest.raises( + ValueError, match="Exactly one of ids, indices, or addresses must be specified" + ): + dataset_with_blobs.read_blobs("blobs", ids=[0], indices=[0]) + + +def test_read_blobs_requires_selector(dataset_with_blobs): + with pytest.raises( + ValueError, match="Exactly one of ids, indices, or addresses must be specified" + ): + dataset_with_blobs.read_blobs("blobs") + + +def test_read_blobs_rejects_non_blob_column(dataset_with_blobs): + with pytest.raises(ValueError, match="not a blob column"): + dataset_with_blobs.read_blobs("idx", indices=[0]) + + +@pytest.mark.parametrize( + ("selection_kind", "selection_values", "expected"), + [ + ( + "ids", + pa.array([0, (1 << 32) + 1], type=pa.uint64()), + [(0, b"foo"), ((1 << 32) + 1, b"quux")], + ), + ( + "addresses", + pa.array([0, (1 << 32) + 1], type=pa.uint64()), + [(0, b"foo"), ((1 << 32) + 1, b"quux")], + ), + ( + "indices", + pa.array([0, 4], type=pa.uint64()), + [(0, b"foo"), ((1 << 32) + 1, b"quux")], + ), + ], +) +def test_read_blobs_accepts_arrow_array_selectors( + dataset_with_blobs, selection_kind, selection_values, expected +): + kwargs = {selection_kind: selection_values} + + blobs = dataset_with_blobs.read_blobs("blobs", **kwargs) + + assert blobs == expected + + +@pytest.mark.parametrize( + ("selection_kind", "selection_values"), + [ + ("ids", []), + ("addresses", []), + ("indices", []), + ("ids", pa.array([], type=pa.uint64())), + ("addresses", pa.array([], type=pa.uint64())), + ("indices", pa.array([], type=pa.uint64())), + ], +) +def test_read_blobs_accepts_empty_selection( + dataset_with_blobs, selection_kind, selection_values +): + kwargs = {selection_kind: selection_values} + + assert dataset_with_blobs.read_blobs("blobs", **kwargs) == [] + + +@pytest.mark.parametrize( + ("planner_kwargs", "error_message"), + [ + ({"io_buffer_size": 0}, "io_buffer_size must be greater than 0"), + ], +) +def test_read_blobs_rejects_invalid_planner_options( + dataset_with_blobs, planner_kwargs, error_message +): + with pytest.raises(ValueError, match=error_message): + dataset_with_blobs.read_blobs("blobs", indices=[0], **planner_kwargs) + + +@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"]) +def test_read_blobs_preserves_input_order(dataset_with_blobs, selection_kind): + selection_values, expected = _out_of_order_blob_selection( + dataset_with_blobs, selection_kind + ) + kwargs = {selection_kind: selection_values} + + blobs = dataset_with_blobs.read_blobs("blobs", **kwargs, preserve_order=True) + + assert blobs == expected + + +@pytest.mark.parametrize("selection_kind", ["ids", "addresses", "indices"]) +def test_read_blobs_without_preserve_order_returns_same_rows( + dataset_with_blobs, selection_kind +): + selection_values, expected = _out_of_order_blob_selection( + dataset_with_blobs, selection_kind + ) + kwargs = {selection_kind: selection_values} + + blobs = dataset_with_blobs.read_blobs("blobs", **kwargs, preserve_order=False) + + assert sorted(blobs) == sorted(expected) + + def test_blob_file_seek(tmp_path, dataset_with_blobs): row_ids = ( dataset_with_blobs.to_table(columns=[], with_row_id=True) @@ -466,6 +620,12 @@ def test_blob_extension_write_external_slice(tmp_path): with blob_file as f: assert f.read() == expected + assert ds.read_blobs("blob", indices=[0, 1, 2]) == [ + (0, b"alpha"), + (1, b"bravo"), + (2, b"charlie"), + ] + def test_blob_extension_write_external_slice_ingest(tmp_path): tar_path = tmp_path / "container.tar" @@ -548,6 +708,8 @@ def test_blob_extension_take_blobs_multi_base(payload, is_dataset_root, tmp_path with blobs[0] as f: assert f.read() == payload + assert ds.read_blobs("blob", indices=[0]) == [(0, payload)] + @pytest.fixture def dataset_for_pandas_blob_tests(tmp_path): diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 7e139446819..1885b78bd64 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -119,6 +119,30 @@ const DEFAULT_NPROBES: usize = 1; const LANCE_COMMIT_MESSAGE_KEY: &str = "__lance_commit_message"; const INDEX_PROGRESS_QUEUE_SIZE: usize = 1024; +fn read_blobs_to_python( + py: Python<'_>, + blobs: Vec, +) -> Vec<(u64, Py)> { + blobs + .into_iter() + .map(|blob| (blob.row_address, PyBytes::new(py, &blob.data).unbind())) + .collect() +} + +fn configure_read_blobs_builder( + mut builder: lance::dataset::ReadBlobsBuilder, + io_buffer_size: Option, + preserve_order: Option, +) -> lance::dataset::ReadBlobsBuilder { + if let Some(bytes) = io_buffer_size { + builder = builder.with_io_buffer_size_bytes(bytes); + } + if let Some(preserve) = preserve_order { + builder = builder.preserve_order(preserve); + } + builder +} + fn convert_reader(reader: &Bound) -> PyResult> { let py = reader.py(); if reader.is_instance_of::() { @@ -1247,6 +1271,90 @@ impl Dataset { Ok(blobs.into_iter().map(LanceBlobFile::from).collect()) } + #[pyo3(signature=( + row_ids, + blob_column, + io_buffer_size=None, + preserve_order=None + ))] + fn read_blobs( + self_: PyRef<'_, Self>, + row_ids: Vec, + blob_column: &str, + io_buffer_size: Option, + preserve_order: Option, + ) -> PyResult)>> { + let builder = configure_read_blobs_builder( + self_ + .ds + .read_blobs(blob_column) + .infer_error()? + .with_row_ids(row_ids), + io_buffer_size, + preserve_order, + ); + let blobs = rt() + .block_on(Some(self_.py()), builder.execute())? + .infer_error()?; + Ok(read_blobs_to_python(self_.py(), blobs)) + } + + #[pyo3(signature=( + row_addresses, + blob_column, + io_buffer_size=None, + preserve_order=None + ))] + fn read_blobs_by_addresses( + self_: PyRef<'_, Self>, + row_addresses: Vec, + blob_column: &str, + io_buffer_size: Option, + preserve_order: Option, + ) -> PyResult)>> { + let builder = configure_read_blobs_builder( + self_ + .ds + .read_blobs(blob_column) + .infer_error()? + .with_row_addresses(row_addresses), + io_buffer_size, + preserve_order, + ); + let blobs = rt() + .block_on(Some(self_.py()), builder.execute())? + .infer_error()?; + Ok(read_blobs_to_python(self_.py(), blobs)) + } + + #[pyo3(signature=( + row_indices, + blob_column, + io_buffer_size=None, + preserve_order=None + ))] + fn read_blobs_by_indices( + self_: PyRef<'_, Self>, + row_indices: Vec, + blob_column: &str, + io_buffer_size: Option, + preserve_order: Option, + ) -> PyResult)>> { + let builder = configure_read_blobs_builder( + self_ + .ds + .read_blobs(blob_column) + .infer_error()? + .with_row_indices(row_indices), + io_buffer_size, + preserve_order, + ); + let blobs = rt() + .block_on(Some(self_.py()), builder.execute())? + .infer_error()?; + Ok(read_blobs_to_python(self_.py(), blobs)) + } + #[pyo3(signature = (row_slices, columns = None, batch_readahead = 10))] fn take_scan( &self, diff --git a/python/src/dataset/blob.rs b/python/src/dataset/blob.rs index 569a0e3ed8e..1f6272075f2 100644 --- a/python/src/dataset/blob.rs +++ b/python/src/dataset/blob.rs @@ -62,6 +62,23 @@ impl LanceBlobFile { Ok(PyBytes::new(py, &data)) } + /// Read a blob-local byte range without changing the current cursor. + pub fn read_range<'a>( + &'a self, + py: Python<'a>, + offset: u64, + length: usize, + ) -> PyResult> { + let end = offset + .checked_add(length as u64) + .ok_or_else(|| PyValueError::new_err("offset + length overflowed"))?; + let inner = self.inner.clone(); + let data = rt() + .block_on(Some(py), inner.read_range(offset..end))? + .infer_error()?; + Ok(PyBytes::new(py, &data)) + } + pub fn read_into(&self, dst: Bound<'_, PyByteArray>) -> PyResult { let inner = self.inner.clone(); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 9763f8bb68a..8a7a9cf3636 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -113,7 +113,7 @@ use crate::io::commit::{ use crate::session::Session; use crate::utils::temporal::{SystemTime, timestamp_to_nanos, utc_now}; use crate::{Error, Result}; -pub use blob::BlobFile; +pub use blob::{BlobFile, ReadBlob, ReadBlobsBuilder, ReadBlobsStream}; use hash_joiner::HashJoiner; pub use lance_core::ROW_ID; use lance_core::box_error; @@ -1486,6 +1486,38 @@ impl Dataset { blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await } + /// Create a planned blob reader for a blob column. + /// + /// This API complements [`Self::take_blobs`]. `take_blobs` returns + /// [`BlobFile`] handles for caller-driven random access, while + /// `read_blobs` builds a streaming read plan for sequential or batched blob + /// retrieval. + /// + /// ```rust + /// # use std::sync::Arc; + /// # use futures::TryStreamExt; + /// # use lance::dataset::Dataset; + /// # use lance::Result; + /// # async fn example(dataset: Arc) -> Result<()> { + /// let blobs = dataset + /// .read_blobs("images")? + /// .with_row_indices(vec![0, 1, 2]) + /// .execute() + /// .await?; + /// # let _ = blobs; + /// # Ok(()) + /// # } + /// ``` + pub fn read_blobs(self: &Arc, column: impl AsRef) -> Result { + let column = column.as_ref(); + let blob_field_id = blob::validate_blob_column(self, column)?; + Ok(ReadBlobsBuilder::new( + self.clone(), + column.to_string(), + blob_field_id, + )) + } + /// Get a stream of batches based on iterator of ranges of row numbers. /// /// This is an experimental API. It may change at any time. diff --git a/rust/lance/src/dataset/blob.rs b/rust/lance/src/dataset/blob.rs index 29ed15533aa..a828f3fc336 100644 --- a/rust/lance/src/dataset/blob.rs +++ b/rust/lance/src/dataset/blob.rs @@ -2,10 +2,12 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap, VecDeque}, future::Future, ops::{DerefMut, Range}, + panic::AssertUnwindSafe, sync::Arc, + task::Poll, }; use arrow::array::AsArray; @@ -14,11 +16,15 @@ use arrow_array::Array; use arrow_array::RecordBatch; use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder}; use arrow_schema::DataType as ArrowDataType; +use bytes::Bytes; +use futures::stream::BoxStream; +use futures::{FutureExt, StreamExt, TryStreamExt, stream}; use lance_arrow::{BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, FieldExt}; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; +use lance_io::scheduler::{FileScheduler, ScanScheduler, SchedulerConfig}; use object_store::path::Path; use tokio::io::AsyncWriteExt; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, OnceCell, oneshot}; use url::Url; use super::take::TakeBuilder; @@ -29,11 +35,11 @@ use lance_core::datatypes::{BlobKind, BlobVersion}; use lance_core::utils::blob::blob_path; use lance_core::{Error, Result, utils::address::RowAddress}; use lance_io::traits::{Reader, WriteExt, Writer}; +use lance_io::utils::CachedFileSize; const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1GiB per .pack sidecar - #[derive(Clone, Debug, PartialEq, Eq)] pub(super) struct ResolvedExternalBase { pub base_id: u32, @@ -715,29 +721,267 @@ pub async fn preprocess_blob_batches( Ok(out) } -/// Current state of the reader. Held in a mutex for easy sharing +/// Mutable state for a [`BlobFile`] cursor. /// -/// The u64 is the cursor in the file that the reader is currently at -/// (note that seeks are allowed before the file is opened) +/// The cursor is logical to the blob slice, not the backing object. Once closed, +/// subsequent cursor-based and range-based reads are rejected, but reads that +/// were already in flight may still complete. #[derive(Debug)] -enum ReaderState { - Uninitialized(u64), - Open((u64, Arc)), +enum BlobFileState { + Open(u64), Closed, } -/// A file-like object that represents a blob in a dataset +/// Shared physical read context for blob handles that resolve to the same object. +/// +/// Blob descriptors are logical slices over a backing object (data file, packed +/// sidecar, dedicated sidecar, or external object). Multiple [`BlobFile`] values +/// can point at different regions of that same object. This struct gives those +/// handles a single lazy-open scheduler plus a lightweight pending queue so +/// concurrent reads can be opportunistically grouped before reaching Lance's +/// existing I/O scheduler. #[derive(Debug)] -pub struct BlobFile { +struct BlobSource { object_store: Arc, path: Path, - reader: Arc>, + file_size: CachedFileSize, + scheduler: OnceCell, + pending_reads: Mutex, +} + +impl BlobSource { + /// Create a shared read context for one physical backing object. + fn new(object_store: Arc, path: Path) -> Self { + Self { + object_store, + path, + file_size: CachedFileSize::unknown(), + scheduler: OnceCell::new(), + pending_reads: Mutex::new(PendingBlobReads::default()), + } + } + + /// Read one or more physical ranges from this source. + /// + /// Concurrent callers enqueue their requests into `pending_reads`. The first + /// caller in a drain cycle becomes the leader and spawns the batch drain task. + /// The mutex critical section only updates in-memory queue bookkeeping; the + /// actual scheduler submission happens after the lock is released. + async fn read_ranges(self: &Arc, ranges: Vec>) -> Result> { + if ranges.is_empty() { + return Ok(Vec::new()); + } + + let scheduler = self + .scheduler + .get_or_try_init(|| async { + ScanScheduler::new( + self.object_store.clone(), + SchedulerConfig::max_bandwidth(self.object_store.as_ref()), + ) + .open_file(&self.path, &self.file_size) + .await + }) + .await?; + + let (response_tx, response_rx) = oneshot::channel(); + let should_spawn = { + let mut pending_reads = self.pending_reads.lock().await; + pending_reads.requests.push(PendingBlobRead { + ranges, + response: response_tx, + }); + if pending_reads.is_draining { + false + } else { + pending_reads.is_draining = true; + true + } + }; + + if should_spawn { + let source = self.clone(); + let scheduler = scheduler.clone(); + tokio::spawn(async move { + let result = AssertUnwindSafe(source.clone().drain_pending_reads(scheduler)) + .catch_unwind() + .await; + if let Err(panic) = result { + let mut pending_reads = source.pending_reads.lock().await; + pending_reads.is_draining = false; + std::panic::resume_unwind(panic); + } + }); + } + + response_rx.await.map_err(|_| { + Error::internal("Blob source read task dropped the response".to_string()) + })? + } + + /// Drain currently queued requests and submit them as scheduler batches. + /// + /// Each loop iteration grabs the queued requests with a short mutex hold and + /// immediately releases the lock before any I/O is awaited. + async fn drain_pending_reads(self: Arc, scheduler: FileScheduler) { + loop { + let batch = { + let mut pending_reads = self.pending_reads.lock().await; + if pending_reads.requests.is_empty() { + pending_reads.is_draining = false; + return; + } + std::mem::take(&mut pending_reads.requests) + }; + fulfill_pending_blob_reads(&scheduler, batch).await; + } + } +} + +/// Queue of pending logical blob reads for one [`BlobSource`]. +/// +/// `is_draining` marks whether a leader task is already draining the queue. +#[derive(Default, Debug)] +struct PendingBlobReads { + requests: Vec, + is_draining: bool, +} + +/// Pending logical blob reads waiting to be grouped into one scheduler batch. +/// +/// This queue exists only to combine overlapping concurrent calls. The actual +/// coalescing and physical I/O scheduling still happens in [`FileScheduler`]. +#[derive(Debug)] +struct PendingBlobRead { + ranges: Vec>, + response: oneshot::Sender>>, +} + +/// Submit one grouped batch of pending blob reads to Lance's [`FileScheduler`]. +/// +/// The function flattens all logical requests into one range list, preserves the +/// caller-visible order for each request, and fans the bytes back out after the +/// scheduler completes its own merge / split logic. +async fn fulfill_pending_blob_reads(scheduler: &FileScheduler, batch: Vec) { + let total_ranges = batch + .iter() + .map(|request| request.ranges.len()) + .sum::(); + let mut request_ranges = Vec::with_capacity(total_ranges); + let mut response = batch + .iter() + .map(|request| vec![Bytes::new(); request.ranges.len()]) + .collect::>(); + + for (request_idx, request) in batch.iter().enumerate() { + for (range_idx, range) in request.ranges.iter().enumerate() { + if range.is_empty() { + continue; + } + request_ranges.push((range.clone(), request_idx, range_idx)); + } + } + + let result = if request_ranges.is_empty() { + Ok(()) + } else { + request_ranges.sort_by_key(|(range, _, _)| (range.start, range.end)); + let priority = request_ranges[0].0.start; + match scheduler + .submit_request( + request_ranges + .iter() + .map(|(range, _, _)| range.clone()) + .collect::>(), + priority, + ) + .await + { + Ok(bytes_vec) => { + for ((_, request_idx, range_idx), bytes) in + request_ranges.into_iter().zip(bytes_vec) + { + response[request_idx][range_idx] = bytes; + } + Ok(()) + } + Err(err) => Err(err), + } + }; + + match result { + Ok(()) => { + for (request, bytes) in batch.into_iter().zip(response) { + let _ = request.response.send(Ok(bytes)); + } + } + Err(err) => { + let message = format!( + "Failed to read blob source {}: {}", + scheduler.reader().path(), + err + ); + for request in batch { + let _ = request.response.send(Err(Error::io(message.clone()))); + } + } + } +} + +/// Cache key for sharing one [`BlobSource`] across multiple blob descriptors. +/// +/// We include the store prefix as well as the path so the same path string in +/// different object stores is never conflated. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct BlobSourceKey { + store_prefix: String, + path: String, +} + +impl BlobSourceKey { + /// Build the cache key for one shared [`BlobSource`]. + fn new(source: &BlobSource) -> Self { + Self { + store_prefix: source.object_store.store_prefix.clone(), + path: source.path.to_string(), + } + } +} + +/// Return a shared [`BlobSource`] for the given physical object. +/// +/// This keeps all blob handles that resolve to the same `(store, path)` on a +/// single lazy-open scheduler and pending-read queue. +fn shared_blob_source( + source_cache: &mut HashMap>, + object_store: Arc, + path: &Path, +) -> Arc { + let key = BlobSourceKey { + store_prefix: object_store.store_prefix.clone(), + path: path.to_string(), + }; + source_cache + .entry(key) + .or_insert_with(|| Arc::new(BlobSource::new(object_store, path.clone()))) + .clone() +} + +/// A file-like object that represents a blob in a dataset +#[derive(Debug)] +pub struct BlobFile { + source: Arc, + state: Arc>, position: u64, size: u64, kind: BlobKind, uri: Option, } +/// Base-aware physical location metadata used while resolving blob reads. +/// +/// This is cached per fragment so repeated rows from the same fragment do not +/// recompute the object store, data directory, and data file key. #[derive(Clone)] struct BlobReadLocation { object_store: Arc, @@ -747,22 +991,20 @@ struct BlobReadLocation { } impl BlobFile { - fn with_location( - object_store: Arc, - path: Path, + fn with_source( + source: Arc, position: u64, size: u64, kind: BlobKind, uri: Option, ) -> Self { Self { - object_store, - path, + source, position, size, kind, uri, - reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))), + state: Arc::new(Mutex::new(BlobFileState::Open(0))), } } @@ -783,7 +1025,13 @@ impl BlobFile { position: u64, size: u64, ) -> Self { - Self::with_location(object_store, path, position, size, BlobKind::Inline, None) + Self::with_source( + Arc::new(BlobSource::new(object_store, path)), + position, + size, + BlobKind::Inline, + None, + ) } /// Create a dedicated blob reader backed by a sidecar `.blob` file. @@ -797,7 +1045,13 @@ impl BlobFile { /// * `path` - Full path to the dedicated sidecar blob file. /// * `size` - Total byte length to expose from the sidecar file. pub fn new_dedicated(object_store: Arc, path: Path, size: u64) -> Self { - Self::with_location(object_store, path, 0, size, BlobKind::Dedicated, None) + Self::with_source( + Arc::new(BlobSource::new(object_store, path)), + 0, + size, + BlobKind::Dedicated, + None, + ) } /// Create a packed blob reader for a slice inside a shared sidecar `.blob` file. @@ -817,7 +1071,13 @@ impl BlobFile { position: u64, size: u64, ) -> Self { - Self::with_location(object_store, path, position, size, BlobKind::Packed, None) + Self::with_source( + Arc::new(BlobSource::new(object_store, path)), + position, + size, + BlobKind::Packed, + None, + ) } /// Create an external blob reader backed by a caller-resolved object location. @@ -840,9 +1100,8 @@ impl BlobFile { position: u64, size: u64, ) -> Self { - Self::with_location( - object_store, - path, + Self::with_source( + Arc::new(BlobSource::new(object_store, path)), position, size, BlobKind::External, @@ -852,58 +1111,118 @@ impl BlobFile { /// Close the blob file, releasing any associated resources pub async fn close(&self) -> Result<()> { - let mut reader = self.reader.lock().await; - *reader = ReaderState::Closed; + let mut state = self.state.lock().await; + *state = BlobFileState::Closed; Ok(()) } /// Returns true if the blob file is closed pub async fn is_closed(&self) -> bool { - matches!(*self.reader.lock().await, ReaderState::Closed) + matches!(*self.state.lock().await, BlobFileState::Closed) } - async fn do_with_reader< - T, - Fut: Future>, - Func: FnOnce(u64, Arc) -> Fut, - >( + async fn do_with_cursor>, Func: FnOnce(u64) -> Fut>( &self, func: Func, ) -> Result { - let mut reader = self.reader.lock().await; - if let ReaderState::Uninitialized(cursor) = *reader { - let opened = self.object_store.open(&self.path).await?; - let opened = Arc::::from(opened); - *reader = ReaderState::Open((cursor, opened.clone())); - } - match reader.deref_mut() { - ReaderState::Open((cursor, reader)) => { - let (new_cursor, data) = func(*cursor, reader.clone()).await?; + let mut state = self.state.lock().await; + match state.deref_mut() { + BlobFileState::Open(cursor) => { + let (new_cursor, data) = func(*cursor).await?; *cursor = new_cursor; Ok(data) } - ReaderState::Closed => Err(Error::invalid_input( + BlobFileState::Closed => Err(Error::invalid_input( "Blob file is already closed".to_string(), )), - _ => unreachable!(), } } + async fn ensure_open(&self) -> Result<()> { + let state = self.state.lock().await; + match *state { + BlobFileState::Open(_) => Ok(()), + BlobFileState::Closed => Err(Error::invalid_input( + "Blob file is already closed".to_string(), + )), + } + } + + fn read_phys_range(&self, range: Range) -> Result> { + if range.start > range.end { + return Err(Error::invalid_input(format!( + "Blob range start {} must be <= end {}", + range.start, range.end + ))); + } + if range.end > self.size { + return Err(Error::invalid_input(format!( + "Blob range end {} exceeds blob size {}", + range.end, self.size + ))); + } + let start = self.position.checked_add(range.start).ok_or_else(|| { + Error::invalid_input(format!( + "Blob range start overflowed physical position: base={} offset={}", + self.position, range.start + )) + })?; + let end = self.position.checked_add(range.end).ok_or_else(|| { + Error::invalid_input(format!( + "Blob range end overflowed physical position: base={} offset={}", + self.position, range.end + )) + })?; + Ok(start..end) + } + + /// Read a byte range relative to the beginning of this blob without changing the cursor. + /// + /// The provided range is interpreted in blob-local coordinates, not object + /// coordinates. Empty ranges are allowed. This method is intended for random + /// access callers that want deterministic range semantics instead of the + /// stateful file-like cursor used by [`Self::read`] and [`Self::read_up_to`]. + pub async fn read_range(&self, range: Range) -> Result { + let mut data = self.read_ranges(&[range]).await?; + Ok(data.pop().unwrap_or_default()) + } + + /// Read multiple ranges relative to the beginning of this blob without changing the cursor. + /// + /// Empty ranges are allowed and yield empty buffers. The result order always + /// matches the input order, even though the underlying physical requests may + /// be reordered, coalesced, or split for efficiency. + pub async fn read_ranges(&self, ranges: &[Range]) -> Result> { + self.ensure_open().await?; + let physical_ranges = ranges + .iter() + .cloned() + .map(|range| self.read_phys_range(range)) + .collect::>>()?; + self.source.read_ranges(physical_ranges).await + } + /// Read the entire blob file from the current cursor position /// to the end of the file /// /// After this call the cursor will be pointing to the end of /// the file. pub async fn read(&self) -> Result { - let position = self.position; let size = self.size; - self.do_with_reader(|cursor, reader| async move { - if cursor >= size { - return Ok((size, bytes::Bytes::new())); + let source = self.source.clone(); + let position = self.position; + self.do_with_cursor(move |cursor| { + let source = source.clone(); + async move { + if cursor >= size { + return Ok((size, Bytes::new())); + } + let physical = (position + cursor)..(position + size); + Ok(( + size, + source.read_ranges(vec![physical]).await?.pop().unwrap(), + )) } - let start = position as usize + cursor as usize; - let end = (position + size) as usize; - Ok((size, reader.get_range(start..end).await?)) }) .await } @@ -913,48 +1232,47 @@ impl BlobFile { /// After this call the cursor will be pointing to the end of /// the read data. pub async fn read_up_to(&self, len: usize) -> Result { - let position = self.position; let size = self.size; - self.do_with_reader(|cursor, reader| async move { - if cursor >= size || len == 0 { - return Ok((size.min(cursor), bytes::Bytes::new())); + let source = self.source.clone(); + let position = self.position; + self.do_with_cursor(move |cursor| { + let source = source.clone(); + async move { + if cursor >= size || len == 0 { + return Ok((size.min(cursor), Bytes::new())); + } + let read_size = len.min((size - cursor) as usize) as u64; + let start = position + cursor; + let end = start + read_size; + let data = source.read_ranges(vec![start..end]).await?.pop().unwrap(); + Ok((cursor + read_size, data)) } - let start = position as usize + cursor as usize; - let read_size = len.min((size - cursor) as usize); - let end = start + read_size; - let data = reader.get_range(start..end).await?; - Ok((end as u64 - position, data)) }) .await } /// Seek to a new cursor position in the file pub async fn seek(&self, new_cursor: u64) -> Result<()> { - let mut reader = self.reader.lock().await; - match reader.deref_mut() { - ReaderState::Open((cursor, _)) => { + let mut state = self.state.lock().await; + match state.deref_mut() { + BlobFileState::Open(cursor) => { *cursor = new_cursor; Ok(()) } - ReaderState::Closed => Err(Error::invalid_input( + BlobFileState::Closed => Err(Error::invalid_input( "Blob file is already closed".to_string(), )), - ReaderState::Uninitialized(cursor) => { - *cursor = new_cursor; - Ok(()) - } } } /// Return the current cursor position in the file pub async fn tell(&self) -> Result { - let reader = self.reader.lock().await; - match *reader { - ReaderState::Open((cursor, _)) => Ok(cursor), - ReaderState::Closed => Err(Error::invalid_input( + let state = self.state.lock().await; + match *state { + BlobFileState::Open(cursor) => Ok(cursor), + BlobFileState::Closed => Err(Error::invalid_input( "Blob file is already closed".to_string(), )), - ReaderState::Uninitialized(cursor) => Ok(cursor), } } @@ -968,7 +1286,7 @@ impl BlobFile { } pub fn data_path(&self) -> &Path { - &self.path + &self.source.path } pub fn kind(&self) -> BlobKind { @@ -980,140 +1298,555 @@ impl BlobFile { } } -pub(super) async fn take_blobs( - dataset: &Arc, - row_ids: &[u64], - column: &str, -) -> Result> { - let projection = dataset.schema().project(&[column])?; - let blob_field = &projection.fields[0]; - let blob_field_id = blob_field.id; - if !projection.fields[0].is_blob() { - return Err(Error::invalid_input_source( - format!("the column '{}' is not a blob column", column).into(), - )); - } - let description_and_addr = dataset - .take_builder(row_ids, projection)? - .with_row_address(true) - .execute() - .await?; - let descriptions = description_and_addr.column(0).as_struct(); - let row_addrs = description_and_addr.column(1).as_primitive::(); - let blob_field_id = blob_field_id as u32; +/// Blob bytes materialized by [`ReadBlobsBuilder`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ReadBlob { + /// Row address of the blob that was read. + pub row_address: u64, + /// Blob payload bytes. + pub data: Bytes, +} - match blob_version_from_descriptions(descriptions)? { - BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs), - BlobVersion::V2 => { - collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await +/// Stream returned by [`ReadBlobsBuilder::try_into_stream`]. +pub type ReadBlobsStream = BoxStream<'static, Result>; + +/// Row selector configured on [`ReadBlobsBuilder`]. +#[derive(Debug, Clone)] +enum ReadBlobsSelection { + None, + RowIds(Vec), + RowIndices(Vec), + RowAddresses(Vec), +} + +/// Planner knobs for [`ReadBlobsBuilder`]. +/// +/// Options that shape how `read_blobs` uses Lance's existing schedulers. +#[derive(Debug, Clone)] +struct ReadBlobsOptions { + io_buffer_size_bytes: Option, + preserve_order: bool, +} + +impl Default for ReadBlobsOptions { + fn default() -> Self { + Self { + io_buffer_size_bytes: None, + preserve_order: true, } } } -/// Take [BlobFile] by row addresses. +/// Builder for sequential / planned blob reads. /// -/// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`. -/// Use this method when you already have row addresses, for example from -/// a scan with `with_row_address()`. For row IDs (stable identifiers), use -/// [`Dataset::take_blobs`]. For row indices (offsets), use -/// [`Dataset::take_blobs_by_indices`]. -pub async fn take_blobs_by_addresses( - dataset: &Arc, - row_addrs: &[u64], - column: &str, -) -> Result> { - let projection = dataset.schema().project(&[column])?; - let blob_field = &projection.fields[0]; - let blob_field_id = blob_field.id; - if !projection.fields[0].is_blob() { - return Err(Error::invalid_input_source( - format!("the column '{}' is not a blob column", column).into(), - )); +/// Unlike [`Dataset::take_blobs`], which returns [`BlobFile`] handles for +/// caller-driven random access, this builder plans object-store reads across a +/// selected row set and yields fully materialized blob payloads. +#[derive(Debug, Clone)] +pub struct ReadBlobsBuilder { + dataset: Arc, + column: String, + blob_field_id: u32, + selection: ReadBlobsSelection, + options: ReadBlobsOptions, +} + +impl ReadBlobsBuilder { + pub(crate) fn new(dataset: Arc, column: String, blob_field_id: u32) -> Self { + Self { + dataset, + column, + blob_field_id, + selection: ReadBlobsSelection::None, + options: ReadBlobsOptions::default(), + } } - // Convert Schema to ProjectionPlan - let projection_request = ProjectionRequest::from(projection); - let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?); + /// Read blobs for the provided stable row ids. + pub fn with_row_ids(mut self, row_ids: impl Into>) -> Self { + self.selection = ReadBlobsSelection::RowIds(row_ids.into()); + self + } - // Use try_new_from_addresses to bypass row ID index lookup. - // This is critical when enable_stable_row_ids=true because row addresses - // (fragment_id << 32 | row_offset) are different from row IDs (sequential integers). - let description_and_addr = - TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)? - .with_row_address(true) - .execute() - .await?; + /// Read blobs for the provided row offsets in dataset order. + pub fn with_row_indices(mut self, row_indices: impl Into>) -> Self { + self.selection = ReadBlobsSelection::RowIndices(row_indices.into()); + self + } - let descriptions = description_and_addr.column(0).as_struct(); - let row_addrs_result = description_and_addr.column(1).as_primitive::(); - let blob_field_id = blob_field_id as u32; + /// Read blobs for the provided physical row addresses. + pub fn with_row_addresses(mut self, row_addrs: impl Into>) -> Self { + self.selection = ReadBlobsSelection::RowAddresses(row_addrs.into()); + self + } - match blob_version_from_descriptions(descriptions)? { - BlobVersion::V1 => { - collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result) + /// Set the scheduler I/O buffer size used while materializing blobs. + pub fn with_io_buffer_size_bytes(mut self, bytes: u64) -> Self { + self.options.io_buffer_size_bytes = Some(bytes); + self + } + + /// Whether results must follow the caller's requested row order. + pub fn preserve_order(mut self, preserve: bool) -> Self { + self.options.preserve_order = preserve; + self + } + + /// Execute the planned blob read and return a stream of blob payloads. + /// + /// The stream yields one [`ReadBlob`] per selected non-null blob row. + pub async fn try_into_stream(self) -> Result { + self.validate()?; + let entries = collect_blob_entries_for_selection( + &self.dataset, + self.blob_field_id, + &self.column, + &self.selection, + ) + .await?; + let expected_selection_indices = entries + .iter() + .map(|entry| entry.selection_index) + .collect::>(); + let plans = plan_blob_read_plans(entries); + let execution = Arc::new(ReadBlobsExecution::new(self.options.io_buffer_size_bytes)); + if plans.is_empty() { + return Ok(stream::empty().boxed()); } - BlobVersion::V2 => { - collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs_result).await + + let plan_stream = stream::iter(plans.into_iter().map(move |plan| { + let execution = execution.clone(); + execute_blob_read_plan(plan, execution) + })) + .buffer_unordered(self.dataset.object_store.io_parallelism().max(1)); + + if !self.options.preserve_order { + return Ok(plan_stream + .map_ok(|blobs| { + stream::iter(blobs.into_iter().map(|blob| Ok(into_read_blob(blob)))) + }) + .try_flatten() + .boxed()); } + + let mut plan_stream = plan_stream.boxed(); + let mut expected_selection_indices = expected_selection_indices; + let mut ready = BTreeMap::::new(); + + Ok(stream::poll_fn(move |cx| { + loop { + let Some(next_selection_index) = expected_selection_indices.front().copied() else { + return Poll::Ready(None); + }; + + if let Some(blob) = ready.remove(&next_selection_index) { + expected_selection_indices.pop_front(); + return Poll::Ready(Some(Ok(blob))); + } + + match plan_stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(blobs))) => { + for blob in blobs { + ready.insert(blob.selection_index, into_read_blob(blob)); + } + } + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Some(Err(err))); + } + Poll::Ready(None) => { + let err = Error::internal(format!( + "planned blob read stream completed before selection index {} was produced", + next_selection_index + )); + return Poll::Ready(Some(Err(err))); + } + Poll::Pending => return Poll::Pending, + } + } + }) + .boxed()) } -} -fn blob_version_from_descriptions(descriptions: &StructArray) -> Result { - let fields = descriptions.fields(); - if fields.len() == 2 && fields[0].name() == "position" && fields[1].name() == "size" { - return Ok(BlobVersion::V1); + /// Execute the planned blob read and collect the full result in memory. + pub async fn execute(self) -> Result> { + self.try_into_stream().await?.try_collect().await } - if fields.len() == 5 - && fields[0].name() == "kind" - && fields[1].name() == "position" - && fields[2].name() == "size" - && fields[3].name() == "blob_id" - && fields[4].name() == "blob_uri" - { - return Ok(BlobVersion::V2); + + fn validate(&self) -> Result<()> { + match self.selection { + ReadBlobsSelection::None => Err(Error::invalid_input( + "ReadBlobsBuilder requires a row selection; call one of with_row_ids, with_row_indices, or with_row_addresses".to_string(), + )), + _ if self.options.io_buffer_size_bytes == Some(0) => Err(Error::invalid_input( + "ReadBlobsBuilder io_buffer_size must be greater than 0".to_string(), + )), + _ => Ok(()), + } } - Err(Error::invalid_input_source(format!( - "Unrecognized blob descriptions schema: expected v1 (position,size) or v2 (kind,position,size,blob_id,blob_uri) but got {:?}", - fields.iter().map(|f| f.name().as_str()).collect::>(), - ) - .into())) } -fn collect_blob_files_v1( - dataset: &Arc, - blob_field_id: u32, - descriptions: &StructArray, - row_addrs: &arrow::array::PrimitiveArray, -) -> Result> { - let positions = descriptions.column(0).as_primitive::(); - let sizes = descriptions.column(1).as_primitive::(); +/// One logical blob selected for planned reading. +#[derive(Debug)] +struct BlobEntry { + selection_index: usize, + row_address: u64, + file: BlobFile, +} - Ok(row_addrs - .values() +/// Physical read input derived from one [`BlobEntry`]. +#[derive(Debug)] +struct PlannedBlobRead { + selection_index: usize, + row_address: u64, + physical_range: Range, +} + +/// One per-source read plan emitted by `read_blobs`. +#[derive(Debug)] +struct BlobReadPlan { + source_key: BlobSourceKey, + source: Arc, + reads: Vec, +} + +/// Operation-scoped scheduler cache for one [`ReadBlobsBuilder`] execution. +/// +/// We reuse one [`ScanScheduler`] per object store during a single `read_blobs` +/// operation and still submit exactly one request per physical file. +#[derive(Debug)] +struct ReadBlobsExecution { + io_buffer_size_bytes: Option, + schedulers: std::sync::Mutex>>, +} + +impl ReadBlobsExecution { + fn new(io_buffer_size_bytes: Option) -> Self { + Self { + io_buffer_size_bytes, + schedulers: std::sync::Mutex::new(HashMap::new()), + } + } + + fn scheduler_for(&self, source: &BlobSource) -> Arc { + let mut schedulers = self.schedulers.lock().unwrap(); + schedulers + .entry(source.object_store.store_prefix.clone()) + .or_insert_with(|| { + let config = self + .io_buffer_size_bytes + .map(SchedulerConfig::new) + .unwrap_or_else(|| { + SchedulerConfig::max_bandwidth(source.object_store.as_ref()) + }); + ScanScheduler::new(source.object_store.clone(), config) + }) + .clone() + } +} + +/// Materialized blob bytes plus the original selection index used to restore +/// caller ordering after per-source reads complete. +#[derive(Debug)] +struct IndexedReadBlob { + selection_index: usize, + row_address: u64, + data: Bytes, +} + +fn into_read_blob(blob: IndexedReadBlob) -> ReadBlob { + ReadBlob { + row_address: blob.row_address, + data: blob.data, + } +} + +/// Group selected blobs by physical source and sort each group's ranges by +/// physical offset before handing them to the file scheduler. +fn plan_blob_read_plans(entries: Vec) -> Vec { + let mut plan_indices = HashMap::::new(); + let mut plans = Vec::::new(); + + for entry in entries { + let source_key = BlobSourceKey::new(&entry.file.source); + let plan_index = if let Some(plan_index) = plan_indices.get(&source_key) { + *plan_index + } else { + let plan_index = plans.len(); + plans.push(BlobReadPlan { + source_key: source_key.clone(), + source: entry.file.source.clone(), + reads: Vec::new(), + }); + plan_indices.insert(source_key.clone(), plan_index); + plan_index + }; + + plans[plan_index].reads.push(PlannedBlobRead { + selection_index: entry.selection_index, + row_address: entry.row_address, + physical_range: entry.file.position..(entry.file.position + entry.file.size), + }); + } + + plans.sort_by(|left, right| { + left.source_key + .store_prefix + .cmp(&right.source_key.store_prefix) + .then_with(|| left.source_key.path.cmp(&right.source_key.path)) + }); + + for plan in &mut plans { + plan.reads.sort_by(|left, right| { + left.physical_range + .start + .cmp(&right.physical_range.start) + .then_with(|| left.physical_range.end.cmp(&right.physical_range.end)) + .then_with(|| left.selection_index.cmp(&right.selection_index)) + }); + } + + plans +} + +/// Execute one per-source blob read plan with a single scheduler submission. +async fn execute_blob_read_plan( + task: BlobReadPlan, + execution: Arc, +) -> Result> { + let ranges = task + .reads + .iter() + .map(|read| read.physical_range.clone()) + .collect::>(); + let scheduler = execution.scheduler_for(&task.source); + let file_scheduler = scheduler + .open_file(&task.source.path, &task.source.file_size) + .await?; + let priority = ranges[0].start; + let bytes = file_scheduler.submit_request(ranges, priority).await?; + + Ok(task + .reads + .into_iter() + .zip(bytes) + .map(|(read, data)| IndexedReadBlob { + selection_index: read.selection_index, + row_address: read.row_address, + data, + }) + .collect()) +} + +pub(super) async fn take_blobs( + dataset: &Arc, + row_ids: &[u64], + column: &str, +) -> Result> { + let blob_field_id = validate_blob_column(dataset, column)?; + Ok(collect_blob_entries_for_selection( + dataset, + blob_field_id, + column, + &ReadBlobsSelection::RowIds(row_ids.to_vec()), + ) + .await? + .into_iter() + .map(|entry| entry.file) + .collect()) +} + +/// Take [BlobFile] by row addresses. +/// +/// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`. +/// Use this method when you already have row addresses, for example from +/// a scan with `with_row_address()`. For row IDs (stable identifiers), use +/// [`Dataset::take_blobs`]. For row indices (offsets), use +/// [`Dataset::take_blobs_by_indices`]. +pub async fn take_blobs_by_addresses( + dataset: &Arc, + row_addrs: &[u64], + column: &str, +) -> Result> { + let blob_field_id = validate_blob_column(dataset, column)?; + Ok(collect_blob_entries_for_selection( + dataset, + blob_field_id, + column, + &ReadBlobsSelection::RowAddresses(row_addrs.to_vec()), + ) + .await? + .into_iter() + .map(|entry| entry.file) + .collect()) +} + +/// Validate that `column` exists and is a blob column, returning its field id. +pub(super) fn validate_blob_column(dataset: &Arc, column: &str) -> Result { + let projection = dataset.schema().project(&[column])?; + let blob_field = &projection.fields[0]; + if !blob_field.is_blob() { + return Err(Error::invalid_input_source( + format!("the column '{}' is not a blob column", column).into(), + )); + } + Ok(blob_field.id as u32) +} + +/// Load blob descriptor rows for a stable-row-id selection. +async fn take_blob_descriptions_by_row_ids( + dataset: &Arc, + row_ids: &[u64], + column: &str, +) -> Result { + let projection = dataset.schema().project(&[column])?; + dataset + .take_builder(row_ids, projection)? + .with_row_address(true) + .execute() + .await +} + +/// Load blob descriptor rows for a physical-row-address selection. +async fn take_blob_descriptions_by_row_addresses( + dataset: &Arc, + row_addrs: &[u64], + column: &str, +) -> Result { + let projection = dataset.schema().project(&[column])?; + let projection_request = ProjectionRequest::from(projection); + let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?); + TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)? + .with_row_address(true) + .execute() + .await +} + +/// Resolve a caller selection into [`BlobEntry`] values that share `BlobSource` +/// instances by physical backing object. +async fn collect_blob_entries_for_selection( + dataset: &Arc, + blob_field_id: u32, + column: &str, + selection: &ReadBlobsSelection, +) -> Result> { + let description_and_addr = match selection { + ReadBlobsSelection::None => { + return Err(Error::invalid_input( + "Blob row selection is required".to_string(), + )); + } + ReadBlobsSelection::RowIds(row_ids) => { + take_blob_descriptions_by_row_ids(dataset, row_ids, column).await? + } + ReadBlobsSelection::RowIndices(row_indices) => { + let row_addrs = + super::take::row_offsets_to_row_addresses(&dataset.get_fragments(), row_indices) + .await?; + take_blob_descriptions_by_row_addresses(dataset, &row_addrs, column).await? + } + ReadBlobsSelection::RowAddresses(row_addrs) => { + take_blob_descriptions_by_row_addresses(dataset, row_addrs, column).await? + } + }; + + if description_and_addr.num_rows() == 0 { + return Ok(Vec::new()); + } + + let descriptions = description_and_addr.column(0).as_struct(); + let row_addrs = description_and_addr.column(1).as_primitive::(); + + match blob_version_from_descriptions(descriptions)? { + BlobVersion::V1 => collect_blob_entries_v1(dataset, blob_field_id, descriptions, row_addrs), + BlobVersion::V2 => { + collect_blob_entries_v2(dataset, blob_field_id, descriptions, row_addrs).await + } + } +} + +fn blob_version_from_descriptions(descriptions: &StructArray) -> Result { + let fields = descriptions.fields(); + if fields.len() == 2 && fields[0].name() == "position" && fields[1].name() == "size" { + return Ok(BlobVersion::V1); + } + if fields.len() == 5 + && fields[0].name() == "kind" + && fields[1].name() == "position" + && fields[2].name() == "size" + && fields[3].name() == "blob_id" + && fields[4].name() == "blob_uri" + { + return Ok(BlobVersion::V2); + } + Err(Error::invalid_input_source(format!( + "Unrecognized blob descriptions schema: expected v1 (position,size) or v2 (kind,position,size,blob_id,blob_uri) but got {:?}", + fields.iter().map(|f| f.name().as_str()).collect::>(), + ) + .into())) +} + +/// Convert blob v1 descriptors into logical blob entries. +fn collect_blob_entries_v1( + dataset: &Arc, + blob_field_id: u32, + descriptions: &StructArray, + row_addrs: &arrow::array::PrimitiveArray, +) -> Result> { + let positions = descriptions.column(0).as_primitive::(); + let sizes = descriptions.column(1).as_primitive::(); + let mut source_cache = HashMap::>::new(); + row_addrs + .values() .iter() .zip(positions.iter()) .zip(sizes.iter()) - .filter_map(|((row_addr, position), size)| { + .enumerate() + .filter_map(|(selection_index, ((row_addr, position), size))| { let position = position?; let size = size?; - Some((*row_addr, position, size)) + Some((selection_index, *row_addr, position, size)) }) - .map(|(row_addr, position, size)| { + .map(|(selection_index, row_addr, position, size)| { let frag_id = RowAddress::from(row_addr).fragment_id(); - let frag = dataset.get_fragment(frag_id as usize).unwrap(); - let data_file = frag.data_file_for_field(blob_field_id).unwrap(); + let frag = dataset.get_fragment(frag_id as usize).ok_or_else(|| { + Error::invalid_input(format!( + "Blob row address {} references missing fragment {}", + row_addr, frag_id + )) + })?; + let data_file = frag.data_file_for_field(blob_field_id).ok_or_else(|| { + Error::invalid_input(format!( + "Blob field {} has no data file in fragment {} for row address {}", + blob_field_id, frag_id, row_addr + )) + })?; let data_file_path = dataset.data_dir().child(data_file.path.as_str()); - BlobFile::new_inline(dataset.object_store.clone(), data_file_path, position, size) + Ok(BlobEntry { + selection_index, + row_address: row_addr, + file: BlobFile::with_source( + shared_blob_source( + &mut source_cache, + dataset.object_store.clone(), + &data_file_path, + ), + position, + size, + BlobKind::Inline, + None, + ), + }) }) - .collect()) + .collect() } -async fn collect_blob_files_v2( +/// Convert blob v2 descriptors into logical blob entries. +async fn collect_blob_entries_v2( dataset: &Arc, blob_field_id: u32, descriptions: &StructArray, row_addrs: &arrow::array::PrimitiveArray, -) -> Result> { +) -> Result> { let kinds = descriptions.column(0).as_primitive::(); let positions = descriptions.column(1).as_primitive::(); let sizes = descriptions.column(2).as_primitive::(); @@ -1124,7 +1857,9 @@ async fn collect_blob_files_v2( let mut fragment_cache = HashMap::::new(); let mut store_cache = HashMap::>::new(); let mut external_base_path_cache = HashMap::::new(); - for (idx, row_addr) in row_addrs.values().iter().enumerate() { + let mut source_cache = HashMap::>::new(); + for (selection_index, row_addr) in row_addrs.values().iter().enumerate() { + let idx = selection_index; let kind = BlobKind::try_from(kinds.value(idx))?; // Struct is non-nullable; null rows are encoded as inline with zero position/size and empty uri @@ -1144,12 +1879,16 @@ async fn collect_blob_files_v2( &mut store_cache, ) .await?; - files.push(BlobFile::new_inline( + let source = shared_blob_source( + &mut source_cache, location.object_store, - location.data_file_path, - position, - size, - )); + &location.data_file_path, + ); + files.push(BlobEntry { + selection_index, + row_address: *row_addr, + file: BlobFile::with_source(source, position, size, BlobKind::Inline, None), + }); } BlobKind::Dedicated => { let blob_id = blob_ids.value(idx); @@ -1163,7 +1902,12 @@ async fn collect_blob_files_v2( ) .await?; let path = blob_path(&location.data_file_dir, &location.data_file_key, blob_id); - files.push(BlobFile::new_dedicated(location.object_store, path, size)); + let source = shared_blob_source(&mut source_cache, location.object_store, &path); + files.push(BlobEntry { + selection_index, + row_address: *row_addr, + file: BlobFile::with_source(source, 0, size, BlobKind::Dedicated, None), + }); } BlobKind::Packed => { let blob_id = blob_ids.value(idx); @@ -1178,12 +1922,12 @@ async fn collect_blob_files_v2( ) .await?; let path = blob_path(&location.data_file_dir, &location.data_file_key, blob_id); - files.push(BlobFile::new_packed( - location.object_store, - path, - position, - size, - )); + let source = shared_blob_source(&mut source_cache, location.object_store, &path); + files.push(BlobEntry { + selection_index, + row_address: *row_addr, + file: BlobFile::with_source(source, position, size, BlobKind::Packed, None), + }); } BlobKind::External => { let uri_or_path = blob_uris.value(idx).to_string(); @@ -1227,13 +1971,18 @@ async fn collect_blob_files_v2( } else { object_store.size(&path).await? }; - files.push(BlobFile::new_external( - object_store, - path, - uri_or_path, - position, - size, - )); + let source = shared_blob_source(&mut source_cache, object_store, &path); + files.push(BlobEntry { + selection_index, + row_address: *row_addr, + file: BlobFile::with_source( + source, + position, + size, + BlobKind::External, + Some(uri_or_path), + ), + }); } } } @@ -1324,7 +2073,9 @@ fn data_file_key_from_path(path: &str) -> &str { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::ops::Range; use std::sync::Arc; + use std::time::Duration; use arrow::{ array::AsArray, @@ -1336,7 +2087,9 @@ mod tests { }; use arrow_schema::{DataType, Field, Schema}; use async_trait::async_trait; - use futures::TryStreamExt; + use bytes::Bytes; + use chrono::Utc; + use futures::{StreamExt, TryStreamExt, future::try_join_all}; use lance_arrow::{ ARROW_EXT_NAME_KEY, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY, BLOB_V2_EXT_NAME, DataTypeExt, }; @@ -1347,9 +2100,10 @@ mod tests { use lance_io::stream::RecordBatchStream; use lance_table::format::BasePath; use object_store::{ - GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta, - PutMultipartOptions, PutOptions, PutPayload, PutResult, path::Path, + Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, PutMultipartOptions, PutOptions, PutPayload, PutResult, path::Path, }; + use tokio::sync::Notify; use url::Url; use lance_core::{ @@ -1359,7 +2113,11 @@ mod tests { use lance_datagen::{BatchCount, RowCount, array}; use lance_file::version::LanceFileVersion; - use super::{BlobFile, ExternalBaseCandidate, ExternalBaseResolver, data_file_key_from_path}; + use super::{ + BlobEntry, BlobFile, BlobSource, ExternalBaseCandidate, ExternalBaseResolver, + ReadBlobsExecution, collect_blob_entries_v1, data_file_key_from_path, + execute_blob_read_plan, plan_blob_read_plans, + }; use crate::{ Dataset, blob::{BlobArrayBuilder, blob_field}, @@ -1569,6 +2327,205 @@ mod tests { )) } + #[derive(Debug)] + struct RecordingRangeObjectStore { + data: Bytes, + gate: Option>, + requested_ranges: std::sync::Mutex>>, + } + + impl RecordingRangeObjectStore { + fn new(data: Bytes) -> Self { + Self { + data, + gate: None, + requested_ranges: std::sync::Mutex::new(Vec::new()), + } + } + + fn with_gate(data: Bytes, gate: Arc) -> Self { + Self { + data, + gate: Some(gate), + requested_ranges: std::sync::Mutex::new(Vec::new()), + } + } + + fn requested_ranges(&self) -> Vec> { + self.requested_ranges.lock().unwrap().clone() + } + + fn object_meta(&self, location: &Path) -> ObjectMeta { + ObjectMeta { + location: location.clone(), + last_modified: Utc::now(), + size: self.data.len() as u64, + e_tag: None, + version: None, + } + } + } + + impl std::fmt::Display for RecordingRangeObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "RecordingRangeObjectStore") + } + } + + #[async_trait] + impl object_store::ObjectStore for RecordingRangeObjectStore { + async fn put( + &self, + _location: &Path, + _bytes: PutPayload, + ) -> object_store::Result { + unimplemented!("put is not used by these tests") + } + + async fn put_opts( + &self, + _location: &Path, + _bytes: PutPayload, + _opts: PutOptions, + ) -> object_store::Result { + unimplemented!("put_opts is not used by these tests") + } + + async fn put_multipart( + &self, + _location: &Path, + ) -> object_store::Result> { + unimplemented!("put_multipart is not used by these tests") + } + + async fn put_multipart_opts( + &self, + _location: &Path, + _opts: PutMultipartOptions, + ) -> object_store::Result> { + unimplemented!("put_multipart_opts is not used by these tests") + } + + async fn get(&self, location: &Path) -> object_store::Result { + self.get_opts(location, GetOptions::default()).await + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + let range = match options.range { + Some(GetRange::Bounded(range)) => range, + None => 0..self.data.len() as u64, + Some(other) => { + return Err(object_store::Error::NotSupported { + source: format!("unsupported range request {other:?}").into(), + }); + } + }; + if let Some(gate) = &self.gate { + gate.notified().await; + } + self.requested_ranges.lock().unwrap().push(range.clone()); + let bytes = self.data.slice(range.start as usize..range.end as usize); + Ok(GetResult { + payload: GetResultPayload::Stream( + futures::stream::once(async move { Ok(bytes) }).boxed(), + ), + meta: self.object_meta(location), + range, + attributes: Attributes::default(), + }) + } + + async fn head(&self, location: &Path) -> object_store::Result { + Ok(self.object_meta(location)) + } + + async fn delete(&self, _location: &Path) -> object_store::Result<()> { + unimplemented!("delete is not used by these tests") + } + + fn list( + &self, + _prefix: Option<&Path>, + ) -> futures::stream::BoxStream<'static, object_store::Result> { + unimplemented!("list is not used by these tests") + } + + async fn list_with_delimiter( + &self, + _prefix: Option<&Path>, + ) -> object_store::Result { + unimplemented!("list_with_delimiter is not used by these tests") + } + + async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { + unimplemented!("copy is not used by these tests") + } + + async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> object_store::Result<()> { + unimplemented!("copy_if_not_exists is not used by these tests") + } + } + + fn recording_range_store_with_url( + data: Bytes, + url: &str, + ) -> (Arc, Arc) { + const TEST_RANGE_STORE_SIZE: usize = 128 * 1024; + let mut padded = vec![0; TEST_RANGE_STORE_SIZE.max(data.len())]; + padded[..data.len()].copy_from_slice(data.as_ref()); + let inner = Arc::new(RecordingRangeObjectStore::new(Bytes::from(padded))); + let store = Arc::new(ObjectStore::new( + inner.clone() as Arc, + Url::parse(url).unwrap(), + None, + None, + false, + true, + lance_io::object_store::DEFAULT_LOCAL_IO_PARALLELISM, + lance_io::object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, + None, + )); + (store, inner) + } + + fn recording_range_store(data: Bytes) -> (Arc, Arc) { + recording_range_store_with_url(data, "mock://recording/blob-range-tests") + } + + fn gated_range_store( + data: Bytes, + url: &str, + ) -> ( + Arc, + Arc, + Arc, + ) { + const TEST_RANGE_STORE_SIZE: usize = 128 * 1024; + let mut padded = vec![0; TEST_RANGE_STORE_SIZE.max(data.len())]; + padded[..data.len()].copy_from_slice(data.as_ref()); + let gate = Arc::new(Notify::new()); + let inner = Arc::new(RecordingRangeObjectStore::with_gate( + Bytes::from(padded), + gate.clone(), + )); + let store = Arc::new(ObjectStore::new( + inner.clone() as Arc, + Url::parse(url).unwrap(), + None, + None, + false, + true, + lance_io::object_store::DEFAULT_LOCAL_IO_PARALLELISM, + lance_io::object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, + None, + )); + (store, inner, gate) + } + impl BlobTestFixture { async fn new() -> Self { let test_dir = TempStrDir::default(); @@ -1698,6 +2655,46 @@ mod tests { } } + #[tokio::test] + async fn test_read_blobs_requires_selection() { + let fixture = BlobTestFixture::new().await; + + let err = fixture.dataset.read_blobs("blobs").unwrap().execute().await; + + assert!(matches!(err, Err(Error::InvalidInput { .. }))); + assert!( + err.unwrap_err() + .to_string() + .contains("requires a row selection") + ); + } + + #[tokio::test] + async fn test_read_blobs_by_indices_execute() { + let fixture = BlobTestFixture::new().await; + let indices = vec![2, 12, 22]; + + let blobs = fixture + .dataset + .read_blobs("blobs") + .unwrap() + .with_row_indices(indices) + .execute() + .await + .unwrap(); + + assert_eq!(blobs.len(), 3); + for (actual_idx, (expected_batch_idx, expected_row_idx)) in + [(0, 2), (1, 2), (2, 2)].iter().enumerate() + { + let expected = fixture.data[*expected_batch_idx] + .column(1) + .as_binary::() + .value(*expected_row_idx); + assert_eq!(blobs[actual_idx].data.as_ref(), expected); + } + } + #[tokio::test] pub async fn test_take_blobs_by_indices() { let fixture = BlobTestFixture::new().await; @@ -1744,6 +2741,30 @@ mod tests { assert!(matches!(err, Err(Error::InvalidInput { .. }))); } + #[tokio::test] + async fn test_collect_blob_entries_v1_rejects_missing_fragment() { + let fixture = BlobTestFixture::new().await; + let blob_field_id = + fixture.dataset.schema().project(&["blobs"]).unwrap().fields[0].id as u32; + let descriptions = StructArray::from(vec![ + ( + Arc::new(Field::new("position", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![1])) as ArrayRef, + ), + ( + Arc::new(Field::new("size", DataType::UInt64, false)), + Arc::new(UInt64Array::from(vec![3])) as ArrayRef, + ), + ]); + let row_addrs = UInt64Array::from(vec![(999_u64 << 32) | 7]); + + let err = + collect_blob_entries_v1(&fixture.dataset, blob_field_id, &descriptions, &row_addrs) + .unwrap_err(); + + assert!(err.to_string().contains("references missing fragment")); + } + #[tokio::test] pub async fn test_take_blob_not_blob_col() { let fixture = BlobTestFixture::new().await; @@ -1959,6 +2980,158 @@ mod tests { assert_eq!(blob.tell().await.unwrap(), 2); } + #[tokio::test] + async fn test_blob_file_read_range_does_not_change_cursor() { + let (store, _) = recording_range_store(Bytes::from_static(b"abcdefgh")); + let path = Path::from("blobs/test.bin"); + let blob = BlobFile::new_packed(store, path, 1, 6); + + let bytes = blob.read_range(2..5).await.unwrap(); + assert_eq!(bytes.as_ref(), b"def"); + assert_eq!(blob.tell().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_blob_file_read_ranges_preserves_input_order() { + let (store, inner) = recording_range_store(Bytes::from_static(b"abcdefghij")); + let path = Path::from("blobs/test.bin"); + let blob = BlobFile::new_packed(store, path, 1, 6); + + let chunks = blob.read_ranges(&[4..6, 0..2, 2..4, 2..2]).await.unwrap(); + assert_eq!(chunks[0].as_ref(), b"fg"); + assert_eq!(chunks[1].as_ref(), b"bc"); + assert_eq!(chunks[2].as_ref(), b"de"); + assert!(chunks[3].is_empty()); + assert_eq!(inner.requested_ranges(), vec![1..7]); + } + + #[tokio::test] + async fn test_blob_file_read_range_rejects_out_of_bounds() { + let (store, _) = recording_range_store(Bytes::from_static(b"abcdef")); + let path = Path::from("blobs/test.bin"); + let blob = BlobFile::new_packed(store, path, 0, 4); + + let err = blob.read_range(1..5).await.unwrap_err(); + assert!(err.to_string().contains("exceeds blob size")); + } + + #[tokio::test] + async fn test_blob_files_share_source_and_coalesce() { + let (store, inner) = recording_range_store(Bytes::from_static(b"abcdefghij")); + let source = Arc::new(BlobSource::new(store, Path::from("blobs/test.bin"))); + let blob1 = BlobFile::with_source(source.clone(), 1, 3, BlobKind::Packed, None); + let blob2 = BlobFile::with_source(source, 4, 3, BlobKind::Packed, None); + + let (data1, data2) = tokio::join!(blob1.read(), blob2.read()); + assert_eq!(data1.unwrap().as_ref(), b"bcd"); + assert_eq!(data2.unwrap().as_ref(), b"efg"); + assert_eq!(inner.requested_ranges(), vec![1..7]); + } + + #[tokio::test] + async fn test_read_blobs_plan_preserves_order_and_coalesces() { + let (store, inner) = recording_range_store(Bytes::from_static(b"abcdefghij")); + let source = Arc::new(BlobSource::new(store, Path::from("blobs/test.bin"))); + let entries = vec![ + BlobEntry { + selection_index: 0, + row_address: 10, + file: BlobFile::with_source(source.clone(), 4, 3, BlobKind::Packed, None), + }, + BlobEntry { + selection_index: 1, + row_address: 11, + file: BlobFile::with_source(source, 1, 3, BlobKind::Packed, None), + }, + ]; + let execution = Arc::new(ReadBlobsExecution::new(None)); + let blobs = try_join_all( + plan_blob_read_plans(entries) + .into_iter() + .map(|plan| execute_blob_read_plan(plan, execution.clone())), + ) + .await + .unwrap(); + let mut blobs = blobs.into_iter().flatten().collect::>(); + blobs.sort_by_key(|blob| blob.selection_index); + + assert_eq!(blobs.len(), 2); + assert_eq!(blobs[0].row_address, 10); + assert_eq!(blobs[0].data.as_ref(), b"efg"); + assert_eq!(blobs[1].row_address, 11); + assert_eq!(blobs[1].data.as_ref(), b"bcd"); + assert_eq!(inner.requested_ranges(), vec![1..7]); + } + + #[tokio::test] + async fn test_read_blobs_stream_emits_ready_plan_without_waiting_for_slower_ones() { + let (slow_store, _, slow_gate) = gated_range_store( + Bytes::from_static(b"abcdef"), + "mock://slow/blob-range-tests", + ); + let (fast_store, _) = recording_range_store_with_url( + Bytes::from_static(b"uvwxyz"), + "mock://fast/blob-range-tests", + ); + let entries = vec![ + BlobEntry { + selection_index: 0, + row_address: 10, + file: BlobFile::with_source( + Arc::new(BlobSource::new(slow_store, Path::from("blobs/slow.bin"))), + 0, + 3, + BlobKind::Packed, + None, + ), + }, + BlobEntry { + selection_index: 1, + row_address: 11, + file: BlobFile::with_source( + Arc::new(BlobSource::new(fast_store, Path::from("blobs/fast.bin"))), + 0, + 3, + BlobKind::Packed, + None, + ), + }, + ]; + let execution = Arc::new(ReadBlobsExecution::new(None)); + let mut stream: super::ReadBlobsStream = futures::stream::iter( + plan_blob_read_plans(entries) + .into_iter() + .map(move |plan| execute_blob_read_plan(plan, execution.clone())), + ) + .buffer_unordered(2) + .map_ok(|blobs: Vec| { + futures::stream::iter( + blobs + .into_iter() + .map(|blob| Ok::(super::into_read_blob(blob))), + ) + }) + .try_flatten() + .boxed(); + + let first = tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + assert_eq!(first.row_address, 11); + assert_eq!(first.data.as_ref(), b"uvw"); + + slow_gate.notify_one(); + let second = tokio::time::timeout(Duration::from_secs(1), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + assert_eq!(second.row_address, 10); + assert_eq!(second.data.as_ref(), b"abc"); + } + #[tokio::test] async fn test_take_blob_v2_from_non_default_base_inline() { let fixture = create_multi_base_blob_v2_fixture(b"inline".to_vec(), None, true).await; diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 83ee7f7216a..f304f732978 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -373,14 +373,18 @@ async fn do_take_rows( AddRowOffsetExec::compute_row_offset_array(&row_addr_col, builder.dataset).await?; let row_offset_field = ArrowField::new(ROW_OFFSET, arrow::datatypes::DataType::UInt64, false); - batch = batch.try_with_column(row_offset_field, row_offset_col)?; + if batch.schema().column_with_name(ROW_OFFSET).is_none() { + batch = batch.try_with_column(row_offset_field, row_offset_col)?; + } } if builder.with_row_address { // inject `ROW_ADDR` column let row_addr_field = ArrowField::new(ROW_ADDR, arrow::datatypes::DataType::UInt64, false); - batch = batch.try_with_column(row_addr_field, row_addr_col)?; + if batch.schema().column_with_name(ROW_ADDR).is_none() { + batch = batch.try_with_column(row_addr_field, row_addr_col)?; + } } }