Skip to content

Commit e24fd82

Browse files
committed
fix: list columns fail in blocking decode path
Summary Reading List, LargeList, Binary, or LargeBinary columns via read_stream_projected_blocking (used by the Java JNI LanceFileReader) fails with: Caused by: java.io.IOException: LanceError(Internal): drain was called on primitive field decoder for data type Int32 on column 2 but the decoder was never awaited, /app/rust/lance-encoding/src/previous/encodings/logical/primitive.rs:348:27 Full Rust-side error: thread 'tokio-runtime-worker' panicked at rust/lance-encoding/src/previous/encodings/logical/primitive.rs:348:27: Internal error: drain was called on primitive field decoder for data type Int32 on column 4 but the decoder was never awaited Stack: lance_encoding::previous::encodings::logical::primitive::PrimitiveFieldDecoder::drain lance_encoding::previous::encodings::logical::list::ListPageDecoder::drain lance_encoding::previous::encodings::logical::struct::SimpleStructDecoder::drain lance_encoding::decoder::BatchDecodeIterator::next_batch_task lance_file::reader::FileReader::read_stream_projected_blocking lance_jni::file_reader::BlockingFileReader::open_stream Java_org_lance_file_LanceFileReader_readAllNative Schema: col_a: int64 col_b: list<int32> This is caused by two independent bugs: Bug 1: tokio::spawn in list scheduler is incompatible with the blocking decode path. ListFieldSchedulingJob::schedule_next uses tokio::spawn to run indirect_schedule_task concurrently. This works in the async path where an active tokio runtime drives the spawned task. However, in the blocking path (schedule_and_decode_blocking), the scheduling runs outside any tokio runtime context, so the spawned task is never executed, and the JoinHandle cannot be awaited from a different runtime. Bug 2: BatchDecodeIterator skips wait when all rows are already scheduled. For list columns, all rows are marked as "scheduled" after the first schedule_ranges call (because the ListPageDecoder message is sent synchronously), even though the item data has not been loaded yet. Starting from the second batch, scheduled_need == 0, so wait_for_io() is skipped entirely. The subsequent drain then encounters PrimitiveFieldDecoder instances whose physical decoders were never awaited. Changes - list.rs: Replace tokio::spawn(indirect_schedule_task(...)) with an inline BoxFuture via .boxed(). This removes the dependency on a tokio runtime during scheduling and makes the indirect scheduling work in both async and blocking contexts. The ListPageDecoder.unloaded field type changes from JoinHandle to BoxFuture, and the JoinError handling in wait_for_loaded is simplified accordingly. - decoder.rs: Add an else branch in BatchDecodeIterator::next_batch_task to call self.root_decoder.wait(loaded_need, ...) even when scheduled_need == 0. This ensures that list item data is fully loaded before draining, regardless of the scheduling status. - reader.rs: Add test_project_list_int32 regression test covering projected reads of list<int32> columns on both V2_0 and V2_1 file formats.
1 parent d630106 commit e24fd82

File tree

3 files changed

+132
-14
lines changed

3 files changed

+132
-14
lines changed

rust/lance-encoding/src/decoder.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1654,6 +1654,13 @@ impl<T: RootDecoderType> BatchDecodeIterator<T> {
16541654
let under_scheduled = desired_scheduled - actually_scheduled;
16551655
to_take -= under_scheduled;
16561656
}
1657+
} else {
1658+
// All rows are scheduled but we still need to wait for data to be loaded.
1659+
// This is important for types with indirect I/O (e.g. List, Binary) where
1660+
// scheduling completes but item data may not yet be loaded.
1661+
let loaded_need = self.rows_drained + to_take.min(self.rows_per_batch as u64) - 1;
1662+
self.root_decoder
1663+
.wait(loaded_need, &self.wait_for_io_runtime)?;
16571664
}
16581665

