Skip to content

Commit 4ea9166

Browse files
authored
fix(namespace): serialize manifest mutations (#6525)
This fixes the directory namespace CI failure where single-instance concurrent create/drop operations on `__manifest` could time out with `TooMuchWriteContention`, especially in the Windows build. Manifest mutations are now serialized within a single `ManifestNamespace` instance so concurrent operations stop racing on stale in-memory snapshots, and inline manifest maintenance now defers compaction/index merges until the table has accumulated enough fragments. Context: https://github.com/lance-format/lance/actions/runs/24439767878/job/71401857043
1 parent 88eabc7 commit 4ea9166

1 file changed

Lines changed: 20 additions & 1 deletion

File tree

rust/lance-namespace-impls/src/dir/manifest.rs

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ use std::{
4949
ops::{Deref, DerefMut},
5050
sync::Arc,
5151
};
52-
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
52+
use tokio::sync::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
5353

5454
const MANIFEST_TABLE_NAME: &str = "__manifest";
5555
const DELIMITER: &str = "$";
@@ -61,6 +61,9 @@ const OBJECT_ID_INDEX_NAME: &str = "object_id_btree";
6161
const OBJECT_TYPE_INDEX_NAME: &str = "object_type_bitmap";
6262
/// LabelList index on the base_objects column for view dependencies
6363
const BASE_OBJECTS_INDEX_NAME: &str = "base_objects_label_list";
64+
/// Inline maintenance on the manifest table is expensive relative to a single-row mutation.
65+
/// Inline optimization skips file compaction and index merging until the table has at least this many fragments.
66+
const MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD: usize = 8;
6467

6568
/// Object types that can be stored in the manifest
6669
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -268,6 +271,9 @@ pub struct ManifestNamespace {
268271
/// Number of retries for commit operations on the manifest table.
269272
/// If None, defaults to [`lance_table::io::commit::CommitConfig`] default (20).
270273
commit_retries: Option<u32>,
274+
/// Serialize manifest mutations within a single namespace instance so concurrent
275+
/// create/drop calls do not compete with each other on the same in-memory snapshot.
276+
manifest_mutation_lock: Arc<Mutex<()>>,
271277
}
272278

273279
impl std::fmt::Debug for ManifestNamespace {
@@ -374,6 +380,7 @@ impl ManifestNamespace {
374380
dir_listing_enabled,
375381
inline_optimization_enabled,
376382
commit_retries,
383+
manifest_mutation_lock: Arc::new(Mutex::new(())),
377384
})
378385
}
379386

@@ -615,6 +622,13 @@ impl ManifestNamespace {
615622
}
616623
}
617624

625+
let should_compact_and_optimize =
626+
dataset.count_fragments() >= MANIFEST_INLINE_OPTIMIZATION_FRAGMENT_THRESHOLD;
627+
628+
if !should_compact_and_optimize {
629+
return Ok(());
630+
}
631+
618632
// Step 2: Run file compaction
619633
log::debug!("Running file compaction on __manifest table");
620634
match compact_files(dataset, CompactionOptions::default(), None).await {
@@ -933,6 +947,7 @@ impl ManifestNamespace {
933947
let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
934948

935949
// Use MergeInsert to ensure uniqueness on object_id
950+
let _mutation_guard = self.manifest_mutation_lock.lock().await;
936951
let dataset_guard = self.manifest_dataset.get().await?;
937952
let dataset_arc = Arc::new(dataset_guard.clone());
938953
drop(dataset_guard); // Drop read guard before merge insert
@@ -994,6 +1009,7 @@ impl ManifestNamespace {
9941009
let predicate = format!("object_id = '{}'", object_id);
9951010

9961011
// Get dataset and use DeleteBuilder with configured retries
1012+
let _mutation_guard = self.manifest_mutation_lock.lock().await;
9971013
let dataset_guard = self.manifest_dataset.get().await?;
9981014
let dataset = Arc::new(dataset_guard.clone());
9991015
drop(dataset_guard); // Drop read guard before delete
@@ -1191,6 +1207,7 @@ impl ManifestNamespace {
11911207
}
11921208

11931209
// Execute a single bulk delete with the combined filter
1210+
let _mutation_guard = self.manifest_mutation_lock.lock().await;
11941211
let dataset_guard = self.manifest_dataset.get().await?;
11951212
let dataset = Arc::new(dataset_guard.clone());
11961213
drop(dataset_guard);
@@ -1267,6 +1284,7 @@ impl ManifestNamespace {
12671284
}
12681285

12691286
// Execute a single atomic bulk delete covering all tables
1287+
let _mutation_guard = self.manifest_mutation_lock.lock().await;
12701288
let dataset_guard = self.manifest_dataset.get().await?;
12711289
let dataset = Arc::new(dataset_guard.clone());
12721290
drop(dataset_guard);
@@ -1304,6 +1322,7 @@ impl ManifestNamespace {
13041322
/// __manifest dataset's table metadata, rather than inserting a row.
13051323
/// If the property already exists with the same value, this is a no-op.
13061324
pub async fn set_property(&self, name: &str, value: &str) -> Result<()> {
1325+
let _mutation_guard = self.manifest_mutation_lock.lock().await;
13071326
let dataset_guard = self.manifest_dataset.get().await?;
13081327
if dataset_guard.metadata().get(name) == Some(&value.to_string()) {
13091328
return Ok(());

0 commit comments

Comments
 (0)