Skip to content

Commit 170446a

Browse files
fix: always do wal tailer cursor update (#6673)
## Summary - Switch WAL entry positions to 0-based; appender uses the manifest `wal_entry_position_last_seen` hint with HEAD probing to discover its next write position, falling back to a directory listing only when the hint is stale. - `WalTailer` always emits a best-effort manifest cursor update on successful `read_entry` (removes the opt-in `with_cursor_updates` flag). - Extract the shared HEAD-probe loop into `probe_forward_from()`.
1 parent ad05eae commit 170446a

1 file changed

Lines changed: 76 additions & 57 deletions

File tree

  • rust/lance/src/dataset/mem_wal

rust/lance/src/dataset/mem_wal/wal.rs

Lines changed: 76 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ impl WalEntryData {
550550
// Generic WAL Appender and Tailer
551551
// ============================================================================
552552

553-
const FIRST_WAL_ENTRY_POSITION: u64 = 1;
553+
const FIRST_WAL_ENTRY_POSITION: u64 = 0;
554554
const MAX_APPEND_CREATE_CONFLICTS: usize = 1024;
555555
const APPEND_CONFLICT_REFRESH_INTERVAL: usize = 16;
556556
const MAX_CURSOR_PROBE: u64 = 4096;
@@ -635,7 +635,7 @@ impl WalAppender {
635635

636636
let mut next_pos = self.next_entry_position.lock().await;
637637
if next_pos.is_none() {
638-
*next_pos = Some(self.scan_next_position().await?);
638+
*next_pos = Some(self.discover_next_position().await?);
639639
}
640640

641641
let mut conflicts = 0;
@@ -676,7 +676,7 @@ impl WalAppender {
676676
)));
677677
}
678678
if conflicts % APPEND_CONFLICT_REFRESH_INTERVAL == 0 {
679-
*next_pos = Some(self.scan_next_position().await?);
679+
*next_pos = Some(self.discover_next_position().await?);
680680
} else {
681681
*next_pos = Some(pos.checked_add(1).ok_or_else(|| {
682682
Error::io(format!("WAL position overflow for shard {}", self.shard_id))
@@ -696,7 +696,20 @@ impl WalAppender {
696696
self.manifest_store.check_fenced(self.writer_epoch).await
697697
}
698698

699-
async fn scan_next_position(&self) -> Result<u64> {
699+
async fn discover_next_position(&self) -> Result<u64> {
700+
if let Ok(Some(manifest)) = self.manifest_store.read_latest().await {
701+
let hint = manifest.wal_entry_position_last_seen;
702+
if let Some(tip) = probe_forward_from(
703+
self.object_store.as_ref(),
704+
&self.wal_dir,
705+
self.shard_id,
706+
hint,
707+
)
708+
.await?
709+
{
710+
return Ok(tip);
711+
}
712+
}
700713
scan_next_position(self.object_store.as_ref(), &self.wal_dir, self.shard_id).await
701714
}
702715
}
@@ -706,13 +719,15 @@ impl WalAppender {
706719
/// Uses `wal_entry_position_last_seen` from the shard manifest as a cursor
707720
/// hint for `next_position()`, probing forward from the hint to find the true
708721
/// tip before falling back to a full directory listing.
722+
///
723+
/// Successful `read_entry` calls asynchronously update
724+
/// `wal_entry_position_last_seen` in the shard manifest (fire-and-forget).
709725
#[derive(Debug, Clone)]
710726
pub struct WalTailer {
711727
object_store: Arc<ObjectStore>,
712728
wal_dir: Path,
713729
manifest_store: Arc<ShardManifestStore>,
714730
shard_id: Uuid,
715-
update_cursor: bool,
716731
}
717732

718733
impl WalTailer {
@@ -729,20 +744,12 @@ impl WalTailer {
729744
wal_dir: shard_wal_path(&base_path, &shard_id),
730745
manifest_store,
731746
shard_id,
732-
update_cursor: false,
733747
}
734748
}
735749

736-
/// Enable async best-effort cursor updates on read.
737-
///
738-
/// When enabled, successful `read_entry` calls asynchronously update
739-
/// `wal_entry_position_last_seen` in the shard manifest.
740-
pub fn with_cursor_updates(mut self, enabled: bool) -> Self {
741-
self.update_cursor = enabled;
742-
self
743-
}
744-
745750
/// Read a WAL entry at the given position. Returns `None` if no entry exists.
751+
/// On success, asynchronously updates `wal_entry_position_last_seen` in the
752+
/// shard manifest as a best-effort cursor hint for future readers.
746753
pub async fn read_entry(&self, entry_position: u64) -> Result<Option<WalReadEntry>> {
747754
let path = self.wal_dir.child(wal_entry_filename(entry_position));
748755
let data = match self.object_store.inner.get(&path).await {
@@ -763,12 +770,10 @@ impl WalTailer {
763770
})?;
764771
let (writer_epoch, batches) = deserialize_appender_batches(bytes)?;
765772

766-
if self.update_cursor {
767-
let ms = self.manifest_store.clone();
768-
tokio::spawn(async move {
769-
let _ = best_effort_cursor_update(&ms, entry_position).await;
770-
});
771-
}
773+
let ms = self.manifest_store.clone();
774+
tokio::spawn(async move {
775+
let _ = best_effort_cursor_update(&ms, entry_position).await;
776+
});
772777

773778
Ok(Some(WalReadEntry {
774779
shard_id: self.shard_id,
@@ -781,7 +786,6 @@ impl WalTailer {
781786
/// Find the next append position (one past the latest entry).
782787
pub async fn next_position(&self) -> Result<u64> {
783788
if let Some(hint) = self.manifest_cursor_hint().await
784-
&& hint >= FIRST_WAL_ENTRY_POSITION
785789
&& let Some(tip) = self.probe_forward(hint).await?
786790
{
787791
return Ok(tip);
@@ -796,37 +800,17 @@ impl WalTailer {
796800

797801
async fn manifest_cursor_hint(&self) -> Option<u64> {
798802
let manifest = self.manifest_store.read_latest().await.ok()??;
799-
let hint = manifest.wal_entry_position_last_seen;
800-
if hint > 0 { Some(hint) } else { None }
803+
Some(manifest.wal_entry_position_last_seen)
801804
}
802805

803806
async fn probe_forward(&self, hint: u64) -> Result<Option<u64>> {
804-
let path = self.wal_dir.child(wal_entry_filename(hint));
805-
match self.object_store.inner.head(&path).await {
806-
Ok(_) => {}
807-
Err(object_store::Error::NotFound { .. }) => return Ok(None),
808-
Err(e) => {
809-
return Err(Error::io(format!(
810-
"failed to check WAL entry {} for shard {}: {}",
811-
hint, self.shard_id, e
812-
)));
813-
}
814-
}
815-
let mut pos = hint + 1;
816-
while pos - hint <= MAX_CURSOR_PROBE {
817-
let p = self.wal_dir.child(wal_entry_filename(pos));
818-
match self.object_store.inner.head(&p).await {
819-
Ok(_) => pos += 1,
820-
Err(object_store::Error::NotFound { .. }) => return Ok(Some(pos)),
821-
Err(e) => {
822-
return Err(Error::io(format!(
823-
"failed to check WAL entry {} for shard {}: {}",
824-
pos, self.shard_id, e
825-
)));
826-
}
827-
}
828-
}
829-
Ok(None)
807+
probe_forward_from(
808+
self.object_store.as_ref(),
809+
&self.wal_dir,
810+
self.shard_id,
811+
hint,
812+
)
813+
.await
830814
}
831815
}
832816

@@ -972,6 +956,42 @@ async fn atomic_put(
972956
}
973957
}
974958

959+
/// Probe forward from a hint position to find the next unwritten position.
960+
/// Returns `None` if the hint position itself doesn't exist (stale hint).
961+
async fn probe_forward_from(
962+
object_store: &ObjectStore,
963+
wal_dir: &Path,
964+
shard_id: Uuid,
965+
hint: u64,
966+
) -> Result<Option<u64>> {
967+
let path = wal_dir.child(wal_entry_filename(hint));
968+
match object_store.inner.head(&path).await {
969+
Ok(_) => {}
970+
Err(object_store::Error::NotFound { .. }) => return Ok(None),
971+
Err(e) => {
972+
return Err(Error::io(format!(
973+
"failed to check WAL entry {} for shard {}: {}",
974+
hint, shard_id, e
975+
)));
976+
}
977+
}
978+
let mut pos = hint + 1;
979+
while pos - hint <= MAX_CURSOR_PROBE {
980+
let p = wal_dir.child(wal_entry_filename(pos));
981+
match object_store.inner.head(&p).await {
982+
Ok(_) => pos += 1,
983+
Err(object_store::Error::NotFound { .. }) => return Ok(Some(pos)),
984+
Err(e) => {
985+
return Err(Error::io(format!(
986+
"failed to check WAL entry {} for shard {}: {}",
987+
pos, shard_id, e
988+
)));
989+
}
990+
}
991+
}
992+
Ok(None)
993+
}
994+
975995
async fn scan_next_position(
976996
object_store: &ObjectStore,
977997
wal_dir: &Path,
@@ -1314,26 +1334,25 @@ mod tests {
13141334
.unwrap();
13151335
}
13161336

1317-
let tailer =
1318-
WalTailer::new(store.clone(), base_path.clone(), shard_id).with_cursor_updates(true);
1319-
let entry = tailer.read_entry(2).await.unwrap().unwrap();
1320-
assert_eq!(entry.entry_position, 2);
1337+
let tailer = WalTailer::new(store.clone(), base_path.clone(), shard_id);
1338+
let entry = tailer.read_entry(1).await.unwrap().unwrap();
1339+
assert_eq!(entry.entry_position, 1);
13211340

13221341
// Best-effort cursor update is async; poll briefly until it lands.
13231342
let manifest_store = ShardManifestStore::new(store, &base_path, shard_id, 2);
13241343
let mut hint = 0u64;
13251344
for _ in 0..50 {
13261345
if let Some(m) = manifest_store.read_latest().await.unwrap() {
13271346
hint = m.wal_entry_position_last_seen;
1328-
if hint >= 2 {
1347+
if hint >= 1 {
13291348
break;
13301349
}
13311350
}
13321351
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
13331352
}
1334-
assert!(hint >= 2, "cursor hint never updated, last={hint}");
1353+
assert!(hint >= 1, "cursor hint never updated, last={hint}");
13351354

13361355
// next_position must still resolve to one past the last appended entry.
1337-
assert_eq!(tailer.next_position().await.unwrap(), 4);
1356+
assert_eq!(tailer.next_position().await.unwrap(), 3);
13381357
}
13391358
}

0 commit comments

Comments
 (0)