Skip to content

Commit 35c27d4

Browse files
author
Vova Kolmakov
committed
fix: sync structured primary key fields after metadata update
1 parent 43c3780 commit 35c27d4

3 files changed

Lines changed: 304 additions & 13 deletions

File tree

rust/lance-core/src/datatypes/field.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,6 +1018,23 @@ impl Field {
10181018
pub fn is_unenforced_primary_key(&self) -> bool {
10191019
self.unenforced_primary_key_position.is_some()
10201020
}
1021+
1022+
/// Re-parse well-known metadata keys and update the corresponding structured fields.
1023+
///
1024+
/// Call this after modifying `field.metadata` directly (e.g., via UpdateConfig)
1025+
/// to keep structured fields like `unenforced_primary_key_position` in sync.
1026+
pub fn sync_embedded_metadata(&mut self) {
1027+
self.unenforced_primary_key_position = self
1028+
.metadata
1029+
.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1030+
.and_then(|s| s.parse::<u32>().ok())
1031+
.or_else(|| {
1032+
self.metadata
1033+
.get(LANCE_UNENFORCED_PRIMARY_KEY)
1034+
.filter(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes"))
1035+
.map(|_| 0)
1036+
});
1037+
}
10211038
}
10221039

10231040
impl fmt::Display for Field {
@@ -1098,16 +1115,6 @@ impl TryFrom<&ArrowField> for Field {
10981115
}
10991116
_ => vec![],
11001117
};
1101-
let unenforced_primary_key_position = metadata
1102-
.get(LANCE_UNENFORCED_PRIMARY_KEY_POSITION)
1103-
.and_then(|s| s.parse::<u32>().ok())
1104-
.or_else(|| {
1105-
// Backward compatibility: use 0 for legacy boolean flag
1106-
metadata
1107-
.get(LANCE_UNENFORCED_PRIMARY_KEY)
1108-
.filter(|s| matches!(s.to_lowercase().as_str(), "true" | "1" | "yes"))
1109-
.map(|_| 0)
1110-
});
11111118
let is_blob_v2 = has_blob_v2_extension(field);
11121119

11131120
if is_blob_v2 {
@@ -1125,7 +1132,7 @@ impl TryFrom<&ArrowField> for Field {
11251132
LogicalType::try_from(field.data_type())?
11261133
};
11271134

1128-
Ok(Self {
1135+
let mut result = Self {
11291136
id,
11301137
parent_id: -1,
11311138
name: field.name().clone(),
@@ -1144,8 +1151,10 @@ impl TryFrom<&ArrowField> for Field {
11441151
nullable: field.is_nullable(),
11451152
children,
11461153
dictionary: None,
1147-
unenforced_primary_key_position,
1148-
})
1154+
unenforced_primary_key_position: None,
1155+
};
1156+
result.sync_embedded_metadata();
1157+
Ok(result)
11491158
}
11501159
}
11511160

rust/lance/src/dataset/metadata.rs

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,4 +557,285 @@ mod tests {
557557

558558
assert!(matches!(result, Err(Error::InvalidInput { .. })));
559559
}
560+
561+
/// Helper to create a simple dataset with a non-nullable `id` field suitable for PK tests.
562+
async fn test_dataset_for_pk() -> Dataset {
563+
let schema = Arc::new(ArrowSchema::new(vec![
564+
ArrowField::new("id", DataType::Int32, false),
565+
ArrowField::new("value", DataType::Utf8, true),
566+
]));
567+
568+
let batch = RecordBatch::try_new(
569+
schema.clone(),
570+
vec![
571+
Arc::new(Int32Array::from(vec![1, 2, 3])),
572+
Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
573+
],
574+
)
575+
.unwrap();
576+
577+
Dataset::write(
578+
RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
579+
"memory://",
580+
None,
581+
)
582+
.await
583+
.unwrap()
584+
}
585+
586+
#[tokio::test]
587+
async fn test_update_field_metadata_sets_unenforced_primary_key() {
588+
let mut dataset = test_dataset_for_pk().await;
589+
590+
// Set the boolean primary key metadata on the "id" field
591+
dataset
592+
.update_field_metadata()
593+
.update("id", [("lance-schema:unenforced-primary-key", "true")])
594+
.unwrap()
595+
.await
596+
.unwrap();
597+
598+
let field = dataset.schema().field("id").unwrap();
599+
assert!(
600+
field.is_unenforced_primary_key(),
601+
"Field should be recognized as unenforced primary key after metadata update"
602+
);
603+
assert_eq!(
604+
field.unenforced_primary_key_position,
605+
Some(0),
606+
"Legacy boolean flag should map to position 0"
607+
);
608+
}
609+
610+
#[tokio::test]
611+
async fn test_update_field_metadata_sets_unenforced_primary_key_position() {
612+
let mut dataset = test_dataset_for_pk().await;
613+
614+
// Set both the boolean flag and explicit position
615+
dataset
616+
.update_field_metadata()
617+
.update(
618+
"id",
619+
[
620+
("lance-schema:unenforced-primary-key", "true"),
621+
("lance-schema:unenforced-primary-key:position", "2"),
622+
],
623+
)
624+
.unwrap()
625+
.await
626+
.unwrap();
627+
628+
let field = dataset.schema().field("id").unwrap();
629+
assert!(field.is_unenforced_primary_key());
630+
assert_eq!(
631+
field.unenforced_primary_key_position,
632+
Some(2),
633+
"Explicit position should take precedence over boolean flag"
634+
);
635+
}
636+
637+
#[tokio::test]
638+
async fn test_update_field_metadata_removes_unenforced_primary_key() {
639+
let mut dataset = test_dataset_for_pk().await;
640+
641+
// First, set the primary key
642+
dataset
643+
.update_field_metadata()
644+
.update("id", [("lance-schema:unenforced-primary-key", "true")])
645+
.unwrap()
646+
.await
647+
.unwrap();
648+
assert!(
649+
dataset
650+
.schema()
651+
.field("id")
652+
.unwrap()
653+
.is_unenforced_primary_key()
654+
);
655+
656+
// Now remove it by setting value to None (delete the key)
657+
dataset
658+
.update_field_metadata()
659+
.update(
660+
"id",
661+
[("lance-schema:unenforced-primary-key", Option::<&str>::None)],
662+
)
663+
.unwrap()
664+
.await
665+
.unwrap();
666+
667+
let field = dataset.schema().field("id").unwrap();
668+
assert!(
669+
!field.is_unenforced_primary_key(),
670+
"Field should no longer be a primary key after removing the metadata key"
671+
);
672+
assert_eq!(field.unenforced_primary_key_position, None);
673+
}
674+
675+
#[tokio::test]
676+
async fn test_update_field_metadata_replace_clears_unenforced_primary_key() {
677+
let mut dataset = test_dataset_for_pk().await;
678+
679+
// First, set the primary key
680+
dataset
681+
.update_field_metadata()
682+
.update("id", [("lance-schema:unenforced-primary-key", "true")])
683+
.unwrap()
684+
.await
685+
.unwrap();
686+
assert!(
687+
dataset
688+
.schema()
689+
.field("id")
690+
.unwrap()
691+
.is_unenforced_primary_key()
692+
);
693+
694+
// Replace all metadata with unrelated keys — PK metadata should be cleared
695+
dataset
696+
.update_field_metadata()
697+
.replace("id", [("some-other-key", "some-value")])
698+
.unwrap()
699+
.await
700+
.unwrap();
701+
702+
let field = dataset.schema().field("id").unwrap();
703+
assert!(
704+
!field.is_unenforced_primary_key(),
705+
"Primary key status should be cleared after replacing metadata without PK keys"
706+
);
707+
assert_eq!(field.unenforced_primary_key_position, None);
708+
}
709+
710+
#[tokio::test]
711+
async fn test_update_field_metadata_primary_key_roundtrip() {
712+
use lance_core::utils::tempfile::TempDir;
713+
714+
let dir = TempDir::default();
715+
let uri = dir.path_str();
716+
717+
let schema = Arc::new(ArrowSchema::new(vec![
718+
ArrowField::new("id", DataType::Int32, false),
719+
ArrowField::new("value", DataType::Utf8, true),
720+
]));
721+
722+
let batch = RecordBatch::try_new(
723+
schema.clone(),
724+
vec![
725+
Arc::new(Int32Array::from(vec![1, 2, 3])),
726+
Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])),
727+
],
728+
)
729+
.unwrap();
730+
731+
let mut dataset = Dataset::write(
732+
RecordBatchIterator::new(vec![Ok(batch)], schema.clone()),
733+
&uri,
734+
None,
735+
)
736+
.await
737+
.unwrap();
738+
739+
// Set PK metadata via update
740+
dataset
741+
.update_field_metadata()
742+
.update(
743+
"id",
744+
[
745+
("lance-schema:unenforced-primary-key", "true"),
746+
("lance-schema:unenforced-primary-key:position", "1"),
747+
],
748+
)
749+
.unwrap()
750+
.await
751+
.unwrap();
752+
753+
// Reload the dataset from storage to verify protobuf round-trip
754+
let reloaded = Dataset::open(&uri).await.unwrap();
755+
let field = reloaded.schema().field("id").unwrap();
756+
assert!(
757+
field.is_unenforced_primary_key(),
758+
"Primary key should survive protobuf round-trip after metadata update"
759+
);
760+
assert_eq!(
761+
field.unenforced_primary_key_position,
762+
Some(1),
763+
"Primary key position should survive protobuf round-trip"
764+
);
765+
}
766+
767+
#[tokio::test]
768+
async fn test_update_field_metadata_primary_key_used_by_merge_insert() {
769+
use crate::dataset::write::merge_insert::*;
770+
771+
let mut dataset = test_dataset_for_pk().await;
772+
773+
// Set PK via metadata update (the bug scenario)
774+
dataset
775+
.update_field_metadata()
776+
.update("id", [("lance-schema:unenforced-primary-key", "true")])
777+
.unwrap()
778+
.await
779+
.unwrap();
780+
781+
let dataset = Arc::new(dataset);
782+
783+
// Prepare new data that overlaps with existing
784+
let schema = Arc::new(ArrowSchema::new(vec![
785+
ArrowField::new("id", DataType::Int32, false),
786+
ArrowField::new("value", DataType::Utf8, true),
787+
]));
788+
let new_batch = RecordBatch::try_new(
789+
schema.clone(),
790+
vec![
791+
Arc::new(Int32Array::from(vec![2, 4])),
792+
Arc::new(arrow_array::StringArray::from(vec!["updated", "new"])),
793+
],
794+
)
795+
.unwrap();
796+
797+
// MergeInsert with empty `on` keys — should default to the unenforced PK
798+
let mut builder = MergeInsertBuilder::try_new(dataset.clone(), Vec::new()).unwrap();
799+
builder
800+
.when_matched(WhenMatched::UpdateAll)
801+
.when_not_matched(WhenNotMatched::InsertAll);
802+
let job = builder.try_build().unwrap();
803+
804+
let new_reader = Box::new(RecordBatchIterator::new([Ok(new_batch)], schema.clone()));
805+
let new_stream = lance_datafusion::utils::reader_to_stream(new_reader);
806+
807+
let (updated_dataset, stats) = job.execute(new_stream).await.unwrap();
808+
809+
assert_eq!(stats.num_inserted_rows, 1, "id=4 should be inserted");
810+
assert_eq!(stats.num_updated_rows, 1, "id=2 should be updated");
811+
812+
let result = updated_dataset.scan().try_into_batch().await.unwrap();
813+
let ids = result
814+
.column_by_name("id")
815+
.unwrap()
816+
.as_any()
817+
.downcast_ref::<Int32Array>()
818+
.unwrap();
819+
let values = result
820+
.column_by_name("value")
821+
.unwrap()
822+
.as_any()
823+
.downcast_ref::<arrow_array::StringArray>()
824+
.unwrap();
825+
826+
let mut pairs: Vec<(i32, String)> = (0..ids.len())
827+
.map(|i| (ids.value(i), values.value(i).to_string()))
828+
.collect();
829+
pairs.sort_by_key(|(id, _)| *id);
830+
831+
assert_eq!(
832+
pairs,
833+
vec![
834+
(1, "a".to_string()),
835+
(2, "updated".to_string()),
836+
(3, "c".to_string()),
837+
(4, "new".to_string()),
838+
]
839+
);
840+
}
560841
}

rust/lance/src/dataset/transaction.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2327,6 +2327,7 @@ impl Transaction {
23272327
for (field_id, field_metadata_update) in field_metadata_updates {
23282328
if let Some(field) = manifest.schema.field_by_id_mut(*field_id) {
23292329
apply_update_map(&mut field.metadata, field_metadata_update);
2330+
field.sync_embedded_metadata();
23302331
} else {
23312332
return Err(Error::invalid_input_source(
23322333
format!("Field with id {} does not exist", field_id).into(),

0 commit comments

Comments
 (0)