diff --git a/rust/lance-namespace-impls/examples/manifest_bench.rs b/rust/lance-namespace-impls/examples/manifest_bench.rs index e84026a7382..c57f33d60ea 100644 --- a/rust/lance-namespace-impls/examples/manifest_bench.rs +++ b/rust/lance-namespace-impls/examples/manifest_bench.rs @@ -15,13 +15,13 @@ //! //! # Seed 500K rows directly into __manifest table //! manifest_bench seed-large --root s3://bucket/bench/scale \ -//! --count 500000 --inline-optimization true +//! --count 500000 --inline-optimization true --rich-metadata true //! //! # Run scale benchmark at 500K initial entries //! manifest_bench run --root s3://bucket/bench/scale \ //! --concurrency 1,10,100 --operations 200 -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::fs; use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; @@ -41,7 +41,10 @@ use lance_namespace::models::{ CreateNamespaceRequest, CreateTableRequest, DeclareTableRequest, DescribeTableRequest, ListNamespacesRequest, ListTablesRequest, }; -use lance_namespace_impls::DirectoryNamespaceBuilder; +use lance_namespace_impls::{ + DESCRIPTION_METADATA_KEY, DirectoryNamespace, DirectoryNamespaceBuilder, MetadataUpdate, + UpdateTableMetadataRequest, +}; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone)] @@ -162,7 +165,7 @@ async fn build_namespace( inline_optimization: bool, manifest_shard_count: usize, storage_options: &HashMap, -) -> Box { +) -> DirectoryNamespace { let mut properties = HashMap::new(); properties.insert("root".to_string(), root.to_string()); properties.insert("dir_listing_enabled".to_string(), "false".to_string()); @@ -181,7 +184,7 @@ async fn build_namespace( } let builder = DirectoryNamespaceBuilder::from_properties(properties, None) .expect("Failed to create namespace builder from properties"); - Box::new(builder.build().await.expect("Failed to build namespace")) + builder.build().await.expect("Failed to build namespace") } // ──────────────────── seed mode ──────────────────── @@ -192,6 +195,7 @@ async fn seed( inline_optimization: bool, manifest_shard_count: usize, storage_options: &HashMap, + rich_metadata: bool, ) { eprintln!("Seeding {} entries at {}", count, root); let ns = build_namespace( @@ -209,6 +213,10 @@ async fn seed( for i in 0..ns_count { let mut req = CreateNamespaceRequest::new(); req.id = Some(vec![format!("ns_{}", i)]); + if rich_metadata { + req.properties = + Some(serde_json::from_str(&rich_metadata_json(i, "namespace")).unwrap()); + } if let Err(e) = ns.create_namespace(req).await { eprintln!("seed ns_{}: {}", i, e); } @@ -219,6 +227,9 @@ async fn seed( for i in 0..table_count { let mut req = CreateTableRequest::new(); req.id = Some(vec![format!("table_{}", i)]); + if rich_metadata { + req.properties = Some(serde_json::from_str(&rich_metadata_json(i, "table")).unwrap()); + } if let Err(e) = ns.create_table(req, ipc_data.clone()).await { eprintln!("seed table_{}: {}", i, e); } @@ -253,6 +264,38 @@ fn stable_hash(value: &str) -> u64 { hash } +fn rich_metadata_json(row_index: usize, object_type: &str) -> String { + let description = format!( + "Catalog row {} for {} stores a carefully maintained analytical dataset used by product, research, and operations teams. The description is intentionally long enough to model user supplied catalog text while remaining deterministic across benchmark runs. It includes ownership context, lifecycle notes, discovery hints, and operational expectations for steady state metadata update measurements.", + row_index, object_type + ); + let mut metadata = BTreeMap::new(); + metadata.insert(DESCRIPTION_METADATA_KEY.to_string(), description); + for tag_idx in 0..10 { + metadata.insert( + format!("tag_{:02}", tag_idx), + format!("{}_{}_{}", object_type, row_index % 4096, tag_idx), + ); + } + serde_json::to_string(&metadata).expect("failed to serialize rich metadata") +} + +fn update_metadata(worker_id: usize, op_idx: usize, table_idx: usize) -> MetadataUpdate { + let description = format!( + "Updated catalog description for table {} from worker {} operation {}. This write changes the same JSON metadata object shape used during bootstrap and models a steady state user edit to long descriptive text plus several key value tags in the directory catalog. The payload remains intentionally verbose to reflect realistic catalog annotations that explain ownership, business purpose, freshness expectations, and operational review notes.", + table_idx, worker_id, op_idx + ); + let mut set = HashMap::new(); + set.insert(DESCRIPTION_METADATA_KEY.to_string(), description); + for tag_idx in 0..10 { + set.insert( + format!("tag_{:02}", tag_idx), + format!("updated_{}_{}_{}", worker_id, op_idx, tag_idx), + ); + } + MetadataUpdate { set, unset: vec![] } +} + fn manifest_shard_marker_row(shard_index: usize) -> ManifestSeedRow { ManifestSeedRow { object_id: format!("__manifest_shard_marker_{:06}", shard_index), @@ -262,14 +305,14 @@ fn manifest_shard_marker_row(shard_index: usize) -> ManifestSeedRow { } } -fn manifest_seed_row(i: usize, total_count: usize) -> ManifestSeedRow { +fn manifest_seed_row(i: usize, total_count: usize, rich_metadata: bool) -> ManifestSeedRow { let ns_count = total_count / 3; if i < ns_count { ManifestSeedRow { object_id: format!("ns_{}", i), object_type: "namespace".to_string(), location: None, - metadata: None, + metadata: rich_metadata.then(|| rich_metadata_json(i, "namespace")), } } else { let table_idx = i - ns_count; @@ -277,7 +320,11 @@ fn manifest_seed_row(i: usize, total_count: usize) -> ManifestSeedRow { object_id: format!("table_{}", table_idx), object_type: "table".to_string(), location: Some(format!("table_{}", table_idx)), - metadata: Some(r#"{"bench":"true"}"#.to_string()), + metadata: Some(if rich_metadata { + rich_metadata_json(i, "table") + } else { + r#"{"bench":"true"}"#.to_string() + }), } } } @@ -380,6 +427,7 @@ fn generate_manifest_batch( start_idx: usize, batch_size: usize, total_count: usize, + rich_metadata: bool, ) -> RecordBatch { let ns_count = total_count / 3; let actual_size = batch_size.min(total_count - start_idx); @@ -387,25 +435,29 @@ fn generate_manifest_batch( let mut object_ids = Vec::with_capacity(actual_size); let mut object_types = Vec::with_capacity(actual_size); let mut locations: Vec> = Vec::with_capacity(actual_size); - let mut metadatas: Vec> = Vec::with_capacity(actual_size); + let mut metadatas: Vec> = Vec::with_capacity(actual_size); for i in start_idx..start_idx + actual_size { if i < ns_count { object_ids.push(format!("ns_{}", i)); object_types.push("namespace".to_string()); locations.push(None); - metadatas.push(None); + metadatas.push(rich_metadata.then(|| rich_metadata_json(i, "namespace"))); } else { let table_idx = i - ns_count; object_ids.push(format!("table_{}", table_idx)); object_types.push("table".to_string()); locations.push(Some(format!("table_{}", table_idx))); - metadatas.push(Some(r#"{"bench":"true"}"#)); + metadatas.push(Some(if rich_metadata { + rich_metadata_json(i, "table") + } else { + r#"{"bench":"true"}"#.to_string() + })); } } let metadata_array = Arc::new( - JsonArray::try_from_iter(metadatas.into_iter()) + JsonArray::try_from_iter(metadatas.iter().map(|metadata| metadata.as_deref())) .expect("Failed to encode metadata as JSON") .into_inner(), ); @@ -428,6 +480,7 @@ async fn seed_large( inline_optimization: bool, manifest_shard_count: usize, storage_options: &HashMap, + rich_metadata: bool, ) { if manifest_shard_count > 0 { seed_large_sharded( @@ -436,6 +489,7 @@ async fn seed_large( inline_optimization, manifest_shard_count, storage_options, + rich_metadata, ) .await; return; @@ -454,7 +508,13 @@ async fn seed_large( let mut offset = 0; while offset < count { let batch_size = SEED_LARGE_BATCH_SIZE.min(count - offset); - batches.push(generate_manifest_batch(&schema, offset, batch_size, count)); + batches.push(generate_manifest_batch( + &schema, + offset, + batch_size, + count, + rich_metadata, + )); offset += batch_size; } eprintln!(" generated {} batches", batches.len()); @@ -500,6 +560,7 @@ async fn seed_large_sharded( inline_optimization: bool, manifest_shard_count: usize, storage_options: &HashMap, + rich_metadata: bool, ) { eprintln!( "Seed-large: writing {} rows directly across {} manifest fragments", @@ -511,7 +572,7 @@ async fn seed_large_sharded( (0..manifest_shard_count).map(|_| Vec::new()).collect(); for i in 0..count { - let row = manifest_seed_row(i, count); + let row = manifest_seed_row(i, count, rich_metadata); let shard_index = stable_hash(&row.object_id) as usize % manifest_shard_count; rows_by_shard[shard_index].push(row); } @@ -579,8 +640,7 @@ async fn worker( // Warmup (only for warm-read operations) if operation.starts_with("warm-read") { for _ in 0..warmup { - let _ = - run_operation(ns.as_ref(), operation, worker_id, 0, table_count, &ipc_data).await; + let _ = run_operation(&ns, operation, worker_id, 0, table_count, &ipc_data).await; } } @@ -588,7 +648,7 @@ async fn worker( for i in 0..operations { let start = Instant::now(); - let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data) + let err = run_operation(&ns, operation, worker_id, i, table_count, &ipc_data) .await .is_err(); let latency_ms = start.elapsed().as_secs_f64() * 1000.0; @@ -602,7 +662,7 @@ async fn worker( } async fn run_operation( - ns: &dyn LanceNamespace, + ns: &DirectoryNamespace, operation: &str, worker_id: usize, op_idx: usize, @@ -645,6 +705,14 @@ async fn run_operation( }; ns.declare_table(req).await?; } + "write-update-table-metadata" => { + let table_idx = (worker_id * 1000 + op_idx) % table_count.max(1); + ns.update_table_metadata(UpdateTableMetadataRequest { + id: vec![format!("table_{}", table_idx)], + update: update_metadata(worker_id, op_idx, table_idx), + }) + .await?; + } _ => { return Err(format!("unknown operation: {}", operation).into()); } @@ -681,7 +749,7 @@ async fn cold_read_worker( storage_options, ) .await; - let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data) + let err = run_operation(&ns, operation, worker_id, i, table_count, &ipc_data) .await .is_err(); let latency_ms = start.elapsed().as_secs_f64() * 1000.0; @@ -902,6 +970,7 @@ async fn main() { let mut ready_path: Option = None; let mut start_path: Option = None; let mut storage_options: HashMap = HashMap::new(); + let mut rich_metadata = false; let mut i = 2; while i < args.len() { @@ -950,6 +1019,10 @@ async fn main() { manifest_shard_count = args[i + 1].parse().unwrap(); i += 2; } + "--rich-metadata" => { + rich_metadata = args[i + 1].parse().unwrap(); + i += 2; + } "--variant" => { variant = args[i + 1].clone(); i += 2; @@ -992,6 +1065,7 @@ async fn main() { inline_optimization, manifest_shard_count, &storage_options, + rich_metadata, ) .await; } @@ -1002,6 +1076,7 @@ async fn main() { inline_optimization, manifest_shard_count, &storage_options, + rich_metadata, ) .await; } @@ -1053,6 +1128,7 @@ async fn main() { "write-create-namespace", "write-declare-table", "write-create-table", + "write-update-table-metadata", ]; // If --operation is set, only run that one @@ -1067,6 +1143,7 @@ async fn main() { eprintln!("root: {}", root); eprintln!("inline_optimization: {}", inline_optimization); eprintln!("manifest_shard_count: {}", manifest_shard_count); + eprintln!("rich_metadata: {}", rich_metadata); eprintln!("initial_entries: {}", initial_entries); eprintln!("concurrency: {:?}", concurrency_list); eprintln!("operations per level: {}", operations); diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index cf8eb3804f5..6d87eefa186 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -104,6 +104,78 @@ impl OpsMetrics { } } +/// Metadata key used for the long description displayed separately by clients. +pub const DESCRIPTION_METADATA_KEY: &str = "description"; + +/// Set/unset update for manifest-backed table and namespace metadata. +/// +/// The manifest stores metadata as JSON. This update interprets the JSON object +/// as a string-to-string tag map. +#[derive(Debug, Clone, Default)] +pub struct MetadataUpdate { + /// Metadata keys to insert or replace. + pub set: HashMap, + /// Metadata keys to remove. + pub unset: Vec, +} + +impl MetadataUpdate { + fn validate(&self, target: &str) -> Result<()> { + if self.set.is_empty() && self.unset.is_empty() { + return Err(NamespaceError::InvalidInput { + message: format!("{} metadata update cannot be empty", target), + } + .into()); + } + + for key in &self.unset { + if self.set.contains_key(key) { + return Err(NamespaceError::InvalidInput { + message: format!( + "{} metadata key '{}' cannot be both set and unset", + target, key + ), + } + .into()); + } + } + + Ok(()) + } + + pub(crate) fn apply(&self, metadata: &mut HashMap) { + for key in &self.unset { + metadata.remove(key); + } + metadata.extend(self.set.clone()); + } +} + +/// Native directory namespace request for updating a table manifest metadata row. +#[derive(Debug, Clone, Default)] +pub struct UpdateTableMetadataRequest { + /// Table identifier, with the table name as the final path component. + pub id: Vec, + /// Metadata changes to apply to the table row. + pub update: MetadataUpdate, +} + +/// Native directory namespace request for updating a namespace manifest metadata row. +#[derive(Debug, Clone, Default)] +pub struct UpdateNamespaceMetadataRequest { + /// Namespace identifier. Root namespace metadata updates are not supported. + pub id: Vec, + /// Metadata changes to apply to the namespace row. + pub update: MetadataUpdate, +} + +/// Response returned after updating manifest-backed table or namespace metadata. +#[derive(Debug, Clone, Default)] +pub struct UpdateMetadataResponse { + /// Updated metadata properties, or `None` when all metadata keys were removed. + pub properties: Option>, +} + /// Result of checking table status atomically. /// /// This struct captures the state of a table directory in a single snapshot, @@ -886,6 +958,28 @@ impl ManifestCatalog { } } + async fn update_table_metadata( + &self, + table_id: &[String], + update: &MetadataUpdate, + ) -> Result>> { + match self { + Self::Single(ns) => ns.update_table_metadata(table_id, update).await, + Self::Sharded(ns) => ns.update_table_metadata(table_id, update).await, + } + } + + async fn update_namespace_metadata( + &self, + namespace_id: &[String], + update: &MetadataUpdate, + ) -> Result>> { + match self { + Self::Single(ns) => ns.update_namespace_metadata(namespace_id, update).await, + Self::Sharded(ns) => ns.update_namespace_metadata(namespace_id, update).await, + } + } + #[cfg(test)] async fn query_table_versions( &self, @@ -2553,6 +2647,65 @@ impl DirectoryNamespace { } } + /// Update metadata properties for a manifest-backed table row. + /// + /// The manifest metadata column remains JSON, and this API applies set/unset + /// changes to that JSON object as a string-to-string tag map. + pub async fn update_table_metadata( + &self, + request: UpdateTableMetadataRequest, + ) -> Result { + self.record_op("update_table_metadata"); + if request.id.is_empty() { + return Err(NamespaceError::InvalidInput { + message: "Table ID cannot be empty".to_string(), + } + .into()); + } + request.update.validate("table")?; + + let Some(ref manifest_ns) = self.manifest_ns else { + return Err(NamespaceError::Unsupported { + message: "Table metadata updates require manifest mode".to_string(), + } + .into()); + }; + + let properties = manifest_ns + .update_table_metadata(&request.id, &request.update) + .await?; + Ok(UpdateMetadataResponse { properties }) + } + + /// Update metadata properties for a manifest-backed namespace row. + /// + /// The root namespace has no manifest row and cannot be updated through this API. + pub async fn update_namespace_metadata( + &self, + request: UpdateNamespaceMetadataRequest, + ) -> Result { + self.record_op("update_namespace_metadata"); + if request.id.is_empty() { + return Err(NamespaceError::InvalidInput { + message: "Root namespace metadata cannot be updated".to_string(), + } + .into()); + } + request.update.validate("namespace")?; + + let Some(ref manifest_ns) = self.manifest_ns else { + return Err(NamespaceError::Unsupported { + message: "Namespace metadata updates require manifest mode".to_string(), + } + .into()); + }; + + let properties = manifest_ns + .update_namespace_metadata(&request.id, &request.update) + .await?; + Ok(UpdateMetadataResponse { properties }) + } + /// Increment the counter for an operation. fn record_op(&self, operation: &str) { if let Some(ref metrics) = self.ops_metrics { @@ -4681,6 +4834,326 @@ mod tests { } } + #[tokio::test] + async fn test_update_table_metadata_set_and_unset() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .dir_listing_enabled(false) + .build() + .await + .unwrap(); + + let mut properties = HashMap::new(); + properties.insert( + DESCRIPTION_METADATA_KEY.to_string(), + "Original table description".to_string(), + ); + properties.insert("owner".to_string(), "alice".to_string()); + properties.insert("remove_me".to_string(), "old".to_string()); + let mut create_request = CreateTableRequest::new(); + create_request.id = Some(vec!["metadata_table".to_string()]); + create_request.properties = Some(properties); + namespace + .create_table(create_request, Bytes::from(create_scalar_table_ipc_data())) + .await + .unwrap(); + + let mut set = HashMap::new(); + set.insert( + DESCRIPTION_METADATA_KEY.to_string(), + "Updated table description".to_string(), + ); + set.insert("owner".to_string(), "bob".to_string()); + set.insert("priority".to_string(), "high".to_string()); + + let response = namespace + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["metadata_table".to_string()], + update: MetadataUpdate { + set, + unset: vec!["remove_me".to_string()], + }, + }) + .await + .unwrap(); + let response_properties = response.properties.unwrap(); + assert_eq!( + response_properties.get(DESCRIPTION_METADATA_KEY), + Some(&"Updated table description".to_string()) + ); + assert_eq!(response_properties.get("owner"), Some(&"bob".to_string())); + assert_eq!( + response_properties.get("priority"), + Some(&"high".to_string()) + ); + assert!(!response_properties.contains_key("remove_me")); + + let describe = namespace + .describe_table(DescribeTableRequest { + id: Some(vec!["metadata_table".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(describe.properties, Some(response_properties)); + + let response = namespace + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["metadata_table".to_string()], + update: MetadataUpdate { + set: HashMap::new(), + unset: vec![ + DESCRIPTION_METADATA_KEY.to_string(), + "owner".to_string(), + "priority".to_string(), + ], + }, + }) + .await + .unwrap(); + assert_eq!(response.properties, None); + + let describe = namespace + .describe_table(DescribeTableRequest { + id: Some(vec!["metadata_table".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(describe.properties, None); + } + + #[tokio::test] + async fn test_update_namespace_metadata_set_and_unset() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .dir_listing_enabled(false) + .build() + .await + .unwrap(); + + let mut properties = HashMap::new(); + properties.insert( + DESCRIPTION_METADATA_KEY.to_string(), + "Original namespace description".to_string(), + ); + properties.insert("team".to_string(), "search".to_string()); + properties.insert("remove_me".to_string(), "old".to_string()); + let mut create_request = CreateNamespaceRequest::new(); + create_request.id = Some(vec!["workspace".to_string()]); + create_request.properties = Some(properties); + namespace.create_namespace(create_request).await.unwrap(); + + let mut set = HashMap::new(); + set.insert( + DESCRIPTION_METADATA_KEY.to_string(), + "Updated namespace description".to_string(), + ); + set.insert("team".to_string(), "platform".to_string()); + set.insert("env".to_string(), "prod".to_string()); + + let response = namespace + .update_namespace_metadata(UpdateNamespaceMetadataRequest { + id: vec!["workspace".to_string()], + update: MetadataUpdate { + set, + unset: vec!["remove_me".to_string()], + }, + }) + .await + .unwrap(); + let response_properties = response.properties.unwrap(); + assert_eq!( + response_properties.get(DESCRIPTION_METADATA_KEY), + Some(&"Updated namespace description".to_string()) + ); + assert_eq!( + response_properties.get("team"), + Some(&"platform".to_string()) + ); + assert_eq!(response_properties.get("env"), Some(&"prod".to_string())); + assert!(!response_properties.contains_key("remove_me")); + + let describe = namespace + .describe_namespace(DescribeNamespaceRequest { + id: Some(vec!["workspace".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(describe.properties, Some(response_properties)); + } + + #[tokio::test] + async fn test_update_metadata_missing_and_invalid_requests() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .dir_listing_enabled(false) + .build() + .await + .unwrap(); + + let mut set = HashMap::new(); + set.insert("owner".to_string(), "alice".to_string()); + let err = namespace + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["missing".to_string()], + update: MetadataUpdate { + set: set.clone(), + unset: vec![], + }, + }) + .await + .unwrap_err(); + assert!(err.to_string().contains("Table not found")); + + let err = namespace + .update_namespace_metadata(UpdateNamespaceMetadataRequest { + id: vec!["missing".to_string()], + update: MetadataUpdate { + set: set.clone(), + unset: vec![], + }, + }) + .await + .unwrap_err(); + assert!(err.to_string().contains("Namespace not found")); + + let err = namespace + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["missing".to_string()], + update: MetadataUpdate::default(), + }) + .await + .unwrap_err(); + assert!(err.to_string().contains("metadata update cannot be empty")); + + let err = namespace + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["missing".to_string()], + update: MetadataUpdate { + set, + unset: vec!["owner".to_string()], + }, + }) + .await + .unwrap_err(); + assert!(err.to_string().contains("cannot be both set and unset")); + + let mut list_request = ListTablesRequest::new(); + list_request.id = Some(vec![]); + assert!( + namespace + .list_tables(list_request) + .await + .unwrap() + .tables + .is_empty() + ); + } + + #[tokio::test] + async fn test_update_namespace_metadata_rejects_root_and_directory_only() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .build() + .await + .unwrap(); + + let mut set = HashMap::new(); + set.insert("owner".to_string(), "alice".to_string()); + let err = namespace + .update_namespace_metadata(UpdateNamespaceMetadataRequest { + id: vec![], + update: MetadataUpdate { + set: set.clone(), + unset: vec![], + }, + }) + .await + .unwrap_err(); + assert!( + err.to_string() + .contains("Root namespace metadata cannot be updated") + ); + + let dir_only = DirectoryNamespaceBuilder::new(temp_path) + .manifest_enabled(false) + .build() + .await + .unwrap(); + let err = dir_only + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["table".to_string()], + update: MetadataUpdate { set, unset: vec![] }, + }) + .await + .unwrap_err(); + assert!(err.to_string().contains("require manifest mode")); + } + + #[tokio::test] + async fn test_sharded_update_table_metadata_replaces_one_fragment() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = create_sharded_namespace(temp_path).await; + + create_scalar_table(&namespace, "alpha").await; + create_scalar_table(&namespace, "beta").await; + + let before = load_sharded_manifest(temp_path).await; + assert_eq!(before.count_fragments(), 4); + let before_files = fragment_data_files(&before); + + let mut set = HashMap::new(); + set.insert( + DESCRIPTION_METADATA_KEY.to_string(), + "Sharded table description".to_string(), + ); + set.insert("owner".to_string(), "alice".to_string()); + namespace + .update_table_metadata(UpdateTableMetadataRequest { + id: vec!["alpha".to_string()], + update: MetadataUpdate { set, unset: vec![] }, + }) + .await + .unwrap(); + + let after = load_sharded_manifest(temp_path).await; + assert_eq!(after.count_fragments(), 4); + let after_files = fragment_data_files(&after); + let changed_fragments = before_files + .iter() + .filter(|(fragment_id, before_files)| { + after_files + .get(fragment_id) + .is_some_and(|after_files| after_files != *before_files) + }) + .count(); + assert_eq!( + changed_fragments, 1, + "metadata update should replace one manifest shard fragment" + ); + + let describe = namespace + .describe_table(DescribeTableRequest { + id: Some(vec!["alpha".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + let properties = describe.properties.unwrap(); + assert_eq!( + properties.get(DESCRIPTION_METADATA_KEY), + Some(&"Sharded table description".to_string()) + ); + assert_eq!(properties.get("owner"), Some(&"alice".to_string())); + } + #[tokio::test] async fn test_sharded_table_version_storage_records_versions() { use futures::TryStreamExt; diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 8c62564db62..188da41a58a 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -68,6 +68,8 @@ use std::{ use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use uuid::Uuid; +use super::MetadataUpdate; + const MANIFEST_TABLE_NAME: &str = "__manifest"; const DELIMITER: &str = "$"; /// Bounded concurrency for per-table `_versions/` probes when filtering declared tables. @@ -524,6 +526,84 @@ impl ManifestStreamMutation for DeleteObjectMutation { } } +struct UpdateMetadataMutation { + object_id: String, + object_type: ObjectType, + update: MetadataUpdate, + updated: Option>>, + changed: bool, +} + +impl ManifestStreamMutation for UpdateMetadataMutation { + type Output = Option>>; + + fn process_existing_row( + &mut self, + row: ManifestRowValue, + output: &mut ManifestBatchBuilder, + index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + if row.object_id == self.object_id && row.object_type == self.object_type { + let mut metadata = ManifestNamespace::deserialize_metadata( + row.metadata.as_deref(), + row.object_type.as_str(), + &row.object_id, + )?; + let original_metadata = metadata.clone(); + self.update.apply(&mut metadata); + + let serialized = ManifestNamespace::serialize_metadata( + Some(&metadata), + row.object_type.as_str(), + &row.object_id, + )?; + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: serialized.as_deref(), + }, + )?; + + self.changed = original_metadata != metadata; + self.updated = Some((!metadata.is_empty()).then_some(metadata)); + return Ok(()); + } + + output.append( + index_data, + ManifestOutputRow { + object_id: &row.object_id, + object_type: row.object_type, + location: row.location.as_deref(), + metadata: row.metadata.as_deref(), + }, + ) + } + + fn append_rows( + &mut self, + _output: &mut ManifestBatchBuilder, + _index_data: &mut ManifestIndexAccumulator, + ) -> Result<()> { + Ok(()) + } + + fn finish(&self) -> CopyOnWriteMutation { + if let Some(updated) = self.updated.clone() { + if self.changed { + CopyOnWriteMutation::updated(Some(updated)) + } else { + CopyOnWriteMutation::unchanged(Some(updated)) + } + } else { + CopyOnWriteMutation::unchanged(None) + } + } +} + enum DeleteTableVersionsTarget { ObjectIds(HashSet), Ranges(Vec), @@ -2295,6 +2375,102 @@ impl ManifestNamespace { } } + fn deserialize_metadata( + metadata: Option<&str>, + object_type: &str, + object_id: &str, + ) -> Result> { + let Some(metadata) = metadata else { + return Ok(HashMap::new()); + }; + + serde_json::from_str::>(metadata).map_err(|e| { + LanceError::from(NamespaceError::Internal { + message: format!( + "Failed to deserialize {} metadata for '{}': {}", + object_type, object_id, e + ), + }) + }) + } + + async fn update_manifest_metadata( + &self, + object_id: &str, + object_type: ObjectType, + update: &MetadataUpdate, + ) -> Result>>> { + if let Some(shard_count) = self.shard_count() { + let shard_index = Self::shard_index_for_key(shard_count, object_id); + return self + .rewrite_manifest_shard( + shard_index, + "Failed to update manifest metadata shard", + || UpdateMetadataMutation { + object_id: object_id.to_string(), + object_type, + update: update.clone(), + updated: None, + changed: false, + }, + ) + .await; + } + + self.rewrite_manifest("Failed to update manifest metadata", || { + UpdateMetadataMutation { + object_id: object_id.to_string(), + object_type, + update: update.clone(), + updated: None, + changed: false, + } + }) + .await + } + + pub async fn update_table_metadata( + &self, + table_id: &[String], + update: &MetadataUpdate, + ) -> Result>> { + if table_id.is_empty() { + return Err(NamespaceError::InvalidInput { + message: "Table ID cannot be empty".to_string(), + } + .into()); + } + + let object_id = Self::str_object_id(table_id); + self.update_manifest_metadata(&object_id, ObjectType::Table, update) + .await? + .ok_or_else(|| { + lance_core::Error::from(NamespaceError::TableNotFound { + message: Self::format_table_id(table_id), + }) + }) + } + + pub async fn update_namespace_metadata( + &self, + namespace_id: &[String], + update: &MetadataUpdate, + ) -> Result>> { + if namespace_id.is_empty() { + return Err(NamespaceError::InvalidInput { + message: "Root namespace metadata cannot be updated".to_string(), + } + .into()); + } + + let object_id = namespace_id.join(DELIMITER); + self.update_manifest_metadata(&object_id, ObjectType::Namespace, update) + .await? + .ok_or_else(|| { + lance_core::Error::from(NamespaceError::NamespaceNotFound { message: object_id }) + }) + } + pub(crate) async fn path_has_actual_manifests( object_store: &ObjectStore, table_path: &Path, @@ -3289,6 +3465,24 @@ impl ShardedManifestNamespace { .await } + pub async fn update_table_metadata( + &self, + table_id: &[String], + update: &MetadataUpdate, + ) -> Result>> { + self.manifest.update_table_metadata(table_id, update).await + } + + pub async fn update_namespace_metadata( + &self, + namespace_id: &[String], + update: &MetadataUpdate, + ) -> Result>> { + self.manifest + .update_namespace_metadata(namespace_id, update) + .await + } + pub async fn query_table_versions( &self, object_id: &str, diff --git a/rust/lance-namespace-impls/src/lib.rs b/rust/lance-namespace-impls/src/lib.rs index 58e29aca5ef..a5344c52db1 100644 --- a/rust/lance-namespace-impls/src/lib.rs +++ b/rust/lance-namespace-impls/src/lib.rs @@ -86,7 +86,9 @@ pub mod rest_adapter; pub use connect::ConnectBuilder; pub use context::{DynamicContextProvider, OperationInfo}; pub use dir::{ - DirectoryNamespace, DirectoryNamespaceBuilder, OpsMetrics, manifest::ManifestNamespace, + DESCRIPTION_METADATA_KEY, DirectoryNamespace, DirectoryNamespaceBuilder, MetadataUpdate, + OpsMetrics, UpdateMetadataResponse, UpdateNamespaceMetadataRequest, UpdateTableMetadataRequest, + manifest::ManifestNamespace, }; // Re-export credential vending