@@ -33,6 +33,34 @@ use std::sync::Arc;
/// Name of the Lance dataset that caches the hydrated ontology dictionary.
// NOTE(review): presumably joined onto the writer's base directory by
// `dictionary_path()` — confirm against that method.
const DICTIONARY_NAME: &str = "ontology_dictionary";

/// Name of the Lance meta table that records `ttl_root_checksum` and friends.
// NOTE(review): presumably joined onto the writer's base directory by
// `meta_path()` — confirm against that method.
const META_NAME: &str = "ontology_meta";
3535
/// Version stamp of the on-disk dictionary layout produced by `dictionary_schema()`.
///
/// # Why the cache is invalidated rather than migrated
///
/// Read this before proposing a migration path.
///
/// `ontology_dictionary` is a *cache* of hydrated TTL, keyed in the meta table by
/// `ttl_root_checksum`. The TTL files on disk are the source of truth; the Lance
/// dataset is only a fast-path projection so that hydration does not re-parse on
/// every boot. BindSpace (FingerprintColumns / QualiaColumn / MetaColumn /
/// EdgeColumn) is the live runtime SoA and is unrelated — it never lands here.
///
/// Because this is a cache and not a source of truth, schema evolution does NOT
/// need a per-version migration ladder. On a version mismatch the cache directory
/// is deleted and hydration re-derives everything from TTL. That rules out the
/// class of bugs where a silent default-fill smuggles synthesized zeros into the
/// codebook, at the price of one cold rebuild on the first boot after a version
/// bump. A cold rebuild is acceptable; codebook contamination is not.
///
/// An "unknown" version — one newer than this binary expects, e.g. a feature
/// branch that wrote v3 columns this binary does not know about — is invalidated
/// as well. Forward-incompatible datasets get a clean rebuild instead of
/// corrupting the running binary's view of the codebook.
///
/// # Rule for the next editor
///
/// If you change `dictionary_schema()` in any way (add / remove / rename / retype
/// a column), bump `SCHEMA_VERSION` in the same commit. The
/// `schema_version_pinned` unit test fails loudly otherwise — that is the
/// compile-adjacent guard. The runtime guard is `LanceWriter::open_or_create`,
/// which compares the on-disk version with this constant and invalidates on any
/// mismatch.
pub const SCHEMA_VERSION: u32 = 2;
63+
/// Handle to the on-disk Lance cache rooted at one base directory.
pub struct LanceWriter {
    /// Root directory of the cache; `open_or_create` stores the path it was given here.
    // NOTE(review): the dictionary and meta datasets presumably live directly
    // beneath this directory — confirm against `dictionary_path` / `meta_path`.
    base: PathBuf,
}
@@ -43,9 +71,66 @@ impl LanceWriter {
4371 path : path. to_path_buf ( ) ,
4472 source,
4573 } ) ?;
46- Ok ( Self {
74+ let writer = Self {
4775 base : path. to_path_buf ( ) ,
48- } )
76+ } ;
77+ writer. invalidate_if_stale_schema ( ) . await ?;
78+ Ok ( writer)
79+ }
80+
81+ // Read the persisted `schema_version` from the meta table. Returns:
82+ // Ok(Some(n)) — meta exists and the column was readable
83+ // Ok(None) — meta dir is absent (fresh install) OR the column is
84+ // missing / unreadable (pre-versioning v1 deployment,
85+ // or a corrupted meta file — both treated as "stale,
86+ // invalidate" by the caller)
87+ async fn read_schema_version ( & self ) -> Result < Option < u32 > > {
88+ let path = self . meta_path ( ) ;
89+ if !path. exists ( ) {
90+ return Ok ( None ) ;
91+ }
92+ let path_str = path. to_string_lossy ( ) . to_string ( ) ;
93+ let dataset = match Dataset :: open ( & path_str) . await {
94+ Ok ( d) => d,
95+ Err ( _) => return Ok ( None ) ,
96+ } ;
97+ let mut stream = match dataset. scan ( ) . try_into_stream ( ) . await {
98+ Ok ( s) => s,
99+ Err ( _) => return Ok ( None ) ,
100+ } ;
101+ use futures:: StreamExt ;
102+ if let Some ( batch) = stream. next ( ) . await {
103+ let Ok ( batch) = batch else { return Ok ( None ) } ;
104+ let Some ( col) = batch. column_by_name ( "schema_version" ) else {
105+ return Ok ( None ) ;
106+ } ;
107+ let Some ( arr) = col. as_any ( ) . downcast_ref :: < UInt32Array > ( ) else {
108+ return Ok ( None ) ;
109+ } ;
110+ if arr. len ( ) > 0 {
111+ return Ok ( Some ( arr. value ( 0 ) ) ) ;
112+ }
113+ }
114+ Ok ( None )
115+ }
116+
117+ // On version mismatch, drop the cache so the next hydration rebuilds
118+ // from TTL. See the module-level reasoning comment above `SCHEMA_VERSION`
119+ // for why we invalidate instead of migrating.
120+ async fn invalidate_if_stale_schema ( & self ) -> Result < ( ) > {
121+ let on_disk = self . read_schema_version ( ) . await ?;
122+ if on_disk == Some ( SCHEMA_VERSION ) {
123+ return Ok ( ( ) ) ;
124+ }
125+ for sub in [ self . dictionary_path ( ) , self . meta_path ( ) ] {
126+ if sub. exists ( ) {
127+ std:: fs:: remove_dir_all ( & sub) . map_err ( |source| Error :: Io {
128+ path : sub. clone ( ) ,
129+ source,
130+ } ) ?;
131+ }
132+ }
133+ Ok ( ( ) )
49134 }
50135
51136 pub fn dictionary_path ( & self ) -> PathBuf {
@@ -132,6 +217,9 @@ impl LanceWriter {
132217 }
133218
134219 pub async fn set_last_root_checksum ( & self , checksum : & str ) -> Result < ( ) > {
220+ // `schema_version` is the cache-coherence handshake — read on open
221+ // by `invalidate_if_stale_schema` to decide whether the on-disk
222+ // dictionary is still meaningful to this binary.
135223 let schema = Arc :: new ( ArrowSchema :: new ( vec ! [
136224 Field :: new( "ttl_root_checksum" , DataType :: Utf8 , false ) ,
137225 Field :: new(
@@ -140,12 +228,14 @@ impl LanceWriter {
140228 false ,
141229 ) ,
142230 Field :: new( "crate_version" , DataType :: Utf8 , false ) ,
231+ Field :: new( "schema_version" , DataType :: UInt32 , false ) ,
143232 ] ) ) ;
144233 let now = chrono_micros ( ) ;
145234 let cols: Vec < ArrayRef > = vec ! [
146235 Arc :: new( StringArray :: from( vec![ checksum] ) ) ,
147236 Arc :: new( TimestampMicrosecondArray :: from( vec![ now] ) ) ,
148237 Arc :: new( StringArray :: from( vec![ env!( "CARGO_PKG_VERSION" ) ] ) ) ,
238+ Arc :: new( UInt32Array :: from( vec![ SCHEMA_VERSION ] ) ) ,
149239 ] ;
150240 let batch = RecordBatch :: try_new ( schema. clone ( ) , cols)
151241 . map_err ( |e| Error :: Arrow ( format ! ( "meta batch: {e}" ) ) ) ?;
@@ -192,11 +282,15 @@ fn dictionary_schema() -> Arc<ArrowSchema> {
192282 Field :: new( "base17_head" , DataType :: FixedSizeBinary ( 8 ) , false ) ,
193283 Field :: new( "palette_key" , DataType :: UInt32 , false ) ,
194284 Field :: new( "scent" , DataType :: UInt8 , false ) ,
195- // QualiaMeta — Pillar-0 dispatch bundle
285+ // QualiaMeta — Pillar-0 dispatch bundle.
286+ // Item nullability mirrors what `FixedSizeListBuilder<Float32Builder>`
287+ // produces by default (nullable items). We never actually write nulls,
288+ // but the schema has to agree with the builder for `RecordBatch::try_new`
289+ // to accept the column. The outer list field stays non-null.
196290 Field :: new(
197291 "qualia" ,
198292 DataType :: FixedSizeList (
199- Arc :: new( Field :: new( "item" , DataType :: Float32 , false ) ) ,
293+ Arc :: new( Field :: new( "item" , DataType :: Float32 , true ) ) ,
200294 18 ,
201295 ) ,
202296 false ,
@@ -845,4 +939,138 @@ mod tests {
845939 assert_eq ! ( r. thinking_style, None , "None thinking_style must survive round-trip" ) ;
846940 assert ! ( r. attribute_sources. is_empty( ) , "empty attribute_sources must survive round-trip" ) ;
847941 }
942+
/// Pins the field set of `dictionary_schema()` against `SCHEMA_VERSION`.
///
/// If `dictionary_schema()` changes without a `SCHEMA_VERSION` bump, this test
/// fails — it is the compile-adjacent guard for the cache-coherence contract.
/// To fix a failure: bump `SCHEMA_VERSION` and update the `expected` table
/// below with the new field set (printed on failure).
///
/// The `qualia` column is pinned structurally (list length, item type, item
/// nullability) rather than by its `Debug` string, which depends on arrow
/// internals.
#[test]
fn schema_version_pinned() {
    let schema = dictionary_schema();
    let actual: Vec<(String, String, bool)> = schema
        .fields()
        .iter()
        .map(|f| {
            (
                f.name().clone(),
                format!("{:?}", f.data_type()),
                f.is_nullable(),
            )
        })
        .collect();

    // Pinned to SCHEMA_VERSION = 2.
    // Tuple layout: (column name, Debug form of the data type, outer nullability).
    // A `None` type means "not compared by Debug string; checked structurally below".
    let expected: &[(&str, Option<&str>, bool)] = &[
        ("bridge_id", Some("Utf8"), false),
        ("public_name", Some("Utf8"), false),
        ("ogit_uri", Some("Utf8"), false),
        ("namespace_id", Some("UInt8"), false),
        ("schema_ptr", Some("UInt32"), false),
        ("kind", Some("Utf8"), false),
        ("semantic_type", Some("Utf8"), false),
        ("marking", Some("Utf8"), false),
        ("confidence", Some("Float32"), false),
        ("created_at", Some("Timestamp(Microsecond, None)"), false),
        ("created_by", Some("Utf8"), false),
        ("source_uri", Some("Utf8"), false),
        ("active", Some("Boolean"), false),
        ("checksum", Some("Utf8"), false),
        ("cam_pq_code", Some("FixedSizeBinary(6)"), false),
        ("base17_head", Some("FixedSizeBinary(8)"), false),
        ("palette_key", Some("UInt32"), false),
        ("scent", Some("UInt8"), false),
        ("qualia", None, false),
        ("codec_meta", Some("UInt32"), false),
        ("codec_edge", Some("UInt64"), false),
        ("thinking_style", Some("Utf8"), true),
        ("attribute_sources_enc", Some("Utf8"), false),
        ("subject_type", Some("Utf8"), false),
        ("object_type", Some("Utf8"), false),
        ("entity_type_ref", Some("Utf8"), false),
    ];

    assert_eq!(
        actual.len(),
        expected.len(),
        "column count drifted from SCHEMA_VERSION = {SCHEMA_VERSION}; bump the constant and update this pin. actual = {actual:#?}",
    );
    for (i, ((a_name, a_type, a_null), (e_name, e_type, e_null))) in
        actual.iter().zip(expected.iter()).enumerate()
    {
        assert_eq!(a_name.as_str(), *e_name, "column {i} name drifted");
        assert_eq!(
            *a_null, *e_null,
            "column {i} ({e_name}) outer-nullability drifted from SCHEMA_VERSION = {SCHEMA_VERSION}",
        );
        if let Some(want) = e_type {
            assert_eq!(
                a_type.as_str(),
                *want,
                "column {i} ({e_name}) type drifted from SCHEMA_VERSION = {SCHEMA_VERSION}; bump the constant and update this pin",
            );
        }
    }

    // Structural pin for `qualia`: without this, changing the list length or
    // the item field would slip past the guard unnoticed.
    let qualia = schema
        .field_with_name("qualia")
        .expect("qualia column must be present in dictionary_schema()");
    match qualia.data_type() {
        DataType::FixedSizeList(item, len) => {
            assert_eq!(
                *len, 18,
                "qualia list length drifted from SCHEMA_VERSION = {SCHEMA_VERSION}; bump the constant and update this pin",
            );
            assert_eq!(
                item.data_type(),
                &DataType::Float32,
                "qualia item type drifted from SCHEMA_VERSION = {SCHEMA_VERSION}; bump the constant and update this pin",
            );
            assert!(
                item.is_nullable(),
                "qualia item nullability drifted from SCHEMA_VERSION = {SCHEMA_VERSION}; bump the constant and update this pin",
            );
        }
        other => panic!(
            "qualia type drifted from SCHEMA_VERSION = {SCHEMA_VERSION}: expected FixedSizeList, got {other:?}"
        ),
    }
}
1010+
1011+ // Runtime guard test: a meta table written by a binary that did NOT
1012+ // know about `schema_version` (the v1 pre-versioning shape) must cause
1013+ // `open_or_create` to wipe the cache directory so hydration rebuilds
1014+ // from TTL. Same path covers "future v3 wrote columns we don't know".
1015+ #[ tokio:: test]
1016+ async fn stale_meta_invalidates_cache_dir ( ) {
1017+ let tmp = std:: env:: temp_dir ( ) . join ( format ! (
1018+ "lance_cache_invalidate_{}" ,
1019+ std:: process:: id( )
1020+ ) ) ;
1021+ let _ = std:: fs:: remove_dir_all ( & tmp) ;
1022+ std:: fs:: create_dir_all ( & tmp) . unwrap ( ) ;
1023+ let writer = LanceWriter :: open_or_create ( & tmp) . await . unwrap ( ) ;
1024+
1025+ // Plant a fake v1-shaped meta (no schema_version column) and a
1026+ // dictionary dir; opening again must remove both.
1027+ let v1_meta_schema = Arc :: new ( ArrowSchema :: new ( vec ! [
1028+ Field :: new( "ttl_root_checksum" , DataType :: Utf8 , false ) ,
1029+ Field :: new(
1030+ "last_hydrated_at" ,
1031+ DataType :: Timestamp ( TimeUnit :: Microsecond , None ) ,
1032+ false ,
1033+ ) ,
1034+ Field :: new( "crate_version" , DataType :: Utf8 , false ) ,
1035+ ] ) ) ;
1036+ let batch = RecordBatch :: try_new (
1037+ v1_meta_schema. clone ( ) ,
1038+ vec ! [
1039+ Arc :: new( StringArray :: from( vec![ "pretend_v1_checksum" ] ) ) ,
1040+ Arc :: new( TimestampMicrosecondArray :: from( vec![ 0i64 ] ) ) ,
1041+ Arc :: new( StringArray :: from( vec![ "0.0.0" ] ) ) ,
1042+ ] ,
1043+ )
1044+ . unwrap ( ) ;
1045+ let reader = arrow:: record_batch:: RecordBatchIterator :: new (
1046+ vec ! [ Ok ( batch) ] . into_iter ( ) ,
1047+ v1_meta_schema,
1048+ ) ;
1049+ Dataset :: write (
1050+ reader,
1051+ writer. meta_path ( ) . to_string_lossy ( ) . as_ref ( ) ,
1052+ Some ( WriteParams {
1053+ mode : WriteMode :: Overwrite ,
1054+ ..Default :: default ( )
1055+ } ) ,
1056+ )
1057+ . await
1058+ . unwrap ( ) ;
1059+ std:: fs:: create_dir_all ( writer. dictionary_path ( ) ) . unwrap ( ) ;
1060+ std:: fs:: write ( writer. dictionary_path ( ) . join ( "sentinel" ) , b"x" ) . unwrap ( ) ;
1061+
1062+ // Re-open: the stale meta (no schema_version) must trigger
1063+ // invalidation of both dictionary and meta directories.
1064+ let _writer2 = LanceWriter :: open_or_create ( & tmp) . await . unwrap ( ) ;
1065+ assert ! (
1066+ !writer. dictionary_path( ) . exists( ) ,
1067+ "stale schema must wipe dictionary_path"
1068+ ) ;
1069+ assert ! (
1070+ !writer. meta_path( ) . exists( ) ,
1071+ "stale schema must wipe meta_path"
1072+ ) ;
1073+
1074+ let _ = std:: fs:: remove_dir_all ( & tmp) ;
1075+ }
8481076}
0 commit comments