Skip to content

Commit e9d1617

Browse files
feat: unify WalAppender into ShardWriter via enable_memtable mode (#6675)
## Summary - Add `ShardWriterConfig::enable_memtable` (default `true`); when `false`, `ShardWriter` runs in a new WAL-only mode that keeps the async batched WAL pipeline but skips MemTable allocation, in-memory indexes, MemTable freezing, and MemTable-bytes backpressure (a separate WAL-only backpressure budget reuses `max_unflushed_memtable_bytes`). - `WalAppender` becomes the WAL write primitive used by both modes inside `WalFlusher`, replacing the prior duplicate plain-`put` path. MemTable mode now also gets atomic put-if-not-exists, conflict retry, and fence-on-write. - `WalAppender` stays public as the lowest-level synchronous-atomic appender. New `pub(crate) WalAppender::with_claimed_epoch` lets `ShardWriter::open` claim the epoch once and inject it. - WAL-only mode uses a drainable `WalOnlyState` queue with snapshot/commit semantics so a failed append leaves batches in the queue for retry instead of dropping them silently. - `memtable_stats()`, `scan()`, `active_memtable_ref()` now return `Result<...>` and produce a clear `invalid_input` error in WAL-only mode. Behavior change to call out: first WAL entry on a fresh shard is now position 0 instead of 1, matching the 0-based positions documented in the [MemTable & WAL spec](https://github.com/lance-format/lance/blob/main/docs/src/format/table/mem_wal.md). The previous flusher seeded its counter from `wal_entry_position_last_seen + 1` and so always skipped position 0. Context: this realizes the layering discussed in #6669 (comment) — keep `WalAppender` as the low-level primitive and let users always use `ShardWriter`, with a config switch to turn the MemTable on or off. cc @jackye1995 --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 84b5d25 commit e9d1617

7 files changed

Lines changed: 2198 additions & 421 deletions

File tree

python/src/mem_wal.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ impl PyRegionWriter {
201201
// Snapshot stats before close so the captured state reflects
202202
// what was written, not any internal bookkeeping done by close().
203203
let stats_snapshot = stats_handle.snapshot();
204-
let memtable_stats_before_close = writer.memtable_stats().await;
204+
let memtable_stats_before_close = writer.memtable_stats().await?;
205205
writer.close().await?;
206206
let closed_memtable_stats = closed_memtable_stats(memtable_stats_before_close);
207207
let mut closed_guard = closed_state.lock().await;
@@ -255,7 +255,7 @@ impl PyRegionWriter {
255255
.block_on(Some(py), async move {
256256
let guard = inner.lock().await;
257257
match guard.as_ref() {
258-
Some(w) => Ok(w.memtable_stats().await),
258+
Some(w) => w.memtable_stats().await,
259259
None => {
260260
let closed_guard = closed_state.lock().await;
261261
closed_guard
@@ -267,7 +267,7 @@ impl PyRegionWriter {
267267
}
268268
}
269269
})?
270-
.map_err(|e| PyIOError::new_err(e.to_string()))?;
270+
.map_err(|e: lance::Error| PyIOError::new_err(e.to_string()))?;
271271

272272
memtable_stats_to_pydict(py, &stats)
273273
}
@@ -296,7 +296,7 @@ impl PyRegionWriter {
296296
let guard = inner.lock().await;
297297
match guard.as_ref() {
298298
Some(w) => {
299-
let active_ref = w.active_memtable_ref().await;
299+
let active_ref = w.active_memtable_ref().await?;
300300
let writer_snapshot = w
301301
.manifest()
302302
.await?

rust/lance/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,5 +210,9 @@ harness = false
210210
name = "mem_wal_read"
211211
harness = false
212212

213+
[[bench]]
214+
name = "mem_wal_replay"
215+
harness = false
216+
213217
[lints]
214218
workspace = true

rust/lance/benches/mem_wal_read.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ async fn setup_benchmark(
307307
let manifest = writer.manifest().await.unwrap();
308308

309309
// Get active memtable reference
310-
let active_memtable_ref = writer.active_memtable_ref().await;
310+
let active_memtable_ref = writer.active_memtable_ref().await.unwrap();
311311

312312
// Build shard snapshot
313313
let mut shard_snapshot = ShardSnapshot::new(shard_id);
@@ -906,7 +906,7 @@ async fn setup_vector_benchmark(
906906
}
907907

908908
let manifest = writer.manifest().await.unwrap();
909-
let active_memtable_ref = writer.active_memtable_ref().await;
909+
let active_memtable_ref = writer.active_memtable_ref().await.unwrap();
910910

911911
let mut shard_snapshot = ShardSnapshot::new(shard_id);
912912
if let Some(ref m) = manifest {

0 commit comments

Comments
 (0)