@@ -57,7 +57,6 @@ use tokio::sync::{RwLock, mpsc, oneshot};
5757use tracing:: { trace, warn} ;
5858
5959use super :: DatasetHandle ;
60- use super :: schema:: KvSchema ;
6160use crate :: kvs:: err:: { Error , Result } ;
6261use crate :: kvs:: { Key , Val } ;
6362
@@ -342,9 +341,23 @@ async fn execute_batch(dataset: &Arc<RwLock<DatasetHandle>>, batch: Vec<Submissi
342341 }
343342}
344343
345- /// Issue a single Lance `MergeInsertBuilder::execute_reader` (for writes)
346- /// followed by a `Dataset::delete` (for deletes). One dataset version is
347- /// produced per call — concurrent submitters in the same batch share it.
344+ /// Apply a batch's writes **and** deletes as a SINGLE Lance commit.
345+ ///
346+ /// Writes become live rows (`tombstone = false`); deletes become
347+ /// tombstone rows (`tombstone = true`). Both are streamed into one
348+ /// `MergeInsertBuilder::execute_reader` keyed on `key`. `execute_batch`
349+ /// coalesces every key to exactly one of write/delete, so the merge
350+ /// source has unique keys and the upsert is well-defined - producing
351+ /// exactly ONE dataset version per call, which concurrent submitters in
352+ /// the same batch share.
353+ ///
354+ /// Folding deletes in as tombstone rows (rather than a separate
355+ /// `Dataset::delete`) is what keeps the Lance version history aligned
356+ /// with SurrealDB commit boundaries. The old `merge_insert` +
357+ /// `Dataset::delete` pair produced *two* versions for a write+delete
358+ /// batch; the intermediate write-applied/delete-pending version, though
359+ /// hidden from live readers by the write lock, leaked through
360+ /// `Timeline::versions()` as a snapshot that was never an atomic commit.
348361async fn single_lance_commit (
349362 dataset : & Arc < RwLock < DatasetHandle > > ,
350363 writes : Vec < ( Key , Val ) > ,
@@ -357,38 +370,42 @@ async fn single_lance_commit(
357370 return Ok ( ( ) ) ;
358371 }
359372
360- let mut ds = dataset . write ( ) . await ;
361-
362- // ── writes ─────────────────────────────────────────────────────────
373+ // Build the merge source: live rows for writes, tombstone rows for
374+ // deletes. Identical schema, so both stream through one reader.
375+ let mut batches : Vec < arrow_array :: RecordBatch > = Vec :: with_capacity ( 2 ) ;
363376 if !writes. is_empty ( ) {
364- let batch = super :: Transaction :: build_write_batch_lance ( & writes, version)
365- . map_err ( |e| Error :: Datastore ( format ! ( "lance build batch: {e}" ) ) ) ?;
366- let schema_ref = batch. schema ( ) ;
367- let reader = arrow_array:: RecordBatchIterator :: new ( vec ! [ Ok ( batch) ] , schema_ref) ;
368-
369- let arc_ds = Arc :: new ( ds. inner . clone ( ) ) ;
370- let ( new_ds, _stats) = MergeInsertBuilder :: try_new ( arc_ds, vec ! [ "key" . into( ) ] )
371- . map_err ( |e| Error :: Datastore ( format ! ( "lance merge builder: {e}" ) ) ) ?
372- . when_matched ( WhenMatched :: UpdateAll )
373- . when_not_matched ( WhenNotMatched :: InsertAll )
374- . try_build ( )
375- . map_err ( |e| Error :: Datastore ( format ! ( "lance merge build: {e}" ) ) ) ?
376- . execute_reader ( reader)
377- . await
378- . map_err ( |e| Error :: Datastore ( format ! ( "lance merge_insert: {e}" ) ) ) ?;
379-
380- ds. inner = Arc :: try_unwrap ( new_ds) . unwrap_or_else ( |arc| ( * arc) . clone ( ) ) ;
377+ batches. push (
378+ super :: Transaction :: build_write_batch_lance ( & writes, version)
379+ . map_err ( |e| Error :: Datastore ( format ! ( "lance build batch: {e}" ) ) ) ?,
380+ ) ;
381381 }
382-
383- // ── deletes ────────────────────────────────────────────────────────
384382 if !deletes. is_empty ( ) {
385- let predicate = KvSchema :: build_delete_predicate ( & deletes) ;
386- ds. inner
387- . delete ( & predicate)
388- . await
389- . map_err ( |e| Error :: Datastore ( format ! ( "lance delete: {e}" ) ) ) ?;
383+ batches. push (
384+ super :: Transaction :: build_tombstone_batch_lance ( & deletes, version)
385+ . map_err ( |e| Error :: Datastore ( format ! ( "lance build tombstones: {e}" ) ) ) ?,
386+ ) ;
390387 }
391388
389+ let schema_ref = batches[ 0 ] . schema ( ) ;
390+ let reader = arrow_array:: RecordBatchIterator :: new (
391+ batches. into_iter ( ) . map ( Ok :: < _ , arrow_schema:: ArrowError > ) . collect :: < Vec < _ > > ( ) ,
392+ schema_ref,
393+ ) ;
394+
395+ let mut ds = dataset. write ( ) . await ;
396+ let arc_ds = Arc :: new ( ds. inner . clone ( ) ) ;
397+ let ( new_ds, _stats) = MergeInsertBuilder :: try_new ( arc_ds, vec ! [ "key" . into( ) ] )
398+ . map_err ( |e| Error :: Datastore ( format ! ( "lance merge builder: {e}" ) ) ) ?
399+ . when_matched ( WhenMatched :: UpdateAll )
400+ . when_not_matched ( WhenNotMatched :: InsertAll )
401+ . try_build ( )
402+ . map_err ( |e| Error :: Datastore ( format ! ( "lance merge build: {e}" ) ) ) ?
403+ . execute_reader ( reader)
404+ . await
405+ . map_err ( |e| Error :: Datastore ( format ! ( "lance merge_insert: {e}" ) ) ) ?;
406+
407+ ds. inner = Arc :: try_unwrap ( new_ds) . unwrap_or_else ( |arc| ( * arc) . clone ( ) ) ;
408+
392409 Ok ( ( ) )
393410}
394411
0 commit comments