16591666
if to_take == 0 {

rust/lance-encoding/src/previous/encodings/logical/list.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use arrow_schema::{DataType, Field, Fields};
1414
use futures::{FutureExt, future::BoxFuture};
1515
use lance_core::{Error, Result, cache::LanceCache};
1616
use log::trace;
17-
use tokio::task::JoinHandle;
1817

1918
use crate::{
2019
EncodingsIo,
@@ -457,8 +456,9 @@ impl SchedulingJob for ListFieldSchedulingJob<'_> {
457456
let io = context.io().clone();
458457
let cache = context.cache().clone();
459458

460-
// Immediately spawn the indirect scheduling
461-
let indirect_fut = tokio::spawn(indirect_schedule_task(
459+
// Immediately schedule the indirect I/O (inline future, no tokio::spawn).
460+
// This ensures compatibility with both async and blocking decode paths.
461+
let indirect_fut = indirect_schedule_task(
462462
next_offsets_decoder,
463463
list_reqs,
464464
null_offset_adjustment,
@@ -467,7 +467,8 @@ impl SchedulingJob for ListFieldSchedulingJob<'_> {
467467
io,
468468
cache,
469469
priority.box_clone(),
470-
));
470+
)
471+
.boxed();
471472

472473
// Return a decoder
473474
let decoder = Box::new(ListPageDecoder {
@@ -590,9 +591,8 @@ impl FieldScheduler for ListFieldScheduler {
590591
/// them.
591592
///
592593
/// TODO: Test the case where a single list page has multiple items pages
593-
#[derive(Debug)]
594594
struct ListPageDecoder {
595-
unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
595+
unloaded: Option<BoxFuture<'static, Result<IndirectlyLoaded>>>,
596596
// offsets and validity will have already been decoded as part of the indirect I/O
597597
offsets: Arc<[u64]>,
598598
validity: BooleanBuffer,
@@ -605,6 +605,18 @@ struct ListPageDecoder {
605605
data_type: DataType,
606606
}
607607

608+
impl std::fmt::Debug for ListPageDecoder {
609+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
610+
f.debug_struct("ListPageDecoder")
611+
.field("unloaded", &self.unloaded.as_ref().map(|_| "..."))
612+
.field("num_rows", &self.num_rows)
613+
.field("rows_drained", &self.rows_drained)
614+
.field("rows_loaded", &self.rows_loaded)
615+
.field("data_type", &self.data_type)
616+
.finish()
617+
}
618+
}
619+
608620
struct ListDecodeTask {
609621
offsets: Vec<u64>,
610622
validity: BooleanBuffer,
@@ -695,14 +707,7 @@ impl LogicalPageDecoder for ListPageDecoder {
695707
// I/O and then wait for enough items to arrive
696708
if self.unloaded.is_some() {
697709
trace!("List scheduler needs to wait for indirect I/O to complete");
698-
let indirectly_loaded = self.unloaded.take().unwrap().await;
699-
if let Err(err) = indirectly_loaded {
700-
match err.try_into_panic() {
701-
Ok(err) => std::panic::resume_unwind(err),
702-
Err(err) => panic!("{:?}", err),
703-
};
704-
}
705-
let indirectly_loaded = indirectly_loaded.unwrap()?;
710+
let indirectly_loaded = self.unloaded.take().unwrap().await?;
706711

707712
self.offsets = indirectly_loaded.offsets;
708713
self.validity = indirectly_loaded.validity;

rust/lance-file/src/reader.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,6 +1916,112 @@ mod tests {
19161916
);
19171917
}
19181918

1919+
/// Reproduce: projecting a list<int32> column from a multi-column v2.0 file
1920+
/// triggers "drain was called on primitive field decoder … but the decoder was
1921+
/// never awaited".
1922+
#[rstest]
1923+
#[tokio::test]
1924+
async fn test_project_list_int32(
1925+
#[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
1926+
) {
1927+
use arrow_array::builder::{Int32Builder, Int64Builder, ListBuilder};
1928+
1929+
let fs = FsFixture::default();
1930+
1931+
// Schema: col_a (int64) + col_b (list<int32>)
1932+
let arrow_schema = Arc::new(ArrowSchema::new(vec![
1933+
Field::new("col_a", DataType::Int64, false),
1934+
Field::new(
1935+
"col_b",
1936+
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
1937+
false,
1938+
),
1939+
]));
1940+
1941+
// Generate enough rows with large lists to exercise multi-page items
1942+
let num_rows = 10000_usize;
1943+
let items_per_row = 500_usize;
1944+
let mut col_a_builder = Int64Builder::with_capacity(num_rows);
1945+
let inner_builder = Int32Builder::new();
1946+
let mut list_builder = ListBuilder::new(inner_builder);
1947+
1948+
for i in 0..num_rows {
1949+
col_a_builder.append_value(i as i64);
1950+
for j in 0..items_per_row as i32 {
1951+
list_builder.values().append_value(i as i32 * 100 + j);
1952+
}
1953+
list_builder.append(true);
1954+
}
1955+
1956+
let batch = RecordBatch::try_new(
1957+
arrow_schema.clone(),
1958+
vec![
1959+
Arc::new(col_a_builder.finish()),
1960+
Arc::new(list_builder.finish()),
1961+
],
1962+
)
1963+
.unwrap();
1964+
1965+
let reader =
1966+
arrow_array::RecordBatchIterator::new(vec![Ok(batch.clone())], arrow_schema.clone());
1967+
let written = write_lance_file(
1968+
reader,
1969+
&fs,
1970+
FileWriterOptions {
1971+
format_version: Some(version),
1972+
..Default::default()
1973+
},
1974+
)
1975+
.await;
1976+
1977+
let file_scheduler = fs
1978+
.scheduler
1979+
.open_file(&fs.tmp_path, &CachedFileSize::unknown())
1980+
.await
1981+
.unwrap();
1982+
1983+
// Test: project only list column, scalar + list, and all columns
1984+
for col in [vec!["col_b"], vec!["col_a", "col_b"]] {
1985+
let file_reader = FileReader::try_open(
1986+
file_scheduler.clone(),
1987+
None,
1988+
Arc::<DecoderPlugins>::default(),
1989+
&test_cache(),
1990+
FileReaderOptions::default(),
1991+
)
1992+
.await
1993+
.unwrap();
1994+
1995+
let projection = ReaderProjection::from_column_names(
1996+
file_reader.metadata.version(),
1997+
&written.schema,
1998+
&col,
1999+
)
2000+
.unwrap();
2001+
2002+
let batch_stream = file_reader
2003+
.read_stream_projected(
2004+
lance_io::ReadBatchParams::RangeFull,
2005+
512,
2006+
16,
2007+
projection.clone(),
2008+
FilterExpression::no_filter(),
2009+
)
2010+
.unwrap();
2011+
2012+
let projection_arrow = ArrowSchema::from(projection.schema.as_ref());
2013+
verify_expected(
2014+
&written.data,
2015+
batch_stream,
2016+
512,
2017+
Some(Box::new(move |b: &RecordBatch| {
2018+
b.project_by_schema(&projection_arrow).unwrap()
2019+
})),
2020+
)
2021+
.await;
2022+
}
2023+
}
2024+
19192025
#[test_log::test(tokio::test)]
19202026
async fn test_compressing_buffer() {
19212027
let fs = FsFixture::default();

0 commit comments

Comments
 (0)