Commit db02fc9

feat: add write ahead log appender and tailer primitives (#6669)
## Summary

Surface a generic read/write surface for MemWAL shards so callers can drive a shard directly without going through the existing flusher.

- **Shard self-description.** `ShardManifest` now records the `(shard_spec_id, field_id -> bytes)` assignment that produced the shard, so a manifest found on disk can be mapped back to a shard spec without consulting the inline index. Proto adds `ShardFieldEntry`; manifest carries `shard_field_values: HashMap<String, Vec<u8>>`.
- **Idempotent shard initialization.** `ShardManifestStore::initialize_shard()` writes manifest v1 at writer epoch 0 and treats `AlreadyExists` as success when the existing manifest matches.
- **Generic WAL primitives.**
  - `WalAppender::open()` claims a writer epoch via the manifest store; `append(batches)` serializes Arrow IPC and writes with put-if-not-exists, retrying on conflict and fencing on PUT failure.
  - `WalTailer::read_entry`, `next_position`, `first_position` use `wal_entry_position_last_seen` as a probe hint with a listing fallback. `WalReadEntry` includes the `writer_epoch` recorded in the entry so callers can fence-check on replay.
- **Index discovery helpers.** New `Dataset` methods `mem_wal_index_details()` and `list_mem_wal_latest_shard_ids()` (object-storage directory listing). The inline shard snapshot is positioned as a stale read-optimization rather than the source of truth; `docs/src/format/table/mem_wal.md` is updated to match.

## Tests

Added unit tests for:

- `ShardManifestStore::initialize_shard` happy path, idempotent on match, rejects mismatched conflict.
- `WalAppender` / `WalTailer` round-trip including `writer_epoch` propagation; position increment.
- Writer-epoch fencing: a stale appender hits the conflict path on append and surfaces the fence error.
- Input validation: empty batch list and zero-row batches are rejected.
- `WalTailer::with_cursor_updates(true)` updates `wal_entry_position_last_seen` asynchronously, and `next_position()` still resolves correctly.
- `mem_wal_path()` helper.

cc @jackye1995 for review.
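
As a rough illustration of the append path described in the summary (put-if-not-exists, retry on conflict, fence when a newer writer epoch exists), the sketch below models the store in memory. `MemStore`, `Appender`, and `AppendError` are hypothetical names made up for this example; they are not the `WalAppender` API added by this commit, which serializes Arrow IPC and writes to object storage, and whose conflict handling may differ in detail.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for object storage plus the shard manifest.
#[derive(Default)]
struct MemStore {
    /// Latest claimed writer epoch (plays the role of the manifest's `writer_epoch`).
    writer_epoch: u64,
    /// WAL entries keyed by position: (writer epoch recorded in the entry, payload).
    entries: BTreeMap<u64, (u64, Vec<u8>)>,
}

#[derive(Debug, PartialEq)]
enum AppendError {
    /// A newer writer has claimed the shard; this appender must stop.
    Fenced { current_epoch: u64 },
    /// Empty payloads are rejected before any write is attempted.
    EmptyInput,
}

impl MemStore {
    /// Claim the shard by bumping the epoch; returns the new epoch.
    fn claim_epoch(&mut self) -> u64 {
        self.writer_epoch += 1;
        self.writer_epoch
    }

    /// Put-if-not-exists: returns false when the position is already occupied.
    fn put_if_absent(&mut self, pos: u64, epoch: u64, data: &[u8]) -> bool {
        if self.entries.contains_key(&pos) {
            return false;
        }
        self.entries.insert(pos, (epoch, data.to_vec()));
        true
    }
}

/// Hypothetical appender: remembers its epoch and the next position to try.
struct Appender {
    epoch: u64,
    next_pos: u64,
}

impl Appender {
    fn open(store: &mut MemStore) -> Self {
        let epoch = store.claim_epoch();
        Appender { epoch, next_pos: 0 }
    }

    fn append(&mut self, store: &mut MemStore, data: &[u8]) -> Result<u64, AppendError> {
        if data.is_empty() {
            return Err(AppendError::EmptyInput);
        }
        loop {
            if store.put_if_absent(self.next_pos, self.epoch, data) {
                let pos = self.next_pos;
                self.next_pos += 1;
                return Ok(pos);
            }
            // Conflict: the slot is taken. If a newer epoch has been claimed,
            // this writer is stale and must surface a fence error.
            if store.writer_epoch > self.epoch {
                return Err(AppendError::Fenced { current_epoch: store.writer_epoch });
            }
            // Otherwise we are still the latest writer: probe the next position.
            self.next_pos += 1;
        }
    }
}
```

In this model a stale appender is only fenced once it collides with an entry it did not expect, which mirrors the "hits the conflict path on append" test description; whether the real implementation also checks the epoch on other paths is not stated in the commit message.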
1 parent 23003a7 commit db02fc9

8 files changed

Lines changed: 1057 additions & 128 deletions

docs/src/format/table/mem_wal.md

Lines changed: 27 additions & 19 deletions
@@ -45,13 +45,14 @@ A table has at most one MemWAL index. It stores:
 - **Configuration**: Shard specs defining how rows map to shards, and which indexes to maintain
 - **Merge progress**: Last generation merged to base table for each shard
 - **Index catchup progress**: Which merged generation each base table index has been rebuilt to cover
-- **Shard snapshots**: Snapshot of all shard states for read optimization
+- **Shard snapshots**: Point-in-time snapshot of shard states for read optimization
 
 The index is the source of truth for **configuration**, **merge progress** and **index catchup progress**
 Writers and mergers read the MemWAL index to get these configurations before writing.
 
 Each [shard's manifest](#shard-manifest) is authoritative for its own state.
-Readers use **shard snapshots** is a read-only optimization to see a point-in-time view of all shards without the need to open each shard manifest.
+Readers may use **shard snapshots** as a read-only optimization to see a point-in-time view of shards without opening each shard manifest.
+Readers that need the latest shard set must discover shard directories in storage and read each shard's latest manifest.
 
 See [MemWAL Index Details](#memwal-index-details) for the complete structure.
 
@@ -173,6 +174,7 @@ Each shard has a manifest file. This is the source of truth for the state of a s
 The manifest contains:
 
 - **Fencing state**: `writer_epoch` as the latest writer fencing token, see [Writer Fencing](#writer-fencing) for more details.
+- **Shard assignment**: `shard_spec_id` and `shard_field_values` record how this shard maps to its shard spec. `shard_field_values` is a map from shard field id to the raw Arrow scalar bytes of the computed value; the matching `ShardField.result_type` in the `ShardSpec` determines how to interpret each entry (e.g., 4 little-endian bytes for int32, raw UTF-8 bytes for utf8).
 - **WAL pointers**: `replay_after_wal_entry_position` (last entry position flushed to MemTable, 0-based), `wal_entry_position_last_seen` (last entry position seen at manifest update, 0-based)
 - **Generation trackers**: `current_generation` (next generation to flush), `flushed_generations` list of generation number and directory path pairs (e.g., generation 1 at `a1b2c3d4_gen_1`)
 
@@ -258,8 +260,17 @@ The `index_details` field in `IndexMetadata` contains a `MemWalIndexDetails` pro
 
 ### Shard Identifier
 
-Each shard has a unique identifier across all shards following UUID v4 standard.
-When a new shard is created, it is assigned a new identifier.
+Each shard has a unique UUID identifier within the table.
+When a new shard is created, implementations may assign either a random UUID or
+a deterministic UUID derived from the shard assignment when deterministic
+writer fencing is required.
+
+### Shard Discovery
+
+The MemWAL index can store shard snapshots for read optimization, but those snapshots may lag the latest shard set.
+Implementations that need to discover the current shard set should list `_mem_wal/` shard directories and read each shard's latest [shard manifest](#shard-manifest).
+
+Each shard manifest records the shard UUID, shard spec ID, and computed shard field values needed to map the shard back to a shard spec assignment.
 
 ### Shard Spec
 
@@ -342,28 +353,25 @@ This file uses standard Lance format with the shard snapshot schema, enabling ef
 ### Shard Snapshot Arrow Schema
 
 Shard snapshots are stored as a Lance file with one row per shard.
-The schema has one column per `ShardManifest` field plus shard spec columns:
-
-| Column | Type | Description |
-| --------------------------------- | ------------------------------------------------ | -------------------------------------------------------- |
-| `shard_id` | `fixed_size_binary(16)` | Shard UUID bytes |
-| `version` | `uint64` | Shard manifest version |
-| `shard_spec_id` | `uint32` | Shard spec ID (0 if manual) |
-| `writer_epoch` | `uint64` | Writer fencing token |
-| `replay_after_wal_entry_position` | `uint64` | Last WAL entry position (0-based) flushed to MemTable |
-| `wal_entry_position_last_seen` | `uint64` | Last WAL entry position (0-based) seen (hint) |
-| `current_generation` | `uint64` | Next generation to flush |
-| `flushed_generations` | `list<struct<generation: uint64, path: string>>` | Flushed MemTable paths |
-| `region_field_{field_id}` | varies | Shard field value (one column per field in shard spec) |
+The snapshot schema is optimized for shard discovery. Full mutable shard state
+remains in the authoritative shard manifest files.
+
+| Column | Type | Description |
+| ------------------------ | ------------- | ---------------------------------------------------------------------------------------------------------- |
+| `shard_id` | `utf8` | Shard UUID string |
+| `shard_spec_id` | `uint32` | Shard spec ID (0 if manual) |
+| `shard_field_{field_id}` | varies | One column per shard field defined in the shard spec, typed to match the field's `ShardField.result_type`. |
 
 For example, with a shard spec containing a field `user_bucket` of type `int32`:
 
 | Column | Type | Description |
 | -------------------------- | ------- | ---------------------------- |
 | ... | ... | (base columns above) |
-| `region_field_user_bucket` | `int32` | Bucket value for this shard |
+| `shard_field_user_bucket` | `int32` | Bucket value for this shard |
 
-This schema directly corresponds to the fields in the `ShardManifest` protobuf message plus the computed shard field values.
+This schema records the fields needed to map each shard back to its shard spec
+assignment. Readers that need fencing epochs, WAL positions, or flushed
+generation state must read the latest shard manifests directly.
 
 ## Storage Layout
 
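
The `shard_field_values` encoding documented in the manifest section of this file (4 little-endian bytes for int32, raw UTF-8 bytes for utf8, interpreted via the matching `ShardField.result_type`) can be illustrated with a small decoder. This is a minimal sketch under stated assumptions: `ResultType` and `ShardValue` are hypothetical enums invented for the example and cover only the two types the documentation mentions; they are not types from the Lance codebase.

```rust
use std::convert::TryFrom;

/// Hypothetical subset of result types; the real `ShardField.result_type`
/// may cover more Arrow types than the two named in the docs.
#[derive(Clone, Copy)]
enum ResultType {
    Int32,
    Utf8,
}

/// Hypothetical decoded value for a single shard field.
#[derive(Debug, PartialEq)]
enum ShardValue {
    Int32(i32),
    Utf8(String),
}

/// Interpret raw scalar bytes according to the declared result type.
fn decode_shard_value(ty: ResultType, bytes: &[u8]) -> Result<ShardValue, String> {
    match ty {
        ResultType::Int32 => {
            // int32 is stored as exactly 4 little-endian bytes.
            let arr = <[u8; 4]>::try_from(bytes)
                .map_err(|_| format!("int32 needs 4 bytes, got {}", bytes.len()))?;
            Ok(ShardValue::Int32(i32::from_le_bytes(arr)))
        }
        ResultType::Utf8 => std::str::from_utf8(bytes)
            // utf8 is stored as the raw UTF-8 bytes with no length prefix.
            .map(|s| ShardValue::Utf8(s.to_owned()))
            .map_err(|e| e.to_string()),
    }
}
```

Because the bytes carry no type tag, a reader has to look up the result type from the shard spec before decoding; the length check on the int32 branch is one way to catch a spec and manifest that disagree.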
protos/table.proto

Lines changed: 23 additions & 11 deletions
@@ -537,6 +537,12 @@ message ShardManifest {
   // A value of 0 indicates a manually-created shard not governed by any spec.
   uint32 shard_spec_id = 10;
 
+  // Computed shard field values as raw Arrow scalar bytes, keyed by shard
+  // field id. The byte encoding follows Arrow's little-endian convention:
+  // int32 is 4 LE bytes, utf8 is raw UTF-8 bytes, etc. The receiver looks
+  // up the result_type from the ShardSpec to interpret each value.
+  repeated ShardFieldEntry shard_field_entries = 14;
+
   // Writer fencing token - monotonically increasing.
   // A writer must increment this when claiming the shard.
   uint64 writer_epoch = 2;
@@ -559,6 +565,16 @@ message ShardManifest {
   repeated FlushedGeneration flushed_generations = 8;
 }
 
+// A shard field value stored as raw Arrow scalar bytes.
+message ShardFieldEntry {
+  // Shard field id (matches ShardField.field_id in the ShardSpec).
+  string field_id = 1;
+
+  // Raw Arrow scalar value bytes in little-endian encoding.
+  // The data type is determined by the result_type of the matching ShardField.
+  bytes value = 2;
+}
+
 // A flushed MemTable generation and its storage location.
 message FlushedGeneration {
   // Generation number.
@@ -596,20 +612,17 @@ message IndexCatchupProgress {
 // - Shard state snapshots
 //
 // Writers read this index to get configuration before writing.
-// Readers read this index to discover shards and their state.
+// Readers may use shard snapshots in this index as a point-in-time
+// optimization. Readers that need the latest shard set should list shard
+// directories in storage and read each shard's latest manifest.
 // A background process updates the index periodically to keep shard snapshots current.
 //
 // Shard snapshots are stored as a Lance file with one row per shard.
-// The schema has one column per ShardManifest field, with shard fields as columns:
-// shard_id: fixed_size_binary(16) -- UUID bytes
-// version: uint64
+// The schema records shard discovery fields. Full mutable shard state remains
+// authoritative in the shard manifest files.
+// shard_id: utf8
 // shard_spec_id: uint32
-// writer_epoch: uint64
-// replay_after_wal_entry_position: uint64
-// wal_entry_position_last_seen: uint64
-// current_generation: uint64
-// merged_generation: uint64
-// flushed_generations: list<struct<generation: uint64, path: string>>
+// shard_field_{field_id}: typed per the matching ShardField.result_type
 message MemWalIndexDetails {
   // Snapshot timestamp (Unix timestamp in milliseconds).
   int64 snapshot_ts_millis = 1;
@@ -690,4 +703,3 @@ message ShardField {
   // Transform parameters (e.g., num_buckets for bucket transform).
   map<string, string> parameters = 6;
 }
-

rust/lance-index/src/mem_wal.rs

Lines changed: 21 additions & 1 deletion
@@ -153,6 +153,11 @@ pub struct ShardManifest {
     pub shard_id: Uuid,
     pub version: u64,
     pub shard_spec_id: u32,
+    /// Computed shard field values as raw Arrow scalar bytes, keyed by field id.
+    /// The byte encoding follows Arrow's little-endian convention: int32 is 4 LE
+    /// bytes, utf8 is raw UTF-8 bytes, etc. The result_type in the corresponding
+    /// ShardField from the ShardSpec determines how to interpret each value.
+    pub shard_field_values: HashMap<String, Vec<u8>>,
     pub writer_epoch: u64,
     /// The most recent WAL entry position (0-based) flushed to a MemTable.
     /// Recovery replays from `replay_after_wal_entry_position + 1`.
@@ -165,7 +170,8 @@ pub struct ShardManifest {
 
 impl DeepSizeOf for ShardManifest {
     fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
-        self.flushed_generations.deep_size_of_children(context)
+        self.shard_field_values.deep_size_of_children(context)
+            + self.flushed_generations.deep_size_of_children(context)
     }
 }
 
@@ -175,6 +181,14 @@ impl From<&ShardManifest> for pb::ShardManifest {
             shard_id: Some((&rm.shard_id).into()),
             version: rm.version,
             shard_spec_id: rm.shard_spec_id,
+            shard_field_entries: rm
+                .shard_field_values
+                .iter()
+                .map(|(k, v)| pb::ShardFieldEntry {
+                    field_id: k.clone(),
+                    value: v.clone(),
+                })
+                .collect(),
             writer_epoch: rm.writer_epoch,
             replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
             wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
@@ -193,10 +207,16 @@ impl TryFrom<pb::ShardManifest> for ShardManifest {
             .as_ref()
             .map(Uuid::try_from)
             .ok_or_else(|| Error::invalid_input("Missing shard_id in ShardManifest"))??;
+        let shard_field_values = rm
+            .shard_field_entries
+            .into_iter()
+            .map(|e| (e.field_id, e.value))
+            .collect();
         Ok(Self {
             shard_id,
             version: rm.version,
             shard_spec_id: rm.shard_spec_id,
+            shard_field_values,
             writer_epoch: rm.writer_epoch,
             replay_after_wal_entry_position: rm.replay_after_wal_entry_position,
             wal_entry_position_last_seen: rm.wal_entry_position_last_seen,
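
The two conversions in this file turn the in-memory `HashMap<String, Vec<u8>>` into a repeated list of entries and back. The following standalone sketch reproduces that round trip with a local `ShardFieldEntry` struct standing in for the generated `pb::ShardFieldEntry` (an assumption made so the example compiles without the Lance crates). It also sorts entries by `field_id`, which the commit does not do; that step is included only to show one way to get a stable entry order, since `HashMap` iteration order is unspecified.

```rust
use std::collections::HashMap;

/// Local stand-in for the generated protobuf message (hypothetical here).
#[derive(Debug, Clone, PartialEq)]
struct ShardFieldEntry {
    field_id: String,
    value: Vec<u8>,
}

/// Map -> repeated entries, sorted by field id for a deterministic order.
fn to_entries(values: &HashMap<String, Vec<u8>>) -> Vec<ShardFieldEntry> {
    let mut entries: Vec<ShardFieldEntry> = values
        .iter()
        .map(|(k, v)| ShardFieldEntry {
            field_id: k.clone(),
            value: v.clone(),
        })
        .collect();
    entries.sort_by(|a, b| a.field_id.cmp(&b.field_id));
    entries
}

/// Repeated entries -> map. If a field id repeats, the later entry wins,
/// because collecting into a HashMap inserts entries in iteration order.
fn from_entries(entries: Vec<ShardFieldEntry>) -> HashMap<String, Vec<u8>> {
    entries
        .into_iter()
        .map(|e| (e.field_id, e.value))
        .collect()
}
```

The round trip is lossless regardless of entry order, so the sort matters only for callers that compare or hash the serialized bytes.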

rust/lance/src/dataset/mem_wal.rs

Lines changed: 2 additions & 1 deletion
@@ -41,9 +41,10 @@ mod util;
 mod wal;
 pub mod write;
 
-pub use api::{DatasetMemWalExt, MemWalConfig};
+pub use api::{DatasetMemWalExt, MemWalConfig, MemWalShardConfig};
 pub use manifest::ShardManifestStore;
 pub use memtable::scanner::MemTableScanner;
 pub use scanner::{LsmDataSource, LsmGeneration, LsmScanner, ShardSnapshot};
+pub use wal::{WalAppendResult, WalAppender, WalReadEntry, WalTailer};
 pub use write::ShardWriter;
 pub use write::ShardWriterConfig;
