Skip to content

Commit e7369fb

Browse files
westonpace and claude authored
feat: wire batch_size_bytes to Python and public Rust API (#6428)
## Summary

Stacked on #6388. Please merge that PR first.

- Adds `batch_size_bytes: Option<u64>` to `FileReaderOptions` and propagates it through all 6 `SchedulerDecoderConfig` creation sites in the file reader
- Adds `batch_size_bytes` field + setter to `Scanner`, wired through both `scan_fragments` (via `LanceScanConfig`) and `pushdown_scan` (via `FileReaderOptions` in `ScanConfig`)
- Adds `batch_size_bytes` to `LanceScanConfig`, with `try_new_v2` injecting it into `FragReadConfig` via `FileReaderOptions`
- Exposes `batch_size_bytes` in the Python API: `LanceDataset.scanner()`, `to_table()`, `to_batches()`, `ScannerBuilder`

## Test plan

- [x] `cargo check -p lance-file -p lance --tests` — clean
- [x] `cargo clippy -p lance-file -p lance --tests -- -D warnings` — clean
- [x] `cargo fmt --all` — applied
- [x] `cargo test -p lance-encoding -- byte_sized` — 3/3 pass
- [x] `cargo test -p lance -- test_scan` — 38/38 pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 103e947 commit e7369fb

7 files changed

Lines changed: 152 additions & 25 deletions

File tree

python/python/lance/dataset.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,7 @@ def scanner(
834834
offset: Optional[int] = None,
835835
nearest: Optional[dict] = None,
836836
batch_size: Optional[int] = None,
837+
batch_size_bytes: Optional[int] = None,
837838
batch_readahead: Optional[int] = None,
838839
fragment_readahead: Optional[int] = None,
839840
scan_in_order: Optional[bool] = None,
@@ -929,9 +930,16 @@ def scanner(
929930
}
930931
931932
batch_size: int, default None
932-
The target size of batches returned. In some cases batches can be up to
933-
twice this size (but never larger than this). In some cases batches can
934-
be smaller than this size.
933+
The maximum number of rows per batch. In some cases batches can be
934+
smaller than this size. Note: this can be overridden by
935+
``batch_size_bytes`` or by a dataset-level ``batch_size_bytes``
936+
configured via ``FileReaderOptions``.
937+
batch_size_bytes: int, default None
938+
If set, the scanner will produce batches whose total size in bytes
939+
is approximately this value, overriding the row-based ``batch_size``.
940+
This can also be configured at the dataset level via
941+
``FileReaderOptions``. A scanner-level setting takes precedence
942+
over the dataset-level default.
935943
io_buffer_size: int, default None
936944
The size of the IO buffer. See ``ScannerBuilder.io_buffer_size``
937945
for more information.
@@ -1067,6 +1075,7 @@ def setopt(opt, val):
10671075
setopt(builder.limit, limit)
10681076
setopt(builder.offset, offset)
10691077
setopt(builder.batch_size, batch_size)
1078+
setopt(builder.batch_size_bytes, batch_size_bytes)
10701079
setopt(builder.io_buffer_size, io_buffer_size)
10711080
setopt(builder.batch_readahead, batch_readahead)
10721081
setopt(builder.fragment_readahead, fragment_readahead)
@@ -1150,6 +1159,7 @@ def to_table(
11501159
offset: Optional[int] = None,
11511160
nearest: Optional[dict] = None,
11521161
batch_size: Optional[int] = None,
1162+
batch_size_bytes: Optional[int] = None,
11531163
batch_readahead: Optional[int] = None,
11541164
fragment_readahead: Optional[int] = None,
11551165
scan_in_order: Optional[bool] = None,
@@ -1277,6 +1287,7 @@ def to_table(
12771287
offset=offset,
12781288
nearest=nearest,
12791289
batch_size=batch_size,
1290+
batch_size_bytes=batch_size_bytes,
12801291
io_buffer_size=io_buffer_size,
12811292
batch_readahead=batch_readahead,
12821293
fragment_readahead=fragment_readahead,
@@ -1720,6 +1731,7 @@ def to_batches(
17201731
offset: Optional[int] = None,
17211732
nearest: Optional[dict] = None,
17221733
batch_size: Optional[int] = None,
1734+
batch_size_bytes: Optional[int] = None,
17231735
batch_readahead: Optional[int] = None,
17241736
fragment_readahead: Optional[int] = None,
17251737
scan_in_order: Optional[bool] = None,
@@ -1756,6 +1768,7 @@ def to_batches(
17561768
offset=offset,
17571769
nearest=nearest,
17581770
batch_size=batch_size,
1771+
batch_size_bytes=batch_size_bytes,
17591772
io_buffer_size=io_buffer_size,
17601773
batch_readahead=batch_readahead,
17611774
fragment_readahead=fragment_readahead,
@@ -5189,6 +5202,7 @@ def __init__(self, ds: LanceDataset):
51895202
self._columns_with_transform = None
51905203
self._nearest = None
51915204
self._batch_size: Optional[int] = None
5205+
self._batch_size_bytes: Optional[int] = None
51925206
self._io_buffer_size: Optional[int] = None
51935207
self._batch_readahead: Optional[int] = None
51945208
self._fragment_readahead: Optional[int] = None
@@ -5219,10 +5233,28 @@ def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder:
52195233
return self
52205234

52215235
def batch_size(self, batch_size: int) -> ScannerBuilder:
5222-
"""Set batch size for Scanner"""
5236+
"""Set the maximum number of rows per batch.
5237+
5238+
Note: this can be overridden by ``batch_size_bytes`` or by a
5239+
dataset-level ``batch_size_bytes`` configured via
5240+
``FileReaderOptions``.
5241+
"""
52235242
self._batch_size = batch_size
52245243
return self
52255244

5245+
def batch_size_bytes(self, batch_size_bytes: int) -> ScannerBuilder:
5246+
"""Set the target batch size in bytes.
5247+
5248+
When set, the scanner will produce batches whose total size in bytes
5249+
is approximately this value, overriding the row-based ``batch_size``.
5250+
5251+
This can also be configured at the dataset level via
5252+
``FileReaderOptions``. A scanner-level setting takes precedence
5253+
over the dataset-level default.
5254+
"""
5255+
self._batch_size_bytes = batch_size_bytes
5256+
return self
5257+
52265258
def io_buffer_size(self, io_buffer_size: int) -> ScannerBuilder:
52275259
"""
52285260
Set the I/O buffer size for the Scanner
@@ -5607,6 +5639,7 @@ def to_scanner(self) -> LanceScanner:
56075639
self._offset,
56085640
self._nearest,
56095641
self._batch_size,
5642+
self._batch_size_bytes,
56105643
self._io_buffer_size,
56115644
self._batch_readahead,
56125645
self._fragment_readahead,

python/src/dataset.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,7 @@ impl Dataset {
800800
}
801801

802802
#[allow(clippy::too_many_arguments)]
803-
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, search_filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, blob_handling=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, substrait_aggregate=None))]
803+
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, search_filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, batch_size_bytes=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, blob_handling=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, substrait_aggregate=None))]
804804
fn scanner(
805805
self_: PyRef<'_, Self>,
806806
columns: Option<Vec<String>>,
@@ -812,6 +812,7 @@ impl Dataset {
812812
offset: Option<i64>,
813813
nearest: Option<&Bound<PyDict>>,
814814
batch_size: Option<usize>,
815+
batch_size_bytes: Option<u64>,
815816
io_buffer_size: Option<u64>,
816817
batch_readahead: Option<usize>,
817818
fragment_readahead: Option<usize>,
@@ -956,6 +957,9 @@ impl Dataset {
956957
if let Some(batch_size) = batch_size {
957958
scanner.batch_size(batch_size);
958959
}
960+
if let Some(batch_size_bytes) = batch_size_bytes {
961+
scanner.batch_size_bytes(batch_size_bytes);
962+
}
959963
if let Some(io_buffer_size) = io_buffer_size {
960964
scanner.io_buffer_size(io_buffer_size);
961965
}

rust/lance-file/src/reader.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -342,13 +342,21 @@ pub struct FileReaderOptions {
342342
/// will be read in multiple chunks to control memory usage.
343343
/// Default: 8MB (DEFAULT_READ_CHUNK_SIZE)
344344
pub read_chunk_size: u64,
345+
/// If set, the reader will produce batches whose total size in bytes
346+
/// is approximately this value, overriding the row-based `batch_size`.
347+
///
348+
/// This can be set at the dataset level (via `ReadParams::file_reader_options`)
349+
/// to provide a default for all scans, or at the scanner level (via
350+
/// `Scanner::batch_size_bytes`) to override per scan.
351+
pub batch_size_bytes: Option<u64>,
345352
}
346353

347354
impl Default for FileReaderOptions {
348355
fn default() -> Self {
349356
Self {
350357
decoder_config: DecoderConfig::default(),
351358
read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
359+
batch_size_bytes: None,
352360
}
353361
}
354362
}
@@ -871,6 +879,7 @@ impl FileReader {
871879
projection: ReaderProjection,
872880
filter: FilterExpression,
873881
decoder_config: DecoderConfig,
882+
batch_size_bytes: Option<u64>,
874883
) -> Result<BoxStream<'static, ReadBatchTask>> {
875884
debug!(
876885
"Reading range {:?} with batch_size {} from file with {} rows and {} columns into schema with {} columns",
@@ -887,7 +896,7 @@ impl FileReader {
887896
decoder_plugins,
888897
io,
889898
decoder_config,
890-
batch_size_bytes: None,
899+
batch_size_bytes,
891900
};
892901

893902
let requested_rows = RequestedRows::Ranges(vec![range]);
@@ -921,6 +930,7 @@ impl FileReader {
921930
projection,
922931
filter,
923932
self.options.decoder_config.clone(),
933+
self.options.batch_size_bytes,
924934
)
925935
}
926936

@@ -935,6 +945,7 @@ impl FileReader {
935945
projection: ReaderProjection,
936946
filter: FilterExpression,
937947
decoder_config: DecoderConfig,
948+
batch_size_bytes: Option<u64>,
938949
) -> Result<BoxStream<'static, ReadBatchTask>> {
939950
debug!(
940951
"Taking {} rows spread across range {}..{} with batch_size {} from columns {:?}",
@@ -951,7 +962,7 @@ impl FileReader {
951962
decoder_plugins,
952963
io,
953964
decoder_config,
954-
batch_size_bytes: None,
965+
batch_size_bytes,
955966
};
956967

957968
let requested_rows = RequestedRows::Indices(indices);
@@ -983,6 +994,7 @@ impl FileReader {
983994
projection,
984995
FilterExpression::no_filter(),
985996
self.options.decoder_config.clone(),
997+
self.options.batch_size_bytes,
986998
)
987999
}
9881000

@@ -997,6 +1009,7 @@ impl FileReader {
9971009
projection: ReaderProjection,
9981010
filter: FilterExpression,
9991011
decoder_config: DecoderConfig,
1012+
batch_size_bytes: Option<u64>,
10001013
) -> Result<BoxStream<'static, ReadBatchTask>> {
10011014
let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
10021015
debug!(
@@ -1015,7 +1028,7 @@ impl FileReader {
10151028
decoder_plugins,
10161029
io,
10171030
decoder_config,
1018-
batch_size_bytes: None,
1031+
batch_size_bytes,
10191032
};
10201033

10211034
let requested_rows = RequestedRows::Ranges(ranges);
@@ -1047,6 +1060,7 @@ impl FileReader {
10471060
projection,
10481061
filter,
10491062
self.options.decoder_config.clone(),
1063+
self.options.batch_size_bytes,
10501064
)
10511065
}
10521066

@@ -1194,7 +1208,7 @@ impl FileReader {
11941208
decoder_plugins: self.decoder_plugins.clone(),
11951209
io: self.scheduler.clone(),
11961210
decoder_config: self.options.decoder_config.clone(),
1197-
batch_size_bytes: None,
1211+
batch_size_bytes: self.options.batch_size_bytes,
11981212
};
11991213

12001214
let requested_rows = RequestedRows::Indices(indices);
@@ -1234,7 +1248,7 @@ impl FileReader {
12341248
decoder_plugins: self.decoder_plugins.clone(),
12351249
io: self.scheduler.clone(),
12361250
decoder_config: self.options.decoder_config.clone(),
1237-
batch_size_bytes: None,
1251+
batch_size_bytes: self.options.batch_size_bytes,
12381252
};
12391253

12401254
let requested_rows = RequestedRows::Ranges(ranges);
@@ -1274,7 +1288,7 @@ impl FileReader {
12741288
decoder_plugins: self.decoder_plugins.clone(),
12751289
io: self.scheduler.clone(),
12761290
decoder_config: self.options.decoder_config.clone(),
1277-
batch_size_bytes: None,
1291+
batch_size_bytes: self.options.batch_size_bytes,
12781292
};
12791293

12801294
let requested_rows = RequestedRows::Ranges(vec![range]);

rust/lance/src/dataset.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,9 @@ pub struct ReadParams {
257257
/// File reader options to use when reading data files.
258258
///
259259
/// This allows control over features like caching repetition indices and validation.
260+
/// Options set here act as dataset-level defaults and can be overridden on a
261+
/// per-scan basis via [`Scanner::batch_size_bytes`](crate::dataset::scanner::Scanner::batch_size_bytes) or
262+
/// [`Scanner::with_file_reader_options`](crate::dataset::scanner::Scanner::with_file_reader_options).
260263
pub file_reader_options: Option<FileReaderOptions>,
261264
}
262265

rust/lance/src/dataset/scanner.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,10 @@ pub struct Scanner {
719719
/// The batch size controls the maximum size of rows to return for each read.
720720
batch_size: Option<usize>,
721721

722+
/// If set, the scanner will produce batches whose total size in bytes
723+
/// is approximately this value, overriding the row-based `batch_size`.
724+
batch_size_bytes: Option<u64>,
725+
722726
/// Number of batches to prefetch
723727
batch_readahead: usize,
724728

@@ -989,6 +993,7 @@ impl Scanner {
989993
filter: LanceFilter::default(),
990994
full_text_query: None,
991995
batch_size: None,
996+
batch_size_bytes: None,
992997
batch_readahead: get_num_compute_intensive_cpus(),
993998
fragment_readahead: None,
994999
io_buffer_size: None,
@@ -1261,12 +1266,29 @@ impl Scanner {
12611266
Ok(self)
12621267
}
12631268

1264-
/// Set the batch size.
1269+
/// Set the maximum number of rows per batch.
1270+
///
1271+
/// Note: this can be overridden by [`Self::batch_size_bytes`] or by a dataset-level
1272+
/// `batch_size_bytes` set via [`ReadParams::file_reader_options`](crate::dataset::ReadParams::file_reader_options). When a byte-based
1273+
/// batch size is active, the row-based batch size is used only as an initial estimate.
12651274
pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
12661275
self.batch_size = Some(batch_size);
12671276
self
12681277
}
12691278

1279+
/// Set the target batch size in bytes.
1280+
///
1281+
/// When set, the scanner will produce batches whose total size in bytes
1282+
/// is approximately this value, overriding the row-based `batch_size`.
1283+
///
1284+
/// This can also be configured at the dataset level via
1285+
/// [`ReadParams::file_reader_options`](crate::dataset::ReadParams::file_reader_options). A scanner-level setting takes
1286+
/// precedence over the dataset-level default.
1287+
pub fn batch_size_bytes(&mut self, batch_size_bytes: u64) -> &mut Self {
1288+
self.batch_size_bytes = Some(batch_size_bytes);
1289+
self
1290+
}
1291+
12701292
/// Include deleted rows
12711293
///
12721294
/// These are rows that have been deleted from the dataset but are still present in the
@@ -1688,6 +1710,30 @@ impl Scanner {
16881710
self
16891711
}
16901712

1713+
/// Compute the resolved file reader options, merging the scanner's explicit
1714+
/// `file_reader_options`, the dataset-level defaults, and the `batch_size_bytes`
1715+
/// setting.
1716+
fn resolved_file_reader_options(&self) -> Option<FileReaderOptions> {
1717+
let base = self
1718+
.file_reader_options
1719+
.clone()
1720+
.or_else(|| self.dataset.file_reader_options.clone());
1721+
match (base, self.batch_size_bytes) {
1722+
(Some(mut opts), Some(bsb)) => {
1723+
if opts.batch_size_bytes.is_none() {
1724+
opts.batch_size_bytes = Some(bsb);
1725+
}
1726+
Some(opts)
1727+
}
1728+
(Some(opts), None) => Some(opts),
1729+
(None, Some(bsb)) => Some(FileReaderOptions {
1730+
batch_size_bytes: Some(bsb),
1731+
..Default::default()
1732+
}),
1733+
(None, None) => None,
1734+
}
1735+
}
1736+
16911737
/// Create a physical expression for a column that may be nested
16921738
fn create_column_expr(
16931739
column_name: &str,
@@ -2658,6 +2704,10 @@ impl Scanner {
26582704
read_options = read_options.with_batch_size(batch_size as u32);
26592705
}
26602706

2707+
if let Some(file_reader_options) = self.resolved_file_reader_options() {
2708+
read_options = read_options.with_file_reader_options(file_reader_options);
2709+
}
2710+
26612711
if let Some(fragment_readahead) = self.fragment_readahead {
26622712
read_options = read_options.with_fragment_readahead(fragment_readahead);
26632713
}
@@ -4003,6 +4053,7 @@ impl Scanner {
40034053
with_row_created_at_version,
40044054
with_make_deletions_null,
40054055
ordered_output: ordered,
4056+
file_reader_options: self.resolved_file_reader_options(),
40064057
};
40074058
Arc::new(LanceScanExec::new(
40084059
self.dataset.clone(),
@@ -4029,10 +4080,7 @@ impl Scanner {
40294080
with_row_address: self.projection_plan.physical_projection.with_row_addr,
40304081
make_deletions_null,
40314082
ordered_output: self.ordered,
4032-
file_reader_options: self
4033-
.file_reader_options
4034-
.clone()
4035-
.or_else(|| self.dataset.file_reader_options.clone()),
4083+
file_reader_options: self.resolved_file_reader_options(),
40364084
};
40374085

40384086
let fragments = if let Some(fragment) = self.fragments.as_ref() {

0 commit comments

Comments (0)