Skip to content

Commit 4f6e3c1

Browse files
Xuanwoerandagan
andauthored
fix: avoid corrupting sub-schema merge_insert pages on v2.2 (#6420)
This canonicalizes all-valid validity bitmaps into the same rep/def state as no-null arrays, so sub-schema `merge_insert` updates on `data_storage_version=2.2` stop emitting inconsistent control-word metadata and corrupting variable-width pages. This PR picks up the proposed fix for #6338 and adds regression coverage for both the end-to-end binary `merge_insert` failure and the underlying repdef canonicalization invariant. --------- Co-authored-by: Eran Dagan <eran@botika.io>
1 parent a43e725 commit 4f6e3c1

2 files changed

Lines changed: 153 additions & 0 deletions

File tree

rust/lance-encoding/src/repdef.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,10 @@ impl RepDefBuilder {
817817
/// Registers a nullable validity bitmap
818818
pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
819819
self.check_validity_len(validity.len());
820+
if validity.null_count() == 0 {
821+
self.add_no_null(validity.len());
822+
return;
823+
}
820824
self.repdefs.push(RawRepDef::Validity(ValidityDesc {
821825
num_values: validity.len(),
822826
validity: Some(validity.into_inner()),
@@ -2833,6 +2837,26 @@ mod tests {
28332837
assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
28342838
}
28352839

2840+
#[test]
2841+
fn test_all_valid_validity_bitmap_serializes_as_no_null() {
2842+
let mut from_bitmap = RepDefBuilder::default();
2843+
from_bitmap.add_validity_bitmap(validity(&[true, true, true, true]));
2844+
2845+
let mut from_no_null = RepDefBuilder::default();
2846+
from_no_null.add_no_null(4);
2847+
2848+
let from_bitmap = RepDefBuilder::serialize(vec![from_bitmap]);
2849+
let from_no_null = RepDefBuilder::serialize(vec![from_no_null]);
2850+
2851+
assert!(from_bitmap.repetition_levels.is_none());
2852+
assert!(from_bitmap.definition_levels.is_none());
2853+
assert_eq!(from_bitmap.def_meaning, from_no_null.def_meaning);
2854+
assert_eq!(
2855+
from_bitmap.max_visible_level,
2856+
from_no_null.max_visible_level
2857+
);
2858+
}
2859+
28362860
#[test]
28372861
fn test_slicer() {
28382862
let mut builder = RepDefBuilder::default();

rust/lance/src/dataset/tests/dataset_merge_update.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2072,3 +2072,132 @@ async fn test_fts_index_incremental_reindex_after_in_place_update() {
20722072
results.num_rows()
20732073
);
20742074
}
2075+
2076+
/// Regression test for https://github.com/lance-format/lance/issues/6338
2077+
/// Sub-schema merge_insert with binary columns on v2.2 causes data corruption
2078+
/// when the binary values are >= 256 bytes.
2079+
#[tokio::test]
2080+
async fn test_sub_schema_merge_insert_binary_v2_2() {
2081+
use crate::dataset::write::merge_insert::WhenMatched;
2082+
use arrow_array::BinaryArray;
2083+
2084+
let schema = Arc::new(ArrowSchema::new(vec![
2085+
ArrowField::new("id", DataType::Int64, false),
2086+
ArrowField::new("a", DataType::Binary, true),
2087+
ArrowField::new("b", DataType::Utf8, true),
2088+
]));
2089+
2090+
let test_uri = TempStrDir::default();
2091+
2092+
// Initial write: 2 rows with null binary values
2093+
let initial_batch = RecordBatch::try_new(
2094+
schema.clone(),
2095+
vec![
2096+
Arc::new(arrow_array::Int64Array::from(vec![0, 1])),
2097+
Arc::new(BinaryArray::from(vec![None::<&[u8]>, None])),
2098+
Arc::new(StringArray::from(vec![None::<&str>, None])),
2099+
],
2100+
)
2101+
.unwrap();
2102+
2103+
let write_params = WriteParams {
2104+
data_storage_version: Some(LanceFileVersion::V2_2),
2105+
..Default::default()
2106+
};
2107+
let batches = RecordBatchIterator::new(vec![initial_batch].into_iter().map(Ok), schema.clone());
2108+
Dataset::write(batches, &test_uri, Some(write_params))
2109+
.await
2110+
.unwrap();
2111+
2112+
let sub_schema = Arc::new(ArrowSchema::new(vec![
2113+
ArrowField::new("id", DataType::Int64, false),
2114+
ArrowField::new("a", DataType::Binary, true),
2115+
]));
2116+
2117+
// Sub-schema merge_insert for row 0 (binary value >= 256 bytes)
2118+
let data_a: Vec<u8> = (0..256).map(|i| (i % 251) as u8).collect();
2119+
{
2120+
let update_batch = RecordBatch::try_new(
2121+
sub_schema.clone(),
2122+
vec![
2123+
Arc::new(arrow_array::Int64Array::from(vec![0])),
2124+
Arc::new(BinaryArray::from(vec![Some(data_a.as_slice())])),
2125+
],
2126+
)
2127+
.unwrap();
2128+
let dataset = Dataset::open(&test_uri).await.unwrap();
2129+
let source = Box::new(RecordBatchIterator::new(
2130+
vec![update_batch].into_iter().map(Ok),
2131+
sub_schema.clone(),
2132+
));
2133+
MergeInsertBuilder::try_new(dataset.into(), vec!["id".into()])
2134+
.unwrap()
2135+
.when_matched(WhenMatched::UpdateAll)
2136+
.try_build()
2137+
.unwrap()
2138+
.execute_reader(source)
2139+
.await
2140+
.unwrap();
2141+
}
2142+
2143+
// Read back and verify first merge worked
2144+
let dataset = Dataset::open(&test_uri).await.unwrap();
2145+
let table = dataset
2146+
.scan()
2147+
.project(&["id", "a"])
2148+
.unwrap()
2149+
.try_into_stream()
2150+
.await
2151+
.unwrap()
2152+
.try_collect::<Vec<_>>()
2153+
.await
2154+
.unwrap();
2155+
let table = concat_batches(&table[0].schema(), &table).unwrap();
2156+
assert_eq!(table.num_rows(), 2);
2157+
2158+
// Sub-schema merge_insert for row 1 (binary value >= 256 bytes)
2159+
let data_b: Vec<u8> = (0..256).map(|i| ((i + 100) % 251) as u8).collect();
2160+
{
2161+
let update_batch = RecordBatch::try_new(
2162+
sub_schema.clone(),
2163+
vec![
2164+
Arc::new(arrow_array::Int64Array::from(vec![1])),
2165+
Arc::new(BinaryArray::from(vec![Some(data_b.as_slice())])),
2166+
],
2167+
)
2168+
.unwrap();
2169+
let dataset = Dataset::open(&test_uri).await.unwrap();
2170+
let source = Box::new(RecordBatchIterator::new(
2171+
vec![update_batch].into_iter().map(Ok),
2172+
sub_schema.clone(),
2173+
));
2174+
MergeInsertBuilder::try_new(dataset.into(), vec!["id".into()])
2175+
.unwrap()
2176+
.when_matched(WhenMatched::UpdateAll)
2177+
.try_build()
2178+
.unwrap()
2179+
.execute_reader(source)
2180+
.await
2181+
.unwrap();
2182+
}
2183+
2184+
// Read back and verify - this is where the bug manifests
2185+
let dataset = Dataset::open(&test_uri).await.unwrap();
2186+
let table = dataset
2187+
.scan()
2188+
.project(&["id", "a"])
2189+
.unwrap()
2190+
.try_into_stream()
2191+
.await
2192+
.unwrap()
2193+
.try_collect::<Vec<_>>()
2194+
.await
2195+
.unwrap();
2196+
let table = concat_batches(&table[0].schema(), &table).unwrap();
2197+
assert_eq!(table.num_rows(), 2);
2198+
2199+
let a_col = table.column_by_name("a").unwrap();
2200+
let binary_arr = a_col.as_any().downcast_ref::<BinaryArray>().unwrap();
2201+
assert_eq!(binary_arr.value(0), data_a.as_slice());
2202+
assert_eq!(binary_arr.value(1), data_b.as_slice());
2203+
}

0 commit comments

Comments
 (0)