Skip to content

Commit ac142d3

Browse files
authored
refactor: don't use schema dictionary unless using legacy storage (#4023)
Closes #4010
1 parent d723708 commit ac142d3

8 files changed

Lines changed: 104 additions & 11 deletions

File tree

python/python/tests/test_dataset.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2294,6 +2294,80 @@ def test_scan_with_batch_size(tmp_path: Path):
22942294
assert batch.num_rows != 12
22952295

22962296

2297+
def test_dictionaries(tmp_path: Path):
2298+
data = pa.table(
2299+
{
2300+
"id": pa.array([1, 2, 3]),
2301+
"dict": pa.array(
2302+
["foo", "bar", "baz"], pa.dictionary(pa.int32(), pa.string())
2303+
),
2304+
}
2305+
)
2306+
ds = lance.write_dataset(data, tmp_path)
2307+
assert ds.schema == pa.schema(
2308+
{"id": pa.int64(), "dict": pa.dictionary(pa.int32(), pa.string())}
2309+
)
2310+
assert ds.to_table() == data
2311+
2312+
# Can insert data with new values
2313+
new_data = pa.table(
2314+
{
2315+
"id": [4, 5, 6],
2316+
"dict": pa.array(
2317+
["qux", "quux", "corge"], pa.dictionary(pa.int32(), pa.string())
2318+
),
2319+
}
2320+
)
2321+
ds.insert(new_data)
2322+
table = ds.to_table().combine_chunks()
2323+
assert table == pa.table(
2324+
{
2325+
"id": [1, 2, 3, 4, 5, 6],
2326+
"dict": pa.array(
2327+
["foo", "bar", "baz", "qux", "quux", "corge"],
2328+
pa.dictionary(pa.int32(), pa.string()),
2329+
),
2330+
}
2331+
)
2332+
2333+
dict_arr = table.column("dict").chunk(0)
2334+
assert dict_arr.type == pa.dictionary(pa.int32(), pa.string())
2335+
assert dict_arr.to_pylist() == ["foo", "bar", "baz", "qux", "quux", "corge"]
2336+
2337+
assert dict_arr.dictionary.to_pylist() == [
2338+
"foo",
2339+
"bar",
2340+
"baz",
2341+
"qux",
2342+
"quux",
2343+
"corge",
2344+
]
2345+
2346+
# Can merge insert data that has even more values
2347+
new_data = pa.table(
2348+
{
2349+
"id": [1, 7],
2350+
"dict": pa.array(
2351+
["grault", "garply"], pa.dictionary(pa.int32(), pa.string())
2352+
),
2353+
}
2354+
)
2355+
ds.merge_insert(
2356+
"id"
2357+
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
2358+
table = ds.to_table().combine_chunks().sort_by("id")
2359+
assert table.column("id").to_pylist() == [1, 2, 3, 4, 5, 6, 7]
2360+
assert table.column("dict").to_pylist() == [
2361+
"grault",
2362+
"bar",
2363+
"baz",
2364+
"qux",
2365+
"quux",
2366+
"corge",
2367+
"garply",
2368+
]
2369+
2370+
22972371
@pytest.mark.slow
22982372
def test_io_buffer_size(tmp_path: Path):
22992373
# These cases regress deadlock issues that happen when the

rust/lance-table/src/format/manifest.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,9 @@ impl SelfDescribingFileReader for FileReader {
632632
location: location!(),
633633
})?;
634634
let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
635-
populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
635+
if manifest.should_use_legacy_format() {
636+
populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
637+
}
636638
let schema = manifest.schema;
637639
let max_field_id = schema.max_field_id().unwrap_or_default();
638640
Self::try_new_from_reader(

rust/lance-table/src/io/manifest.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,10 @@ pub async fn write_manifest(
162162
) -> Result<usize> {
163163
// Write dictionary values.
164164
let max_field_id = manifest.schema.max_field_id().unwrap_or(-1);
165+
let is_legacy_storage = manifest.should_use_legacy_format();
165166
for field_id in 0..max_field_id + 1 {
166167
if let Some(field) = manifest.schema.mut_field_by_id(field_id) {
167-
if field.data_type().is_dictionary() {
168+
if field.data_type().is_dictionary() && is_legacy_storage {
168169
let dict_info = field.dictionary.as_mut().ok_or_else(|| {
169170
Error::io(
170171
format!("Lance field {} misses dictionary info", field.name),

rust/lance/src/dataset.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ impl Dataset {
483483
}
484484
}
485485

486-
populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
486+
if manifest.should_use_legacy_format() {
487+
populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
488+
}
487489

488490
Ok(manifest)
489491
}
@@ -629,7 +631,7 @@ impl Dataset {
629631
return Ok((self.manifest.clone(), self.manifest_location.clone()));
630632
}
631633
let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?;
632-
if manifest.schema.has_dictionary_types() {
634+
if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() {
633635
let reader = if let Some(size) = location.size {
634636
self.object_store
635637
.open_with_size(&location.path, size as usize)
@@ -2651,8 +2653,12 @@ mod tests {
26512653
assert!(matches!(result, Err(Error::SchemaMismatch { .. })))
26522654
}
26532655

2656+
#[rstest]
26542657
#[tokio::test]
2655-
async fn append_dictionary() {
2658+
async fn append_dictionary(
2659+
#[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
2660+
data_storage_version: LanceFileVersion,
2661+
) {
26562662
// We store the dictionary as part of the schema, so we check that the
26572663
// dictionary is consistent between appends.
26582664

@@ -2676,6 +2682,7 @@ mod tests {
26762682
let mut write_params = WriteParams {
26772683
max_rows_per_file: 40,
26782684
max_rows_per_group: 10,
2685+
data_storage_version: Some(data_storage_version),
26792686
..Default::default()
26802687
};
26812688
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
@@ -2711,10 +2718,14 @@ mod tests {
27112718
)
27122719
.unwrap()];
27132720

2714-
// Try write to dataset (fail)
2721+
// Try write to dataset (fails with legacy format)
27152722
let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
27162723
let result = Dataset::write(batches, test_uri, Some(write_params)).await;
2717-
assert!(result.is_err());
2724+
if data_storage_version == LanceFileVersion::Legacy {
2725+
assert!(result.is_err());
2726+
} else {
2727+
assert!(result.is_ok());
2728+
}
27182729
}
27192730

27202731
#[rstest]

rust/lance/src/dataset/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ impl DatasetBuilder {
308308
let location = commit_handler
309309
.resolve_version_location(&base_path, manifest.version, &object_store.inner)
310310
.await?;
311-
if manifest.schema.has_dictionary_types() {
311+
if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() {
312312
let reader = object_store.open(&location.path).await?;
313313
populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
314314
}

rust/lance/src/dataset/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ pub async fn write_fragments_internal(
369369
compare_nullability: NullabilityComparison::Ignore,
370370
allow_missing_if_nullable: true,
371371
ignore_field_order: true,
372-
compare_dictionary: true,
372+
compare_dictionary: dataset.is_legacy_storage(),
373373
..Default::default()
374374
},
375375
)?;

rust/lance/src/dataset/write/insert.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,12 @@ impl<'a> InsertBuilder<'a> {
307307
}
308308
let m = dataset.manifest.as_ref();
309309
let mut schema_cmp_opts = SchemaCompareOptions {
310-
compare_dictionary: true,
310+
// In the legacy format we stored the dictionary in the manifest and
311+
// all files must have identical dictionaries.
312+
//
313+
// In 2.0+ the dictionary is stored in the files and dictionaries may
314+
// fluctuate between files.
315+
compare_dictionary: m.should_use_legacy_format(),
311316
// array nullability is checked later, using actual data instead
312317
// of the schema
313318
compare_nullability: NullabilityComparison::Ignore,

rust/lance/src/dataset/write/merge_insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ impl MergeInsertJob {
403403
let is_compatible = lance_schema.check_compatible(
404404
self.dataset.schema(),
405405
&SchemaCompareOptions {
406-
compare_dictionary: true,
406+
compare_dictionary: self.dataset.is_legacy_storage(),
407407
..Default::default()
408408
},
409409
);

0 commit comments

Comments
 (0)