diff --git a/rust/lance-namespace-impls/examples/manifest_bench.rs b/rust/lance-namespace-impls/examples/manifest_bench.rs index 8df657dfcd8..e84026a7382 100644 --- a/rust/lance-namespace-impls/examples/manifest_bench.rs +++ b/rust/lance-namespace-impls/examples/manifest_bench.rs @@ -22,15 +22,18 @@ //! --concurrency 1,10,100 --operations 200 use std::collections::HashMap; +use std::fs; use std::io::{BufRead, BufReader}; +use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use arrow::array::{RecordBatch, RecordBatchIterator, StringArray}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use bytes::Bytes; -use lance::dataset::{InsertBuilder, WriteMode, WriteParams}; +use lance::dataset::{InsertBuilder, ReadParams, WriteMode, WriteParams, builder::DatasetBuilder}; use lance_arrow::json::JsonArray; use lance_core::datatypes::LANCE_UNENFORCED_PRIMARY_KEY_POSITION; use lance_namespace::LanceNamespace; @@ -157,6 +160,7 @@ fn manifest_schema() -> Arc { async fn build_namespace( root: &str, inline_optimization: bool, + manifest_shard_count: usize, storage_options: &HashMap, ) -> Box { let mut properties = HashMap::new(); @@ -166,6 +170,12 @@ async fn build_namespace( "inline_optimization_enabled".to_string(), inline_optimization.to_string(), ); + if manifest_shard_count > 0 { + properties.insert( + "manifest_shard_count".to_string(), + manifest_shard_count.to_string(), + ); + } for (k, v) in storage_options { properties.insert(format!("storage.{}", k), v.clone()); } @@ -180,10 +190,17 @@ async fn seed( root: &str, count: usize, inline_optimization: bool, + manifest_shard_count: usize, storage_options: &HashMap, ) { eprintln!("Seeding {} entries at {}", count, root); - let ns = build_namespace(root, inline_optimization, storage_options).await; + let ns = build_namespace( + root, + inline_optimization, + manifest_shard_count, + storage_options, + ) + .await; let ipc_data = Bytes::from(create_test_ipc_data()); let ns_count = count / 3; @@ -219,6 +236,144 @@ async fn seed( // Writes a __manifest Lance table directly with N rows, bypassing the namespace API. const SEED_LARGE_BATCH_SIZE: usize = 10_000; +const SEED_LARGE_SHARD_FLUSH_BYTES: usize = 0; +struct ManifestSeedRow { + object_id: String, + object_type: String, + location: Option, + metadata: Option, +} + +fn stable_hash(value: &str) -> u64 { + let mut hash = 0xcbf29ce484222325u64; + for byte in value.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + hash +} + +fn manifest_shard_marker_row(shard_index: usize) -> ManifestSeedRow { + ManifestSeedRow { + object_id: format!("__manifest_shard_marker_{:06}", shard_index), + object_type: "table_version".to_string(), + location: None, + metadata: None, + } +} + +fn manifest_seed_row(i: usize, total_count: usize) -> ManifestSeedRow { + let ns_count = total_count / 3; + if i < ns_count { + ManifestSeedRow { + object_id: format!("ns_{}", i), + object_type: "namespace".to_string(), + location: None, + metadata: None, + } + } else { + let table_idx = i - ns_count; + ManifestSeedRow { + object_id: format!("table_{}", table_idx), + object_type: "table".to_string(), + location: Some(format!("table_{}", table_idx)), + metadata: Some(r#"{"bench":"true"}"#.to_string()), + } + } +} + +fn manifest_batch_from_rows(schema: &Arc, rows: &[ManifestSeedRow]) -> RecordBatch { + let metadata_array = Arc::new( + JsonArray::try_from_iter(rows.iter().map(|row| row.metadata.as_deref())) + .expect("Failed to encode metadata as JSON") + .into_inner(), + ); + + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from( + rows.iter() + .map(|row| row.object_id.clone()) + .collect::>(), + )), + Arc::new(StringArray::from( + rows.iter() + .map(|row| row.object_type.clone()) + .collect::>(), + )), + Arc::new(StringArray::from( + rows.iter() + .map(|row| row.location.clone()) + .collect::>(), + )), + metadata_array, + ], + ) + .expect("Failed to create manifest batch") +} + +async fn write_manifest_dataset( + manifest_uri: &str, + batches: Vec, + schema: Arc, + storage_options: HashMap, + max_bytes_per_file: Option, +) { + let mut write_params = WriteParams { + mode: WriteMode::Create, + ..WriteParams::default() + }; + if let Some(max_bytes_per_file) = max_bytes_per_file { + write_params.max_bytes_per_file = max_bytes_per_file; + } + if !storage_options.is_empty() { + let accessor = Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options(storage_options), + ); + write_params.store_params = Some(lance_io::object_store::ObjectStoreParams { + storage_options_accessor: Some(accessor), + ..Default::default() + }); + } + + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema); + InsertBuilder::new(manifest_uri) + .with_params(&write_params) + .execute_stream(reader) + .await + .expect("Failed to write manifest dataset"); +} + +async fn validate_sharded_manifest_seed( + root: &str, + manifest_shard_count: usize, + storage_options: &HashMap, +) { + let mut read_params = ReadParams::default(); + if !storage_options.is_empty() { + let accessor = Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options( + storage_options.clone(), + ), + ); + read_params.store_options = Some(lance_io::object_store::ObjectStoreParams { + storage_options_accessor: Some(accessor), + ..Default::default() + }); + } + let manifest_uri = format!("{}/__manifest", root); + let dataset = DatasetBuilder::from_uri(manifest_uri) + .with_read_params(read_params) + .load() + .await + .expect("Failed to load seeded sharded manifest"); + assert_eq!( + dataset.manifest().fragments.len(), + manifest_shard_count, + "seed-large sharded manifest must contain exactly one fragment per shard" + ); +} fn generate_manifest_batch( schema: &Arc, @@ -271,8 +426,21 @@ async fn seed_large( root: &str, count: usize, inline_optimization: bool, + manifest_shard_count: usize, storage_options: &HashMap, ) { + if manifest_shard_count > 0 { + seed_large_sharded( + root, + count, + inline_optimization, + manifest_shard_count, + storage_options, + ) + .await; + return; + } + let manifest_uri = format!("{}/{}", root, "__manifest"); eprintln!( "Seed-large: writing {} rows directly to {}", @@ -291,28 +459,14 @@ async fn seed_large( } eprintln!(" generated {} batches", batches.len()); - let mut write_params = WriteParams { - mode: WriteMode::Create, - ..WriteParams::default() - }; - if !storage_options.is_empty() { - let accessor = Arc::new( - lance_io::object_store::StorageOptionsAccessor::with_static_options( - storage_options.clone(), - ), - ); - write_params.store_params = Some(lance_io::object_store::ObjectStoreParams { - storage_options_accessor: Some(accessor), - ..Default::default() - }); - } - - let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); - InsertBuilder::new(manifest_uri.as_str()) - .with_params(&write_params) - .execute_stream(reader) - .await - .expect("Failed to write manifest dataset"); + write_manifest_dataset( + manifest_uri.as_str(), + batches, + schema.clone(), + storage_options.clone(), + None, + ) + .await; eprintln!(" wrote Lance dataset"); @@ -320,7 +474,7 @@ async fn seed_large( if inline_optimization { eprintln!(" triggering initial CoW rewrite to build indices..."); let start = Instant::now(); - let ns = build_namespace(root, true, storage_options).await; + let ns = build_namespace(root, true, manifest_shard_count, storage_options).await; let mut req = CreateNamespaceRequest::new(); req.id = Some(vec!["__seed_trigger__".to_string()]); ns.create_namespace(req) @@ -340,6 +494,64 @@ async fn seed_large( ); } +async fn seed_large_sharded( + root: &str, + count: usize, + inline_optimization: bool, + manifest_shard_count: usize, + storage_options: &HashMap, +) { + eprintln!( + "Seed-large: writing {} rows directly across {} manifest fragments", + count, manifest_shard_count + ); + + let schema = manifest_schema(); + let mut rows_by_shard: Vec> = + (0..manifest_shard_count).map(|_| Vec::new()).collect(); + + for i in 0..count { + let row = manifest_seed_row(i, count); + let shard_index = stable_hash(&row.object_id) as usize % manifest_shard_count; + rows_by_shard[shard_index].push(row); + } + + let batches = rows_by_shard + .into_iter() + .enumerate() + .map(|(shard_index, rows)| { + let mut shard_rows = Vec::with_capacity(rows.len() + 1); + shard_rows.push(manifest_shard_marker_row(shard_index)); + shard_rows.extend(rows); + manifest_batch_from_rows(&schema, &shard_rows) + }) + .collect::>(); + + let manifest_uri = format!("{}/__manifest", root); + write_manifest_dataset( + &manifest_uri, + batches, + schema, + storage_options.clone(), + Some(SEED_LARGE_SHARD_FLUSH_BYTES), + ) + .await; + validate_sharded_manifest_seed(root, manifest_shard_count, storage_options).await; + + if inline_optimization { + eprintln!( + " sharded seed-large wrote raw manifest fragments; fragment writes do not rebuild manifest indices" + ); + } + + let ns_count = count / 3; + let table_count = count - ns_count; + eprintln!( + "Seed-large complete: {} total rows ({} namespaces, {} tables)", + count, ns_count, table_count + ); +} + // ──────────────────── worker mode ──────────────────── async fn worker( @@ -350,9 +562,18 @@ async fn worker( worker_id: usize, table_count: usize, inline_optimization: bool, + manifest_shard_count: usize, + ready_path: Option<&str>, + start_path: Option<&str>, storage_options: &HashMap, ) { - let ns = build_namespace(root, inline_optimization, storage_options).await; + let ns = build_namespace( + root, + inline_optimization, + manifest_shard_count, + storage_options, + ) + .await; let ipc_data = Bytes::from(create_test_ipc_data()); // Warmup (only for warm-read operations) @@ -363,6 +584,8 @@ async fn worker( } } + wait_for_coordinator(ready_path, start_path); + for i in 0..operations { let start = Instant::now(); let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data) @@ -439,14 +662,25 @@ async fn cold_read_worker( worker_id: usize, table_count: usize, inline_optimization: bool, + manifest_shard_count: usize, + ready_path: Option<&str>, + start_path: Option<&str>, storage_options: &HashMap, ) { let ipc_data = Bytes::from(create_test_ipc_data()); + wait_for_coordinator(ready_path, start_path); + for i in 0..operations { // Fresh namespace for each operation — simulates cold start let start = Instant::now(); - let ns = build_namespace(root, inline_optimization, storage_options).await; + let ns = build_namespace( + root, + inline_optimization, + manifest_shard_count, + storage_options, + ) + .await; let err = run_operation(ns.as_ref(), operation, worker_id, i, table_count, &ipc_data) .await .is_err(); @@ -460,6 +694,18 @@ async fn cold_read_worker( } } +fn wait_for_coordinator(ready_path: Option<&str>, start_path: Option<&str>) { + if let Some(ready_path) = ready_path { + fs::write(ready_path, b"ready").expect("failed to write worker ready marker"); + } + if let Some(start_path) = start_path { + let start_path = Path::new(start_path); + while !start_path.exists() { + thread::sleep(Duration::from_millis(10)); + } + } +} + // ──────────────────── run mode (coordinator) ──────────────────── fn run_workers( @@ -472,6 +718,7 @@ fn run_workers( table_count: usize, initial_entries: usize, inline_optimization: bool, + manifest_shard_count: usize, variant: &str, storage_options: &HashMap, ) -> BenchResult { @@ -488,11 +735,16 @@ fn run_workers( ); } - let wall_start = Instant::now(); + let sync_dir = create_sync_dir(); + let start_path = sync_dir.join("start"); + let ready_paths: Vec<_> = (0..concurrency) + .map(|worker_id| sync_dir.join(format!("ready_{}", worker_id))) + .collect(); - let children: Vec<_> = (0..concurrency) + let mut children: Vec<_> = (0..concurrency) .map(|worker_id| { let mut cmd = Command::new(self_exe); + let unique_worker_id = concurrency * 1_000_000 + worker_id; cmd.arg("worker") .arg("--root") .arg(root) @@ -503,11 +755,17 @@ fn run_workers( .arg("--warmup") .arg(warmup.to_string()) .arg("--worker-id") - .arg(worker_id.to_string()) + .arg(unique_worker_id.to_string()) .arg("--table-count") .arg(table_count.to_string()) .arg("--inline-optimization") - .arg(inline_optimization.to_string()); + .arg(inline_optimization.to_string()) + .arg("--manifest-shard-count") + .arg(manifest_shard_count.to_string()) + .arg("--ready-path") + .arg(ready_paths[worker_id].to_string_lossy().to_string()) + .arg("--start-path") + .arg(start_path.to_string_lossy().to_string()); for (k, v) in storage_options { cmd.arg("--storage-option").arg(format!("{}={}", k, v)); } @@ -518,29 +776,50 @@ fn run_workers( }) .collect(); - let mut all_latencies = Vec::new(); - let mut total_errors = 0; + let output_readers: Vec<_> = children + .iter_mut() + .map(|child| { + let stdout = child.stdout.take().unwrap(); + thread::spawn(move || { + let reader = BufReader::new(stdout); + let mut latencies = Vec::new(); + let mut errors = 0; + for line in reader.lines() { + let line = line.expect("failed to read worker output"); + if let Ok(record) = serde_json::from_str::(&line) { + if record.error { + errors += 1; + } else { + latencies.push(record.latency_ms); + } + } + } + (latencies, errors) + }) + }) + .collect(); + + wait_for_workers_ready(&mut children, &ready_paths); + let wall_start = Instant::now(); + fs::write(&start_path, b"start").expect("failed to write worker start marker"); for mut child in children { - let stdout = child.stdout.take().unwrap(); - let reader = BufReader::new(stdout); - for line in reader.lines() { - let line = line.expect("failed to read worker output"); - if let Ok(record) = serde_json::from_str::(&line) { - if record.error { - total_errors += 1; - } else { - all_latencies.push(record.latency_ms); - } - } - } let status = child.wait().expect("failed to wait for worker"); if !status.success() { eprintln!("Worker exited with status: {}", status); } } + let mut all_latencies = Vec::new(); + let mut total_errors = 0; + for reader in output_readers { + let (mut latencies, errors) = reader.join().expect("failed to join worker output reader"); + all_latencies.append(&mut latencies); + total_errors += errors; + } + let wall_duration = wall_start.elapsed(); + let _ = fs::remove_dir_all(&sync_dir); compute_result( variant, operation, @@ -552,6 +831,44 @@ fn run_workers( ) } +fn create_sync_dir() -> PathBuf { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system clock is before unix epoch") + .as_nanos(); + let dir = std::env::temp_dir().join(format!( + "manifest_bench_sync_{}_{}", + std::process::id(), + now + )); + fs::create_dir_all(&dir).expect("failed to create worker sync directory"); + dir +} + +fn wait_for_workers_ready(children: &mut [std::process::Child], ready_paths: &[PathBuf]) { + let start = Instant::now(); + loop { + if ready_paths.iter().all(|path| path.exists()) { + return; + } + + for child in children.iter_mut() { + if let Some(status) = child + .try_wait() + .expect("failed to check worker readiness status") + { + panic!("worker exited before benchmark start: {}", status); + } + } + + if start.elapsed() > Duration::from_secs(30 * 60) { + panic!("timed out waiting for workers to open namespace"); + } + + thread::sleep(Duration::from_millis(50)); + } +} + fn parse_concurrency_list(s: &str) -> Vec { s.split(',') .filter_map(|v| v.trim().parse::().ok()) @@ -580,7 +897,10 @@ async fn main() { let mut table_count: usize = 667; // default for 1000 seed: 1000 - 1000/3 let mut initial_entries: usize = 0; let mut inline_optimization = true; + let mut manifest_shard_count: usize = 0; let mut variant = String::new(); + let mut ready_path: Option = None; + let mut start_path: Option = None; let mut storage_options: HashMap = HashMap::new(); let mut i = 2; @@ -626,10 +946,22 @@ async fn main() { inline_optimization = args[i + 1].parse().unwrap(); i += 2; } + "--manifest-shard-count" => { + manifest_shard_count = args[i + 1].parse().unwrap(); + i += 2; + } "--variant" => { variant = args[i + 1].clone(); i += 2; } + "--ready-path" => { + ready_path = Some(args[i + 1].clone()); + i += 2; + } + "--start-path" => { + start_path = Some(args[i + 1].clone()); + i += 2; + } "--storage-option" => { let kv = &args[i + 1]; if let Some((k, v)) = kv.split_once('=') { @@ -654,10 +986,24 @@ async fn main() { match mode { "seed" => { - seed(&root, count, inline_optimization, &storage_options).await; + seed( + &root, + count, + inline_optimization, + manifest_shard_count, + &storage_options, + ) + .await; } "seed-large" => { - seed_large(&root, count, inline_optimization, &storage_options).await; + seed_large( + &root, + count, + inline_optimization, + manifest_shard_count, + &storage_options, + ) + .await; } "worker" => { if operation.starts_with("cold-read") { @@ -668,6 +1014,9 @@ async fn main() { worker_id, table_count, inline_optimization, + manifest_shard_count, + ready_path.as_deref(), + start_path.as_deref(), &storage_options, ) .await; @@ -680,6 +1029,9 @@ async fn main() { worker_id, table_count, inline_optimization, + manifest_shard_count, + ready_path.as_deref(), + start_path.as_deref(), &storage_options, ) .await; @@ -714,6 +1066,7 @@ async fn main() { eprintln!("variant: {}", variant); eprintln!("root: {}", root); eprintln!("inline_optimization: {}", inline_optimization); + eprintln!("manifest_shard_count: {}", manifest_shard_count); eprintln!("initial_entries: {}", initial_entries); eprintln!("concurrency: {:?}", concurrency_list); eprintln!("operations per level: {}", operations); @@ -734,6 +1087,7 @@ async fn main() { table_count, initial_entries, inline_optimization, + manifest_shard_count, &variant, &storage_options, ); diff --git a/rust/lance-namespace-impls/src/dir.rs b/rust/lance-namespace-impls/src/dir.rs index 09145b6a4b6..cf8eb3804f5 100644 --- a/rust/lance-namespace-impls/src/dir.rs +++ b/rust/lance-namespace-impls/src/dir.rs @@ -38,7 +38,7 @@ use lance_table::io::commit::{ManifestNamingScheme, VERSIONS_DIR}; use object_store::ObjectStoreExt; use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, PutMode, PutOptions}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::io::Cursor; use std::sync::{Arc, Mutex}; @@ -192,6 +192,9 @@ pub struct DirectoryNamespaceBuilder { /// When true, table versions are stored in the `__manifest` table instead of /// relying on Lance's native version management. table_version_storage_enabled: bool, + /// Number of manifest fragments used as shards. Zero means a single-fragment + /// `__manifest` table. + manifest_shard_count: usize, /// When true, enables migration mode where the namespace checks the manifest first /// before falling back to directory listing for root-level tables. When false (default), /// root-level tables use directory listing directly without checking the manifest, @@ -231,6 +234,7 @@ impl std::fmt::Debug for DirectoryNamespaceBuilder { "table_version_storage_enabled", &self.table_version_storage_enabled, ) + .field("manifest_shard_count", &self.manifest_shard_count) .field( "dir_listing_to_manifest_migration_enabled", &self.dir_listing_to_manifest_migration_enabled, @@ -268,6 +272,7 @@ impl DirectoryNamespaceBuilder { inline_optimization_enabled: true, table_version_tracking_enabled: false, // Default to disabled table_version_storage_enabled: false, // Default to disabled + manifest_shard_count: 0, dir_listing_to_manifest_migration_enabled: false, // Default to disabled credential_vendor_properties: HashMap::new(), context_provider: None, @@ -341,6 +346,15 @@ impl DirectoryNamespaceBuilder { self } + /// Configure the number of manifest shards. + /// + /// A value of 0 keeps a single-fragment `__manifest` table. Values greater + /// than 0 use one `__manifest` table with that many fragments. + pub fn manifest_shard_count(mut self, shard_count: usize) -> Self { + self.manifest_shard_count = shard_count; + self + } + /// Create a DirectoryNamespaceBuilder from properties HashMap. /// /// This method parses a properties map into builder configuration. @@ -464,6 +478,11 @@ impl DirectoryNamespaceBuilder { .and_then(|v| v.parse::().ok()) .unwrap_or(false); + let manifest_shard_count = properties + .get("manifest_shard_count") + .and_then(|v| v.parse::().ok()) + .unwrap_or(0); + // Extract dir_listing_to_manifest_migration_enabled (default: false) let dir_listing_to_manifest_migration_enabled = properties .get("dir_listing_to_manifest_migration_enabled") @@ -511,6 +530,7 @@ impl DirectoryNamespaceBuilder { inline_optimization_enabled, table_version_tracking_enabled, table_version_storage_enabled, + manifest_shard_count, dir_listing_to_manifest_migration_enabled, credential_vendor_properties, context_provider: None, @@ -699,20 +719,42 @@ impl DirectoryNamespaceBuilder { Self::initialize_object_store(&self.root, &self.storage_options, &self.session).await?; let manifest_ns = if self.manifest_enabled { - match manifest::ManifestNamespace::from_directory( - self.root.clone(), - self.storage_options.clone(), - self.session.clone(), - object_store.clone(), - base_path.clone(), - self.dir_listing_enabled, - self.inline_optimization_enabled, - self.commit_retries, - self.table_version_storage_enabled, - ) - .await - { + let manifest_result = if self.manifest_shard_count > 0 { + manifest::ShardedManifestNamespace::from_directory( + self.root.clone(), + self.storage_options.clone(), + self.session.clone(), + object_store.clone(), + base_path.clone(), + self.dir_listing_enabled, + self.inline_optimization_enabled, + self.commit_retries, + self.table_version_storage_enabled, + self.manifest_shard_count, + ) + .await + .map(|ns| ManifestCatalog::Sharded(Arc::new(ns))) + } else { + manifest::ManifestNamespace::from_directory( + self.root.clone(), + self.storage_options.clone(), + self.session.clone(), + object_store.clone(), + base_path.clone(), + self.dir_listing_enabled, + self.inline_optimization_enabled, + self.commit_retries, + self.table_version_storage_enabled, + ) + .await + .map(|ns| ManifestCatalog::Single(Arc::new(ns))) + }; + + match manifest_result { Ok(ns) => Some(Arc::new(ns)), + Err(e) if self.manifest_shard_count > 0 => { + return Err(e); + } Err(e) => { // Failed to initialize manifest namespace, fall back to directory listing only log::warn!( @@ -805,6 +847,226 @@ impl DirectoryNamespaceBuilder { /// /// ## Manifest-based Listing /// +enum ManifestCatalog { + Single(Arc), + Sharded(Arc), +} + +impl std::fmt::Debug for ManifestCatalog { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Single(ns) => f.debug_tuple("Single").field(ns).finish(), + Self::Sharded(ns) => f.debug_tuple("Sharded").field(ns).finish(), + } + } +} + +impl ManifestCatalog { + async fn list_manifest_table_locations(&self) -> Result> { + match self { + Self::Single(ns) => ns.list_manifest_table_locations().await, + Self::Sharded(ns) => ns.list_manifest_table_locations().await, + } + } + + async fn register_table(&self, name: &str, location: String) -> Result<()> { + match self { + Self::Single(ns) => ns.register_table(name, location).await, + Self::Sharded(ns) => ns.register_table(name, location).await, + } + } + + async fn insert_into_manifest_with_metadata( + &self, + entries: Vec, + ) -> Result<()> { + match self { + Self::Single(ns) => ns.insert_into_manifest_with_metadata(entries).await, + Self::Sharded(ns) => ns.insert_into_manifest_with_metadata(entries).await, + } + } + + #[cfg(test)] + async fn query_table_versions( + &self, + object_id: &str, + descending: bool, + limit: Option, + ) -> Result> { + match self { + Self::Single(ns) => ns.query_table_versions(object_id, descending, limit).await, + Self::Sharded(ns) => ns.query_table_versions(object_id, descending, limit).await, + } + } + + #[cfg(test)] + async fn delete_table_versions(&self, object_id: &str, ranges: &[(i64, i64)]) -> Result { + match self { + Self::Single(ns) => ns.delete_table_versions(object_id, ranges).await, + Self::Sharded(ns) => { + ns.batch_delete_table_versions_by_ranges(&[( + object_id.to_string(), + ranges.to_vec(), + )]) + .await + } + } + } + + async fn list_table_versions( + &self, + table_id: &[String], + descending: bool, + limit: Option, + ) -> Result { + match self { + Self::Single(ns) => ns.list_table_versions(table_id, descending, limit).await, + Self::Sharded(ns) => ns.list_table_versions(table_id, descending, limit).await, + } + } + + async fn describe_table_version( + &self, + table_id: &[String], + version: i64, + ) -> Result { + match self { + Self::Single(ns) => ns.describe_table_version(table_id, version).await, + Self::Sharded(ns) => ns.describe_table_version(table_id, version).await, + } + } + + async fn batch_delete_table_versions_by_ranges( + &self, + table_ranges: &[(String, Vec<(i64, i64)>)], + ) -> Result { + match self { + Self::Single(ns) => ns.batch_delete_table_versions_by_ranges(table_ranges).await, + Self::Sharded(ns) => ns.batch_delete_table_versions_by_ranges(table_ranges).await, + } + } +} + +#[async_trait] +impl LanceNamespace for ManifestCatalog { + fn namespace_id(&self) -> String { + match self { + Self::Single(ns) => ns.namespace_id(), + Self::Sharded(ns) => ns.namespace_id(), + } + } + + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result { + match self { + Self::Single(ns) => ns.list_namespaces(request).await, + Self::Sharded(ns) => ns.list_namespaces(request).await, + } + } + + async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> Result { + match self { + Self::Single(ns) => ns.describe_namespace(request).await, + Self::Sharded(ns) => ns.describe_namespace(request).await, + } + } + + async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> Result { + match self { + Self::Single(ns) => ns.create_namespace(request).await, + Self::Sharded(ns) => ns.create_namespace(request).await, + } + } + + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { + match self { + Self::Single(ns) => ns.drop_namespace(request).await, + Self::Sharded(ns) => ns.drop_namespace(request).await, + } + } + + async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> { + match self { + Self::Single(ns) => ns.namespace_exists(request).await, + Self::Sharded(ns) => ns.namespace_exists(request).await, + } + } + + async fn list_tables(&self, request: ListTablesRequest) -> Result { + match self { + Self::Single(ns) => ns.list_tables(request).await, + Self::Sharded(ns) => ns.list_tables(request).await, + } + } + + async fn describe_table(&self, request: DescribeTableRequest) -> Result { + match self { + Self::Single(ns) => ns.describe_table(request).await, + Self::Sharded(ns) => ns.describe_table(request).await, + } + } + + async fn table_exists(&self, request: TableExistsRequest) -> Result<()> { + match self { + Self::Single(ns) => ns.table_exists(request).await, + Self::Sharded(ns) => ns.table_exists(request).await, + } + } + + async fn drop_table(&self, request: DropTableRequest) -> Result { + match self { + Self::Single(ns) => ns.drop_table(request).await, + Self::Sharded(ns) => ns.drop_table(request).await, + } + } + + async fn create_table( + &self, + request: CreateTableRequest, + request_data: Bytes, + ) -> Result { + match self { + Self::Single(ns) => ns.create_table(request, request_data).await, + Self::Sharded(ns) => ns.create_table(request, request_data).await, + } + } + + async fn declare_table(&self, request: DeclareTableRequest) -> Result { + match self { + Self::Single(ns) => ns.declare_table(request).await, + Self::Sharded(ns) => ns.declare_table(request).await, + } + } + + async fn register_table( + &self, + request: lance_namespace::models::RegisterTableRequest, + ) -> Result { + match self { + Self::Single(ns) => LanceNamespace::register_table(ns.as_ref(), request).await, + Self::Sharded(ns) => LanceNamespace::register_table(ns.as_ref(), request).await, + } + } + + async fn deregister_table( + &self, + request: lance_namespace::models::DeregisterTableRequest, + ) -> Result { + match self { + Self::Single(ns) => LanceNamespace::deregister_table(ns.as_ref(), request).await, + Self::Sharded(ns) => LanceNamespace::deregister_table(ns.as_ref(), request).await, + } + } +} + /// When `manifest_enabled=true`, the namespace uses a special `__manifest` Lance table to track tables /// instead of scanning the filesystem. This provides: /// - Better performance for listing operations @@ -827,7 +1089,7 @@ pub struct DirectoryNamespace { session: Option>, object_store: Arc, base_path: Path, - manifest_ns: Option>, + manifest_ns: Option>, dir_listing_enabled: bool, /// When true, root-level table operations check the manifest first before /// falling back to directory listing. When false, root-level tables skip @@ -4322,6 +4584,181 @@ mod tests { .unwrap(); } + async fn create_sharded_namespace(temp_path: &str) -> DirectoryNamespace { + DirectoryNamespaceBuilder::new(temp_path) + .dir_listing_enabled(false) + .manifest_shard_count(4) + .build() + .await + .unwrap() + } + + async fn load_sharded_manifest(temp_path: &str) -> Dataset { + DatasetBuilder::from_uri(format!("{}/__manifest", temp_path)) + .load() + .await + .unwrap() + } + + fn fragment_data_files(dataset: &Dataset) -> HashMap> { + dataset + .manifest() + .fragments + .iter() + .map(|fragment| { + ( + fragment.id, + fragment + .files + .iter() + .map(|file| file.path.clone()) + .collect(), + ) + }) + .collect() + } + + #[tokio::test] + async fn test_sharded_manifest_table_operations() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = create_sharded_namespace(temp_path).await; + + create_scalar_table(&namespace, "alpha").await; + create_scalar_table(&namespace, "beta").await; + + let before = load_sharded_manifest(temp_path).await; + assert_eq!(before.count_fragments(), 4); + let before_files = fragment_data_files(&before); + + let response = namespace + .list_tables(ListTablesRequest { + id: Some(vec![]), + page_token: None, + limit: None, + ..Default::default() + }) + .await + .unwrap(); + let tables = response.tables.into_iter().collect::>(); + assert_eq!( + tables, + HashSet::from(["alpha".to_string(), "beta".to_string()]) + ); + + let describe = namespace + .describe_table(DescribeTableRequest { + id: Some(vec!["alpha".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(describe.table.as_deref(), Some("alpha")); + + create_scalar_table(&namespace, "gamma").await; + let after = load_sharded_manifest(temp_path).await; + assert_eq!(after.count_fragments(), 4); + let after_files = fragment_data_files(&after); + let changed_fragments = before_files + .iter() + .filter(|(fragment_id, before_files)| { + after_files + .get(fragment_id) + .is_some_and(|after_files| after_files != *before_files) + }) + .count(); + assert_eq!( + changed_fragments, 1, + "each sharded manifest write should replace one fragment" + ); + + for shard_index in 0..4 { + let shard_uri = format!("{}/__manifest_shard_{:06}", temp_path, shard_index); + assert!( + DatasetBuilder::from_uri(shard_uri).load().await.is_err(), + "sharded manifest mode should not create separate manifest tables" + ); + } + } + + #[tokio::test] + async fn test_sharded_table_version_storage_records_versions() { + use futures::TryStreamExt; + use lance_namespace::models::{CreateTableVersionRequest, ListTableVersionsRequest}; + + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .dir_listing_enabled(false) + .manifest_shard_count(4) + .table_version_tracking_enabled(true) + .table_version_storage_enabled(true) + .build() + .await + .unwrap(); + + create_scalar_table(&namespace, "versions").await; + let describe = namespace + .describe_table(DescribeTableRequest { + id: Some(vec!["versions".to_string()]), + ..Default::default() + }) + .await + .unwrap(); + let table_uri = describe.location.unwrap(); + let dataset = Dataset::open(&table_uri).await.unwrap(); + let object_store = dataset.object_store(None).await.unwrap(); + let manifest_metas: Vec<_> = object_store + .inner + .list(Some(&dataset.versions_dir())) + .try_collect() + .await + .unwrap(); + let manifest_meta = manifest_metas + .iter() + .find(|meta| { + meta.location + .filename() + .is_some_and(|filename| filename.ends_with(".manifest")) + }) + .unwrap(); + let manifest_data = object_store + .inner + .get(&manifest_meta.location) + .await + .unwrap() + .bytes() + .await + .unwrap(); + let staging_path = dataset.versions_dir().join("staging_manifest"); + object_store + .inner + .put(&staging_path, manifest_data.into()) + .await + .unwrap(); + + let mut create_req = CreateTableVersionRequest::new(2, staging_path.to_string()); + create_req.id = Some(vec!["versions".to_string()]); + create_req.naming_scheme = Some("V2".to_string()); + namespace.create_table_version(create_req).await.unwrap(); + + let mut list_req = ListTableVersionsRequest::new(); + list_req.id = Some(vec!["versions".to_string()]); + let versions = namespace.list_table_versions(list_req).await.unwrap(); + assert_eq!(versions.versions.len(), 1); + assert_eq!(versions.versions[0].version, 2); + + let describe_version = namespace + .describe_table_version(DescribeTableVersionRequest { + id: Some(vec!["versions".to_string()]), + version: Some(2), + ..Default::default() + }) + .await + .unwrap(); + assert_eq!(describe_version.version.version, 2); + } + async fn open_dataset(namespace: &DirectoryNamespace, table_name: &str) -> Dataset { let mut describe_request = DescribeTableRequest::new(); describe_request.id = Some(vec![table_name.to_string()]); @@ -9194,20 +9631,22 @@ mod tests { let version = response.version.unwrap(); assert_eq!(version.version, 2); - // Verify the version is recorded in __manifest by querying it + // Verify the version is recorded in __manifest by listing versions + // through the same production path used by the namespace. let manifest_ns = namespace.manifest_ns.as_ref().unwrap(); - let table_id_str = manifest::ManifestNamespace::str_object_id(&table_id); let versions = manifest_ns - .query_table_versions(&table_id_str, false, None) + .list_table_versions(&table_id, false, None) .await .unwrap(); assert!( - !versions.is_empty(), + !versions.versions.is_empty(), "Version should be recorded in __manifest" ); - let (ver, _path) = &versions[0]; - assert_eq!(*ver, 2, "Recorded version should be 2"); + assert_eq!( + versions.versions[0].version, 2, + "Recorded version should be 2" + ); } } diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index 47b6b128863..8c62564db62 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -24,10 +24,10 @@ use futures::{ stream::{self, StreamExt}, }; use lance::dataset::index::LanceIndexStoreExt; -use lance::dataset::transaction::Operation; +use lance::dataset::transaction::{Operation, TransactionBuilder}; use lance::dataset::{ - InsertBuilder, ManifestWriteConfig, ReadParams, WhenMatched, WriteMode, WriteParams, - builder::DatasetBuilder, write_manifest_file, + CommitBuilder, InsertBuilder, ManifestWriteConfig, ReadParams, WhenMatched, WriteMode, + WriteParams, builder::DatasetBuilder, write_manifest_file, }; use lance::session::Session; use lance::{Dataset, dataset::scanner::Scanner}; @@ -54,7 +54,7 @@ use lance_namespace::models::{ TableVersion, }; use lance_namespace::schema::arrow_schema_to_json; -use lance_table::format::{IndexMetadata, Manifest}; +use lance_table::format::{Fragment, IndexMetadata, Manifest}; use lance_table::io::commit::CommitError; use object_store::{Error as ObjectStoreError, path::Path}; use roaring::RoaringBitmap; @@ -85,6 +85,7 @@ const METADATA_INDEX_NAME: &str = "metadata_fts"; // commit retry budget so multi-process namespace writes can make progress. const DEFAULT_MANIFEST_REWRITE_COMMIT_RETRIES: u32 = 20; const MANIFEST_INDEX_BATCH_SIZE: usize = 8192; +const MANIFEST_FRAGMENT_SHARD_MARKER_ROWS_PER_FILE: usize = 1; /// Object types that can be stored in the manifest #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -482,7 +483,7 @@ struct DeleteObjectMutation { } impl ManifestStreamMutation for DeleteObjectMutation { - type Output = (); + type Output = bool; fn process_existing_row( &mut self, @@ -516,9 +517,9 @@ impl ManifestStreamMutation for DeleteObjectMutation { fn finish(&self) -> CopyOnWriteMutation { if self.deleted { - CopyOnWriteMutation::updated(()) + CopyOnWriteMutation::updated(true) } else { - CopyOnWriteMutation::unchanged(()) + CopyOnWriteMutation::unchanged(false) } } } @@ -528,10 +529,13 @@ enum DeleteTableVersionsTarget { Ranges(Vec), } +type TableVersionRange = (i64, i64); +type TableVersionRangeRequest = (String, Vec); + #[derive(Clone)] struct DeleteTableVersionRangeTarget { object_id_prefix: String, - ranges: Vec<(i64, i64)>, + ranges: Vec, } impl DeleteTableVersionRangeTarget { @@ -761,6 +765,7 @@ impl DerefMut for DatasetWriteGuard<'_> { /// Uses a special `__manifest` Lance table to track tables and nested namespaces. pub struct ManifestNamespace { root: String, + manifest_table_name: String, storage_options: Option>, session: Option>, object_store: Arc, @@ -777,6 +782,10 @@ pub struct ManifestNamespace { /// Number of retries for commit operations on the manifest table. /// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20). commit_retries: Option, + /// When set, the single `__manifest` table is partitioned into this many + /// Lance fragments and each manifest mutation rewrites only the routed + /// fragment. + manifest_fragment_shard_count: Option, /// Serialize manifest mutations within a single namespace instance so concurrent /// create/drop calls do not compete with each other on the same in-memory snapshot. manifest_mutation_lock: Arc>, @@ -786,12 +795,17 @@ impl std::fmt::Debug for ManifestNamespace { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ManifestNamespace") .field("root", &self.root) + .field("manifest_table_name", &self.manifest_table_name) .field("storage_options", &self.storage_options) .field("dir_listing_enabled", &self.dir_listing_enabled) .field( "inline_optimization_enabled", &self.inline_optimization_enabled, ) + .field( + "manifest_fragment_shard_count", + &self.manifest_fragment_shard_count, + ) .finish() } } @@ -869,17 +883,54 @@ impl ManifestNamespace { inline_optimization_enabled: bool, commit_retries: Option, table_version_storage_enabled: bool, + ) -> Result { + Self::from_directory_with_manifest_table( + root, + MANIFEST_TABLE_NAME.to_string(), + storage_options, + session, + object_store, + base_path, + dir_listing_enabled, + inline_optimization_enabled, + commit_retries, + table_version_storage_enabled, + None, + ) + .await + } + + /// Create a namespace using a specific Lance table to store manifest rows. + /// + /// `root` remains the catalog root used for user table locations. The + /// `manifest_table_name` controls only the system table that stores catalog rows. + #[allow(clippy::too_many_arguments)] + pub async fn from_directory_with_manifest_table( + root: String, + manifest_table_name: String, + storage_options: Option>, + session: Option>, + object_store: Arc, + base_path: Path, + dir_listing_enabled: bool, + inline_optimization_enabled: bool, + commit_retries: Option, + table_version_storage_enabled: bool, + manifest_fragment_shard_count: Option, ) -> Result { let manifest_dataset = Self::ensure_manifest_table_up_to_date( &root, + &manifest_table_name, &storage_options, session.clone(), table_version_storage_enabled, + manifest_fragment_shard_count, ) .await?; Ok(Self { root, + manifest_table_name, storage_options, session, object_store, @@ -888,6 +939,7 @@ impl ManifestNamespace { dir_listing_enabled, inline_optimization_enabled, commit_retries, + manifest_fragment_shard_count, manifest_mutation_lock: Arc::new(Mutex::new(())), }) } @@ -919,6 +971,58 @@ impl ManifestNamespace { } } + fn stable_hash(value: &str) -> u64 { + let mut hash = 0xcbf29ce484222325u64; + for byte in value.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + hash + } + + fn table_version_table_id(version_object_id: &str) -> &str { + version_object_id + .rsplit_once(DELIMITER) + .map(|(table_id, _)| table_id) + .unwrap_or(version_object_id) + } + + fn table_version_table_id_if_version_object_id(object_id: &str) -> Option<&str> { + let (table_id, version) = object_id.rsplit_once(DELIMITER)?; + if version.len() == 20 && version.bytes().all(|byte| byte.is_ascii_digit()) { + Some(table_id) + } else { + None + } + } + + fn shard_key_for_entry(entry: &ManifestEntry) -> &str { + match entry.object_type { + ObjectType::TableVersion => Self::table_version_table_id(&entry.object_id), + _ => &entry.object_id, + } + } + + fn shard_index_for_key(shard_count: usize, key: &str) -> usize { + Self::stable_hash(key) as usize % shard_count + } + + fn shard_count(&self) -> Option { + self.manifest_fragment_shard_count + } + + fn delete_shard_indices_for_object_id(&self, object_id: &str) -> Option> { + let shard_count = self.shard_count()?; + let mut shard_indices = vec![Self::shard_index_for_key(shard_count, object_id)]; + if let Some(table_id) = Self::table_version_table_id_if_version_object_id(object_id) { + let table_shard_index = Self::shard_index_for_key(shard_count, table_id); + if !shard_indices.contains(&table_shard_index) { + shard_indices.push(table_shard_index); + } + } + Some(shard_indices) + } + /// Split an object ID (vec of strings) into namespace and table name pub fn split_object_id(object_id: &[String]) -> (Vec, String) { if object_id.len() == 1 { @@ -1066,12 +1170,60 @@ impl ManifestNamespace { ])) } + fn shard_marker_object_id(shard_index: usize) -> String { + format!("__manifest_shard_marker_{:06}", shard_index) + } + + fn manifest_shard_marker_batch( + schema: Arc, + shard_index: usize, + ) -> Result { + let metadata_array = Arc::new( + JsonArray::try_from_iter([None::<&str>]) + .map_err(|e| { + lance_core::Error::from(NamespaceError::Internal { + message: format!("Failed to encode shard marker metadata: {}", e), + }) + })? + .into_inner(), + ); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec![Self::shard_marker_object_id( + shard_index, + )])), + Arc::new(StringArray::from(vec![ObjectType::TableVersion.as_str()])), + Arc::new(StringArray::from(vec![None::])), + metadata_array, + ], + ) + .map_err(Into::into) + } + /// Get a scanner for the manifest dataset async fn manifest_scanner(&self) -> Result { let dataset_guard = self.manifest_dataset.get().await?; Ok(dataset_guard.scan()) } + async fn manifest_scanner_for_shard(&self, shard_index: usize) -> Result { + let dataset_guard = self.manifest_dataset.get().await?; + let fragment = Self::manifest_shard_fragment(&dataset_guard, shard_index)?; + let mut scanner = dataset_guard.scan(); + scanner.with_fragments(vec![fragment]); + Ok(scanner) + } + + async fn manifest_scanner_for_shard_key(&self, shard_key: &str) -> Result { + let Some(shard_count) = self.shard_count() else { + return self.manifest_scanner().await; + }; + let shard_index = Self::shard_index_for_key(shard_count, shard_key); + self.manifest_scanner_for_shard(shard_index).await + } + /// Helper to execute a scanner and collect results into a Vec async fn execute_scanner(scanner: Scanner) -> Result> { let mut stream = scanner.try_into_stream().await.map_err(|e| { @@ -1179,11 +1331,30 @@ impl ManifestNamespace { } async fn manifest_projected_stream(dataset: &Dataset) -> Result { + Self::manifest_projected_stream_for_fragments(dataset, None).await + } + + async fn manifest_projected_stream_for_fragments( + dataset: &Dataset, + fragments: Option>, + ) -> Result { // Use the dataset's own schema so that old datasets with Utf8 metadata // (instead of JSON/LargeBinary) stream correctly. The downstream // rewrite path (metadata_column_values) handles both column types. let schema = Self::projected_schema(dataset)?; + if fragments + .as_ref() + .is_some_and(|fragments| fragments.iter().all(|fragment| fragment.files.is_empty())) + { + return Ok(Box::pin(DatafusionRecordBatchStreamAdapter::new( + schema.clone(), + stream::empty(), + ))); + } let mut scanner = dataset.scan(); + if let Some(fragments) = fragments { + scanner.with_fragments(fragments); + } scanner .project(&["object_id", "object_type", "location", "metadata"]) .map_err(|e| { @@ -1835,13 +2006,155 @@ impl ManifestNamespace { } } - /// Check if the manifest contains an object with the given ID - async fn manifest_contains_object(&self, object_id: &str) -> Result { + fn manifest_shard_fragment(dataset: &Dataset, shard_index: usize) -> Result { + let mut fragments = dataset.manifest().fragments.iter().collect::>(); + fragments.sort_by_key(|fragment| fragment.id); + fragments + .get(shard_index) + .map(|fragment| (*fragment).clone()) + .ok_or_else(|| { + lance_core::Error::from(NamespaceError::Internal { + message: format!( + "Manifest shard fragment {} is missing from __manifest", + shard_index + ), + }) + }) + } + + fn all_manifest_field_ids(dataset: &Dataset) -> Vec { + dataset + .schema() + .fields + .iter() + .map(|field| field.id as u32) + .collect() + } + + async fn rewrite_manifest_shard( + &self, + shard_index: usize, + operation: &str, + mut make_mutation: F, + ) -> Result + where + M: ManifestStreamMutation + 'static, + F: FnMut() -> M, + { + let _mutation_guard = self.manifest_mutation_lock.lock().await; + let max_retries = self.manifest_rewrite_commit_retries(); + let mut retries = 0; + + loop { + let dataset_guard = if retries == 0 { + self.manifest_dataset.get_cached().await + } else { + self.manifest_dataset.get_refreshed().await? + }; + let dataset = Arc::new(dataset_guard.clone()); + drop(dataset_guard); + + let old_fragment = Self::manifest_shard_fragment(&dataset, shard_index)?; + let source = Self::manifest_projected_stream_for_fragments( + &dataset, + Some(vec![old_fragment.clone()]), + ) + .await?; + let shared = Arc::new(StdMutex::new(ManifestRewriteShared::new(make_mutation()))); + let output_stream = Self::manifest_rewrite_output_stream(source, shared.clone()); + let write_params = WriteParams { + mode: WriteMode::Append, + session: self.session.clone(), + max_rows_per_file: u32::MAX as usize, + skip_auto_cleanup: true, + ..WriteParams::default() + }; + + let transaction = match InsertBuilder::new(dataset.clone()) + .with_params(&write_params) + .execute_uncommitted_stream(output_stream) + .await + { + Ok(transaction) => transaction, + Err(err) => { + if let Some(stream_err) = Self::take_manifest_rewrite_error(&shared)? { + return Err(stream_err); + } + return Err(convert_lance_commit_error(&err, operation, None)); + } + }; + + let (mutation, _index_data) = Self::take_manifest_rewrite_result(&shared)?; + if !mutation.has_changes { + return Ok(mutation.result); + } + + let Operation::Append { mut fragments } = transaction.operation else { + return Err(NamespaceError::Internal { + message: "Manifest shard rewrite transaction is not an append".to_string(), + } + .into()); + }; + if fragments.len() != 1 { + return Err(NamespaceError::Internal { + message: format!( + "Manifest shard rewrite expected one replacement fragment, got {}", + fragments.len() + ), + } + .into()); + } + + let mut replacement_fragment = fragments.remove(0); + replacement_fragment.id = old_fragment.id; + let fields_modified = Self::all_manifest_field_ids(&dataset); + let update = Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![replacement_fragment], + new_fragments: vec![], + fields_modified, + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + updated_fragment_offsets: None, + }; + let transaction = TransactionBuilder::new(dataset.manifest().version, update).build(); + let result = CommitBuilder::new(dataset.clone()) + .with_max_retries(max_retries) + .with_skip_auto_cleanup(true) + .execute(transaction) + .await; + + match result { + Ok(new_dataset) => { + self.manifest_dataset.set_latest(new_dataset).await; + return Ok(mutation.result); + } + Err(LanceError::CommitConflict { .. }) + | Err(LanceError::RetryableCommitConflict { .. }) + | Err(LanceError::TooMuchWriteContention { .. }) + | Err(LanceError::IncompatibleTransaction { .. }) + if retries < max_retries => + { + retries += 1; + tokio::time::sleep(std::time::Duration::from_millis(10 * u64::from(retries))) + .await; + } + Err(err) => return Err(convert_lance_commit_error(&err, operation, None)), + } + } + } + + async fn manifest_contains_object_in_shard( + &self, + object_id: &str, + shard_key: &str, + ) -> Result { let escaped_id = object_id.replace('\'', "''"); let filter = format!("object_id = '{}'", escaped_id); - let dataset_guard = self.manifest_dataset.get().await?; - let mut scanner = dataset_guard.scan(); + let mut scanner = self.manifest_scanner_for_shard_key(shard_key).await?; scanner.filter(&filter).map_err(|e| { lance_core::Error::from(NamespaceError::Internal { @@ -1867,11 +2180,36 @@ impl ManifestNamespace { Ok(count > 0) } + /// Check if the manifest contains an object with the given ID + pub(crate) async fn manifest_contains_object(&self, object_id: &str) -> Result { + let Some(shard_count) = self.shard_count() else { + return self + .manifest_contains_object_in_shard(object_id, object_id) + .await; + }; + + if self + .manifest_contains_object_in_shard(object_id, object_id) + .await? + { + return Ok(true); + } + if let Some(table_id) = Self::table_version_table_id_if_version_object_id(object_id) + && Self::shard_index_for_key(shard_count, table_id) + != Self::shard_index_for_key(shard_count, object_id) + { + return self + .manifest_contains_object_in_shard(object_id, table_id) + .await; + } + Ok(false) + } + /// Query the manifest for a table with the given object ID async fn query_manifest_for_table(&self, object_id: &str) -> Result> { let escaped_id = object_id.replace('\'', "''"); let filter = format!("object_id = '{}' AND object_type = 'table'", escaped_id); - let mut scanner = self.manifest_scanner().await?; + let mut scanner = self.manifest_scanner_for_shard_key(object_id).await?; scanner.filter(&filter).map_err(|e| { lance_core::Error::from(NamespaceError::Internal { message: format!("Failed to filter: {:?}", e), @@ -1937,7 +2275,7 @@ impl ManifestNamespace { Ok(found_result) } - fn serialize_metadata( + pub(crate) fn serialize_metadata( properties: Option<&HashMap>, object_type: &str, object_id: &str, @@ -1973,9 +2311,16 @@ impl ManifestNamespace { .is_some()) } + pub(crate) fn relative_location_path(base_path: &Path, location: &str) -> Path { + location + .split('/') + .filter(|segment| !segment.is_empty()) + .fold(base_path.clone(), |path, segment| path.join(segment)) + } + async fn location_has_actual_manifests(&self, location: &str) -> Result { - Self::path_has_actual_manifests(&self.object_store, &self.base_path.clone().join(location)) - .await + let table_path = Self::relative_location_path(&self.base_path, location); + Self::path_has_actual_manifests(&self.object_store, &table_path).await } pub(crate) fn is_not_found_load_error(err: &LanceError) -> bool { @@ -2056,7 +2401,10 @@ impl ManifestNamespace { .await } - async fn upsert_into_manifest_with_metadata(&self, entries: Vec) -> Result<()> { + pub(crate) async fn upsert_into_manifest_with_metadata( + &self, + entries: Vec, + ) -> Result<()> { self.merge_into_manifest_with_metadata(entries, WhenMatched::UpdateAll) .await } @@ -2070,6 +2418,25 @@ impl ManifestNamespace { return Ok(()); } + if let Some(shard_count) = self.shard_count() { + let mut grouped: HashMap> = HashMap::new(); + for entry in entries { + let shard_index = + Self::shard_index_for_key(shard_count, Self::shard_key_for_entry(&entry)); + grouped.entry(shard_index).or_default().push(entry); + } + + for (shard_index, entries) in grouped { + self.rewrite_manifest_shard( + shard_index, + "Failed to overwrite manifest shard", + || UpsertManifestMutation::new(entries.clone(), when_matched.clone()), + ) + .await?; + } + return Ok(()); + } + self.rewrite_manifest("Failed to overwrite manifest", || { UpsertManifestMutation::new(entries.clone(), when_matched.clone()) }) @@ -2079,11 +2446,30 @@ impl ManifestNamespace { /// Delete an entry from the manifest table pub async fn delete_from_manifest(&self, object_id: &str) -> Result<()> { let object_id = object_id.to_string(); + if let Some(shard_indices) = self.delete_shard_indices_for_object_id(&object_id) { + for shard_index in shard_indices { + let deleted = self + .rewrite_manifest_shard( + shard_index, + "Failed to delete from manifest shard", + || DeleteObjectMutation { + object_id: object_id.clone(), + deleted: false, + }, + ) + .await?; + if deleted { + return Ok(()); + } + } + return Ok(()); + } self.rewrite_manifest("Failed to delete from manifest", || DeleteObjectMutation { object_id: object_id.clone(), deleted: false, }) .await + .map(|_| ()) } /// Query the manifest for all versions of a table, sorted by version. @@ -2107,7 +2493,7 @@ impl ManifestNamespace { "object_type = 'table_version' AND starts_with(object_id, '{}{}')", escaped_id, DELIMITER ); - let mut scanner = self.manifest_scanner().await?; + let mut scanner = self.manifest_scanner_for_shard_key(object_id).await?; scanner.filter(&filter).map_err(|e| { lance_core::Error::from(NamespaceError::Internal { message: format!("Failed to filter: {:?}", e), @@ -2174,7 +2560,9 @@ impl ManifestNamespace { "object_id = '{}' AND object_type = 'table_version'", escaped_id ); - let mut scanner = self.manifest_scanner().await?; + let mut scanner = self + .manifest_scanner_for_shard_key(Self::table_version_table_id(version_object_id)) + .await?; scanner.filter(&filter).map_err(|e| { lance_core::Error::from(NamespaceError::Internal { message: format!("Failed to filter: {:?}", e), @@ -2199,6 +2587,34 @@ impl ManifestNamespace { } async fn delete_table_version_rows_by_object_ids(&self, object_ids: &[String]) -> Result { + if let Some(shard_count) = self.shard_count() { + let mut grouped: HashMap> = HashMap::new(); + for object_id in object_ids { + let shard_index = + Self::shard_index_for_key(shard_count, Self::table_version_table_id(object_id)); + grouped + .entry(shard_index) + .or_default() + .push(object_id.clone()); + } + + let mut deleted = 0; + for (shard_index, object_ids) in grouped { + let object_ids = object_ids.into_iter().collect::>(); + deleted += self + .rewrite_manifest_shard( + shard_index, + "Failed to delete table versions from manifest shard", + || DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget::ObjectIds(object_ids.clone()), + deleted_count: 0, + }, + ) + .await?; + } + return Ok(deleted); + } + let object_ids = object_ids.iter().cloned().collect::>(); self.rewrite_manifest("Failed to delete table versions from manifest", || { DeleteTableVersionsMutation { @@ -2232,6 +2648,50 @@ impl ManifestNamespace { &self, table_ranges: &[(String, Vec<(i64, i64)>)], ) -> Result { + if let Some(shard_count) = self.shard_count() { + let mut grouped: HashMap> = HashMap::new(); + for (object_id, ranges) in table_ranges { + let shard_index = Self::shard_index_for_key(shard_count, object_id); + grouped + .entry(shard_index) + .or_default() + .push((object_id.clone(), ranges.clone())); + } + + let mut deleted = 0; + for (shard_index, table_ranges) in grouped { + let targets = table_ranges + .iter() + .filter_map(|(object_id, ranges)| { + let ranges = Self::normalize_table_version_ranges(ranges); + if ranges.is_empty() { + None + } else { + Some(DeleteTableVersionRangeTarget { + object_id_prefix: Self::build_version_object_id_prefix(object_id), + ranges, + }) + } + }) + .collect::>(); + if targets.is_empty() { + continue; + } + + deleted += self + .rewrite_manifest_shard( + shard_index, + "Failed to delete table versions from manifest shard", + || DeleteTableVersionsMutation { + target: DeleteTableVersionsTarget::Ranges(targets.clone()), + deleted_count: 0, + }, + ) + .await?; + } + return Ok(deleted); + } + let targets = table_ranges .iter() .filter_map(|(object_id, ranges)| { @@ -2435,7 +2895,7 @@ impl ManifestNamespace { async fn query_manifest_for_namespace(&self, object_id: &str) -> Result> { let escaped_id = object_id.replace('\'', "''"); let filter = format!("object_id = '{}' AND object_type = 'namespace'", escaped_id); - let mut scanner = self.manifest_scanner().await?; + let mut scanner = self.manifest_scanner_for_shard_key(object_id).await?; scanner.filter(&filter).map_err(|e| { lance_core::Error::from(NamespaceError::Internal { message: format!("Failed to filter: {:?}", e), @@ -2508,11 +2968,20 @@ impl ManifestNamespace { /// 4. Persist feature flags (e.g., table_version_storage_enabled) if requested async fn ensure_manifest_table_up_to_date( root: &str, + manifest_table_name: &str, storage_options: &Option>, session: Option>, table_version_storage_enabled: bool, + manifest_fragment_shard_count: Option, ) -> Result { - let manifest_path = format!("{}/{}", root, MANIFEST_TABLE_NAME); + if matches!(manifest_fragment_shard_count, Some(0)) { + return Err(NamespaceError::InvalidInput { + message: "manifest_shard_count must be greater than 0".to_string(), + } + .into()); + } + + let manifest_path = format!("{}/{}", root, manifest_table_name); log::debug!("Attempting to load manifest from {}", manifest_path); let store_options = ObjectStoreParams { storage_options_accessor: storage_options.as_ref().map(|opts| { @@ -2534,6 +3003,20 @@ impl ManifestNamespace { .load() .await; if let Ok(mut dataset) = dataset_result { + if let Some(shard_count) = manifest_fragment_shard_count + && dataset.manifest().fragments.len() != shard_count + { + return Err(NamespaceError::InvalidInput { + message: format!( + "Existing {} table has {} fragments but manifest_shard_count is {}", + manifest_table_name, + dataset.manifest().fragments.len(), + shard_count + ), + } + .into()); + } + // Check if the object_id field has primary key metadata, migrate if not let needs_pk_migration = dataset .schema() @@ -2545,7 +3028,10 @@ impl ManifestNamespace { .unwrap_or(false); if needs_pk_migration { - log::info!("Migrating __manifest table to add primary key metadata on object_id"); + log::info!( + "Migrating {} table to add primary key metadata on object_id", + manifest_table_name + ); dataset .update_field_metadata() .update("object_id", [(LANCE_UNENFORCED_PRIMARY_KEY_POSITION, "0")]) @@ -2565,7 +3051,7 @@ impl ManifestNamespace { })?; } - // Persist table_version_storage_enabled flag in __manifest so that once + // Persist table_version_storage_enabled flag in the manifest table so that once // enabled, it becomes a permanent property of this namespace. if table_version_storage_enabled { let needs_flag = dataset @@ -2580,7 +3066,8 @@ impl ManifestNamespace { .await { log::warn!( - "Failed to persist table_version_storage_enabled flag in __manifest: {:?}", + "Failed to persist table_version_storage_enabled flag in {}: {:?}", + manifest_table_name, e ); } @@ -2590,8 +3077,19 @@ impl ManifestNamespace { } else { log::info!("Creating new manifest table at {}", manifest_path); let schema = Self::manifest_schema(); - let empty_batch = RecordBatch::new_empty(schema.clone()); - let reader = RecordBatchIterator::new(vec![Ok(empty_batch)], schema.clone()); + let batches = if let Some(shard_count) = manifest_fragment_shard_count { + let mut batches = Vec::with_capacity(shard_count); + for shard_index in 0..shard_count { + batches.push(Self::manifest_shard_marker_batch( + schema.clone(), + shard_index, + )?); + } + batches + } else { + vec![RecordBatch::new_empty(schema.clone())] + }; + let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); let store_params = ObjectStoreParams { storage_options_accessor: storage_options.as_ref().map(|opts| { @@ -2606,6 +3104,7 @@ impl ManifestNamespace { let write_params = WriteParams { session: session.clone(), store_params: Some(store_params), + max_rows_per_file: MANIFEST_FRAGMENT_SHARD_MARKER_ROWS_PER_FILE, ..Default::default() }; @@ -2714,6 +3213,199 @@ impl ManifestNamespace { } } +/// Manifest-backed catalog split across fragments of one Lance manifest table. +pub struct ShardedManifestNamespace { + root: String, + manifest: ManifestNamespace, + shard_count: usize, +} + +impl std::fmt::Debug for ShardedManifestNamespace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ShardedManifestNamespace") + .field("root", &self.root) + .field("shard_count", &self.shard_count) + .finish() + } +} + +impl ShardedManifestNamespace { + #[allow(clippy::too_many_arguments)] + pub async fn from_directory( + root: String, + storage_options: Option>, + session: Option>, + object_store: Arc, + base_path: Path, + dir_listing_enabled: bool, + inline_optimization_enabled: bool, + commit_retries: Option, + table_version_storage_enabled: bool, + shard_count: usize, + ) -> Result { + if shard_count == 0 { + return Err(NamespaceError::InvalidInput { + message: "manifest_shard_count must be greater than 0".to_string(), + } + .into()); + } + + let manifest = ManifestNamespace::from_directory_with_manifest_table( + root.clone(), + MANIFEST_TABLE_NAME.to_string(), + storage_options, + session, + object_store, + base_path, + dir_listing_enabled, + inline_optimization_enabled, + commit_retries, + table_version_storage_enabled, + Some(shard_count), + ) + .await?; + + Ok(Self { + root, + manifest, + shard_count, + }) + } + + pub async fn list_manifest_table_locations(&self) -> Result> { + self.manifest.list_manifest_table_locations().await + } + + pub async fn register_table(&self, name: &str, location: String) -> Result<()> { + self.manifest.register_table(name, location).await + } + + pub async fn insert_into_manifest_with_metadata( + &self, + entries: Vec, + ) -> Result<()> { + self.manifest + .insert_into_manifest_with_metadata(entries) + .await + } + + pub async fn query_table_versions( + &self, + object_id: &str, + descending: bool, + limit: Option, + ) -> Result> { + self.manifest + .query_table_versions(object_id, descending, limit) + .await + } + + pub async fn list_table_versions( + &self, + table_id: &[String], + descending: bool, + limit: Option, + ) -> Result { + self.manifest + .list_table_versions(table_id, descending, limit) + .await + } + + pub async fn describe_table_version( + &self, + table_id: &[String], + version: i64, + ) -> Result { + self.manifest + .describe_table_version(table_id, version) + .await + } + + pub async fn batch_delete_table_versions_by_ranges( + &self, + table_ranges: &[(String, Vec<(i64, i64)>)], + ) -> Result { + self.manifest + .batch_delete_table_versions_by_ranges(table_ranges) + .await + } +} + +#[async_trait] +impl LanceNamespace for ShardedManifestNamespace { + fn namespace_id(&self) -> String { + self.root.clone() + } + + async fn list_tables(&self, request: ListTablesRequest) -> Result { + self.manifest.list_tables(request).await + } + + async fn describe_table(&self, request: DescribeTableRequest) -> Result { + self.manifest.describe_table(request).await + } + + async fn table_exists(&self, request: TableExistsRequest) -> Result<()> { + self.manifest.table_exists(request).await + } + + async fn create_table( + &self, + request: CreateTableRequest, + data: Bytes, + ) -> Result { + self.manifest.create_table(request, data).await + } + + async fn drop_table(&self, request: DropTableRequest) -> Result { + self.manifest.drop_table(request).await + } + + async fn declare_table(&self, request: DeclareTableRequest) -> Result { + self.manifest.declare_table(request).await + } + + async fn register_table(&self, request: RegisterTableRequest) -> Result { + ::register_table(&self.manifest, request).await + } + + async fn deregister_table( + &self, + request: DeregisterTableRequest, + ) -> Result { + self.manifest.deregister_table(request).await + } + + async fn list_namespaces( + &self, + request: ListNamespacesRequest, + ) -> Result { + self.manifest.list_namespaces(request).await + } + + async fn describe_namespace( + &self, + request: DescribeNamespaceRequest, + ) -> Result { + self.manifest.describe_namespace(request).await + } + + async fn create_namespace( + &self, + request: CreateNamespaceRequest, + ) -> Result { + self.manifest.create_namespace(request).await + } + + async fn drop_namespace(&self, request: DropNamespaceRequest) -> Result { + self.manifest.drop_namespace(request).await + } + + async fn namespace_exists(&self, request: NamespaceExistsRequest) -> Result<()> { + self.manifest.namespace_exists(request).await + } +} + #[async_trait] impl LanceNamespace for ManifestNamespace { fn namespace_id(&self) -> String { @@ -3182,7 +3874,7 @@ impl LanceNamespace for ManifestNamespace { self.delete_from_manifest(&object_id).boxed().await?; // Delete physical data directory using the dir_name from manifest - let table_path = self.base_path.clone().join(info.location.as_str()); + let table_path = Self::relative_location_path(&self.base_path, &info.location); let table_uri = Self::construct_full_uri(&self.root, &info.location)?; // Remove the table directory @@ -3687,6 +4379,8 @@ mod tests { use rstest::rstest; use std::collections::HashMap; + use crate::dir::ManifestCatalog; + fn create_test_ipc_data() -> Vec { use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; @@ -3982,6 +4676,54 @@ mod tests { assert_eq!(versions, vec![(1, String::new())]); } + #[tokio::test] + async fn test_sharded_manifest_direct_delete_table_version_routes_to_table_shard() { + let temp_dir = TempStdDir::default(); + let temp_path = temp_dir.to_str().unwrap(); + + let namespace = DirectoryNamespaceBuilder::new(temp_path) + .manifest_shard_count(4) + .build() + .await + .unwrap(); + let table_object_id = "table"; + let version_object_id = ManifestNamespace::build_version_object_id(table_object_id, 1); + let manifest_ns = namespace.manifest_ns.as_ref().unwrap(); + manifest_ns + .insert_into_manifest_with_metadata(vec![super::ManifestEntry { + object_id: version_object_id.clone(), + object_type: super::ObjectType::TableVersion, + location: None, + metadata: None, + }]) + .await + .unwrap(); + assert_eq!( + manifest_ns + .query_table_versions(table_object_id, false, None) + .await + .unwrap(), + vec![(1, String::new())] + ); + + let ManifestCatalog::Sharded(sharded) = manifest_ns.as_ref() else { + panic!("expected sharded manifest catalog"); + }; + sharded + .manifest + .delete_from_manifest(&version_object_id) + .await + .unwrap(); + + assert!( + manifest_ns + .query_table_versions(table_object_id, false, None) + .await + .unwrap() + .is_empty() + ); + } + #[tokio::test] async fn test_manifest_delete_table_versions_streams_large_ranges() { let temp_dir = TempStdDir::default();