From 97d46e2835aeaab23811d18a4149e67e30bbfa96 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Mon, 18 May 2026 21:28:47 -0400 Subject: [PATCH 01/12] ScyllaDB: eliminate per-batch read in write_batch via two-phase writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, every ScyllaDB `write_batch` containing a `DeletePrefix` that overlapped a `Put` in the same batch paid a `find_keys_by_prefix` read inside `UnorderedBatch::expand_colliding_prefix_deletions`. The expansion existed because all statements in a CQL unlogged batch share one write timestamp by default, and at equal timestamps a range tombstone shadows a same-batch insert. This fired on the hot path (notably `reset_chain_manager` clearing `pending_*_blobs` views). Split the write into two sequential unlogged batches: phase 1 issues the prefix-deletes only, phase 2 issues the puts and single-key deletes. The `.await` between phases plus token-aware routing gives phase 2 a strictly later coordinator timestamp, so LWW resolves the intended ordering without any read. In exclusive mode we additionally set explicit client timestamps `T` and `T+1` (via `Batch::set_timestamp`) to keep ordering robust against client/coordinator clock interactions; the per-store timestamp floor is seeded once on first write by reading `WRITETIME` of a reserved sentinel row (empty clustering key, which is unused by views and journaling alike), then advanced monotonically by 2 µs per batch. Shared mode uses coordinator-assigned timestamps. Drop the now-unused `expand_colliding_prefix_deletions`. The `SimplifiedBatch::from_batch` impl for `UnorderedBatch` becomes pure (`Ok(batch.simplify())`), so the per-batch read disappears at the journaling-layer boundary. --- linera-views/src/backends/scylla_db.rs | 240 ++++++++++++++++++++++--- linera-views/src/batch.rs | 85 ++------- 2 files changed, 226 insertions(+), 99 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 2c1fcec28dee..7782650cde54 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -10,7 +10,11 @@ use std::{ collections::{BTreeSet, HashMap}, ops::Deref, - sync::Arc, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, + time::{SystemTime, UNIX_EPOCH}, }; use async_lock::{Semaphore, SemaphoreGuard}; @@ -42,7 +46,7 @@ use crate::metering::MeteredDatabase; #[cfg(with_testing)] use crate::store::TestKeyValueDatabase; use crate::{ - batch::UnorderedBatch, + batch::{SimpleUnorderedBatch, UnorderedBatch}, common::{get_uleb128_size, get_upper_bound_option}, journaling::{JournalingError, JournalingKeyValueDatabase}, lru_caching::{LruCachingConfig, LruCachingDatabase}, @@ -104,6 +108,7 @@ struct ScyllaDbClient { session: Session, namespace: String, read_value: PreparedStatement, + read_writetime: PreparedStatement, contains_key: PreparedStatement, write_batch_delete_prefix_unbounded: PreparedStatement, write_batch_delete_prefix_bounded: PreparedStatement, @@ -126,6 +131,12 @@ impl ScyllaDbClient { )) .await?; + let read_writetime = session + .prepare(format!( + "SELECT WRITETIME(v) FROM {KEYSPACE}.{namespace} WHERE root_key = ? AND k = ?" + )) + .await?; + let contains_key = session .prepare(format!( "SELECT root_key FROM {KEYSPACE}.{namespace} WHERE root_key = ? AND k = ?" @@ -184,6 +195,7 @@ impl ScyllaDbClient { session, namespace, read_value, + read_writetime, contains_key, write_batch_delete_prefix_unbounded, write_batch_delete_prefix_bounded, @@ -399,46 +411,107 @@ impl ScyllaDbClient { Ok(rows.next().is_some()) } - async fn write_batch_internal( + /// Reads the write-time of a single row in microseconds since Unix epoch, + /// returning `None` if the row does not exist or carries no live value. + async fn read_writetime_internal( + &self, + root_key: &[u8], + key: Vec, + ) -> Result, ScyllaDbStoreInternalError> { + Self::check_key_size(&key)?; + let session = &self.session; + let values = (root_key.to_vec(), key); + let (result, _) = session + .execute_single_page(&self.read_writetime, &values, PagingState::start()) + .await + .map_err(ScyllaDbStoreInternalError::ExecutionError)?; + let rows = result.into_rows_result()?; + let mut rows = rows.rows::<(Option,)>()?; + Ok(match rows.next() { + Some(row) => row?.0, + None => None, + }) + } + + /// Issues an unlogged batch that contains only prefix-delete statements. + /// Phase 1 of the two-phase write. If `timestamp` is `Some(t)`, the batch + /// carries that explicit timestamp (exclusive mode); otherwise the + /// coordinator picks one (shared mode). No-op when `key_prefix_deletions` + /// is empty. + async fn write_batch_prefix_deletes( &self, root_key: &[u8], - batch: UnorderedBatch, + key_prefix_deletions: Vec>, + timestamp: Option, ) -> Result<(), ScyllaDbStoreInternalError> { + if key_prefix_deletions.is_empty() { + return Ok(()); + } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::new(); - let query1 = &self.write_batch_delete_prefix_unbounded; - let query2 = &self.write_batch_delete_prefix_bounded; - Self::check_batch_len(&batch)?; - for key_prefix in batch.key_prefix_deletions { + batch_query.set_timestamp(timestamp); + let mut batch_values: Vec>> = Vec::new(); + let q_unbounded = &self.write_batch_delete_prefix_unbounded; + let q_bounded = &self.write_batch_delete_prefix_bounded; + for key_prefix in key_prefix_deletions { Self::check_key_size(&key_prefix)?; match get_upper_bound_option(&key_prefix) { None => { - let values = vec![root_key.to_vec(), key_prefix]; - batch_values.push(values); - batch_query.append_statement(query1.clone()); + batch_values.push(vec![root_key.to_vec(), key_prefix]); + batch_query.append_statement(q_unbounded.clone()); } Some(upper_bound) => { - let values = vec![root_key.to_vec(), key_prefix, upper_bound]; - batch_values.push(values); - batch_query.append_statement(query2.clone()); + batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]); + batch_query.append_statement(q_bounded.clone()); } } } - let query3 = &self.write_batch_deletion; - for key in batch.simple_unordered_batch.deletions { + session + .batch(&batch_query, batch_values) + .await + .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?; + Ok(()) + } + + /// Issues an unlogged batch containing the single-key deletions and the + /// insertions. Phase 2 of the two-phase write. If `timestamp` is + /// `Some(t)`, the batch carries that explicit timestamp (exclusive mode); + /// otherwise the coordinator picks one (shared mode). When + /// `write_sentinel` is true, the batch additionally writes an empty + /// value at the reserved sentinel clustering key (`WRITETIME_SENTINEL_KEY`) in the + /// same partition — used in exclusive mode so future seeding reads can + /// recover the maximum timestamp issued by this store. + async fn write_batch_data( + &self, + root_key: &[u8], + deletions: Vec>, + insertions: Vec<(Vec, Vec)>, + timestamp: Option, + write_sentinel: bool, + ) -> Result<(), ScyllaDbStoreInternalError> { + if deletions.is_empty() && insertions.is_empty() && !write_sentinel { + return Ok(()); + } + let session = &self.session; + let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); + batch_query.set_timestamp(timestamp); + let mut batch_values: Vec>> = Vec::new(); + let q_deletion = &self.write_batch_deletion; + for key in deletions { Self::check_key_size(&key)?; - let values = vec![root_key.to_vec(), key]; - batch_values.push(values); - batch_query.append_statement(query3.clone()); + batch_values.push(vec![root_key.to_vec(), key]); + batch_query.append_statement(q_deletion.clone()); } - let query4 = &self.write_batch_insertion; - for (key, value) in batch.simple_unordered_batch.insertions { + let q_insertion = &self.write_batch_insertion; + for (key, value) in insertions { Self::check_key_size(&key)?; Self::check_value_size(&value)?; - let values = vec![root_key.to_vec(), key, value]; - batch_values.push(values); - batch_query.append_statement(query4.clone()); + batch_values.push(vec![root_key.to_vec(), key, value]); + batch_query.append_statement(q_insertion.clone()); + } + if write_sentinel { + batch_values.push(vec![root_key.to_vec(), WRITETIME_SENTINEL_KEY.to_vec(), Vec::new()]); + batch_query.append_statement(q_insertion.clone()); } session .batch(&batch_query, batch_values) @@ -517,6 +590,15 @@ pub struct ScyllaDbStoreInternal { semaphore: Option>, max_stream_queries: usize, root_key: Vec, + /// Whether this store was opened with `open_exclusive`. When true, `write_batch` + /// resolves in-batch prefix/insert collisions via per-statement `USING TIMESTAMP`; + /// when false, it splits the batch into two sequential sub-batches with + /// server-side timestamps to preserve ordering across writers. + is_exclusive: bool, + /// Per-partition timestamp floor for exclusive-mode `USING TIMESTAMP` writes. + /// Value 0 means unseeded; populated lazily on first write by reading + /// `WRITETIME` of a sentinel row. Each batch reserves 2 µs (T and T+1). + ts_floor: Arc, } /// Database-level connection to ScyllaDB for managing namespaces and partitions. @@ -703,12 +785,96 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { // tie-breaking rule when two cells have the same write timestamp is that dead cells // win over live cells" from // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md + // + // Instead, we split the batch into two CQL batches: + // * Phase 1: the prefix-deletions. + // * Phase 2: the single-key deletions and insertions. + // In exclusive mode we attach explicit client timestamps `T` and `T+1`; in + // shared mode we let the coordinator pick timestamps and rely on token-aware + // routing plus the `.await` between phases to keep them ordered. type Batch = UnorderedBatch; async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> { let store = self.store.deref(); let _guard = self.acquire().await; - store.write_batch_internal(&self.root_key, batch).await + ScyllaDbClient::check_batch_len(&batch)?; + let UnorderedBatch { + key_prefix_deletions, + simple_unordered_batch: + SimpleUnorderedBatch { + deletions, + insertions, + }, + } = batch; + let (t_prefix, t_data, write_sentinel) = if self.is_exclusive { + let t = self.next_batch_ts().await?; + (Some(t), Some(t + 1), true) + } else { + (None, None, false) + }; + store + .write_batch_prefix_deletes(&self.root_key, key_prefix_deletions, t_prefix) + .await?; + store + .write_batch_data(&self.root_key, deletions, insertions, t_data, write_sentinel) + .await?; + Ok(()) + } +} + +impl ScyllaDbStoreInternal { + /// Seeds the per-store timestamp floor on first write in exclusive mode. + /// Reads `WRITETIME` of this chain's row in the reserved sentinel + /// partition (written by every prior exclusive batch). Falls back to the + /// current wall clock if the row does not yet exist. Idempotent — only + /// the first caller wins the compare-exchange. + async fn ensure_ts_seeded(&self) -> Result<(), ScyllaDbStoreInternalError> { + if self.ts_floor.load(Ordering::Relaxed) > 0 { + return Ok(()); + } + let writetime = self + .store + .read_writetime_internal(&self.root_key, WRITETIME_SENTINEL_KEY.to_vec()) + .await? + .unwrap_or(0); + let now_us = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_micros() as i64) + .unwrap_or(0); + // Reserve room for `T` and `T+1` of the upcoming first batch. If + // another caller raced ahead and set a higher floor already, leave it. + let seed = now_us.max(writetime + 2); + if self + .ts_floor + .compare_exchange(0, seed, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + // Another caller seeded first; their value wins. + } + Ok(()) + } + + /// Returns the base timestamp `T` for the next batch in exclusive mode. + /// The batch may also use `T + 1`; the generator advances by 2 per call, + /// preserving monotonicity across batches in this process. + async fn next_batch_ts(&self) -> Result { + self.ensure_ts_seeded().await?; + loop { + let prev = self.ts_floor.load(Ordering::Relaxed); + let now_us = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_micros() as i64) + .unwrap_or(prev); + let next = std::cmp::max(now_us, prev + 2); + if self + .ts_floor + .compare_exchange_weak(prev, next, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + // `next` is `T + 1`; the caller's T+1 == next, base T == next - 1. + return Ok(next - 1); + } + } } } @@ -719,6 +885,13 @@ fn get_big_root_key(root_key: &[u8]) -> Vec { big_key } +/// Reserved clustering key inside each chain's partition that holds the +/// timestamp sentinel used to seed the per-store client timestamp generator +/// in exclusive mode. The empty clustering key is unused by any caller: +/// views always write keys prefixed with a tag byte (>= `MIN_VIEW_TAG`), +/// and the journaling layer writes 6-byte keys starting with `[0, ...]`. +const WRITETIME_SENTINEL_KEY: &[u8] = &[]; + /// The type for building a new ScyllaDB Key Value Store #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ScyllaDbStoreInternalConfig { @@ -769,11 +942,24 @@ impl KeyValueDatabase for ScyllaDbDatabaseInternal { semaphore, max_stream_queries, root_key, + is_exclusive: false, + ts_floor: Arc::new(AtomicI64::new(0)), }) } fn open_exclusive(&self, root_key: &[u8]) -> Result { - self.open_shared(root_key) + let store = self.store.clone(); + let semaphore = self.semaphore.clone(); + let max_stream_queries = self.max_stream_queries; + let root_key = get_big_root_key(root_key); + Ok(ScyllaDbStoreInternal { + store, + semaphore, + max_stream_queries, + root_key, + is_exclusive: true, + ts_floor: Arc::new(AtomicI64::new(0)), + }) } async fn list_all(config: &Self::Config) -> Result, ScyllaDbStoreInternalError> { diff --git a/linera-views/src/batch.rs b/linera-views/src/batch.rs index 17ab58b85473..4f2eeaee6111 100644 --- a/linera-views/src/batch.rs +++ b/linera-views/src/batch.rs @@ -82,7 +82,12 @@ pub struct SimpleUnorderedBatch { } /// An unordered batch of deletions and insertions, together with a set of key-prefixes to -/// delete. Key-prefix deletions must happen before the insertions and the deletions. +/// delete. +/// +/// The batch may contain collisions between `key_prefix_deletions` and `insertions`: a +/// prefix in `key_prefix_deletions` may cover one or more keys in `insertions`. Backends +/// are responsible for ordering execution so that insertions "happen after" the prefix +/// deletions semantically. #[derive(Default, Serialize, Deserialize)] pub struct UnorderedBatch { /// The key-prefix deletions. @@ -119,44 +124,6 @@ impl UnorderedBatch { }) } - /// Modifies an [`UnorderedBatch`] so that the key-prefix deletions do not conflict - /// with subsequent insertions. This may require accessing the database to compute - /// lists of deleted keys. - pub async fn expand_colliding_prefix_deletions( - &mut self, - db: &DB, - ) -> Result<(), DB::Error> { - if self.key_prefix_deletions.is_empty() { - return Ok(()); - } - let inserted_keys = self - .simple_unordered_batch - .insertions - .iter() - .map(|x| x.0.clone()) - .collect::>(); - let mut key_prefix_deletions = Vec::new(); - for key_prefix in &self.key_prefix_deletions { - if inserted_keys - .range(get_key_range_for_prefix(key_prefix.clone())) - .next() - .is_some() - { - for short_key in &db.expand_delete_prefix(key_prefix).await? { - let mut key = key_prefix.clone(); - key.extend(short_key); - if !inserted_keys.contains(&key) { - self.simple_unordered_batch.deletions.push(key); - } - } - } else { - key_prefix_deletions.push(key_prefix.to_vec()); - } - } - self.key_prefix_deletions = key_prefix_deletions; - Ok(()) - } - /// The total number of entries of the batch. pub fn len(&self) -> usize { self.key_prefix_deletions.len() + self.simple_unordered_batch.len() @@ -541,7 +508,7 @@ impl BatchValueWriter for SimpleUnorderedBatchIter { } } -/// The iterator that corresponds to a `SimpleUnorderedBatch` +/// The iterator that corresponds to an [`UnorderedBatch`]. pub struct UnorderedBatchIter { delete_prefix_iter: Peekable>>, insert_deletion_iter: SimpleUnorderedBatchIter, @@ -584,12 +551,11 @@ impl SimplifiedBatch for UnorderedBatch { self.simple_unordered_batch.add_insert(key, value) } - async fn from_batch(store: S, batch: Batch) -> Result { - let mut unordered_batch = batch.simplify(); - unordered_batch - .expand_colliding_prefix_deletions(&store) - .await?; - Ok(unordered_batch) + async fn from_batch( + _store: S, + batch: Batch, + ) -> Result { + Ok(batch.simplify()) } } @@ -637,7 +603,7 @@ impl BatchValueWriter for UnorderedBatchIter { #[cfg(test)] mod tests { use linera_views::{ - batch::{Batch, SimpleUnorderedBatch, UnorderedBatch}, + batch::Batch, context::{Context, MemoryContext}, store::WritableKeyValueStore as _, }; @@ -716,29 +682,4 @@ mod tests { ); assert!(simple_unordered_batch.insertions.is_empty()); } - - #[tokio::test] - async fn test_simplify_batch6() { - let context = MemoryContext::new_for_testing(()); - let insertions = vec![(vec![1, 2, 3], vec![])]; - let simple_unordered_batch = SimpleUnorderedBatch { - insertions: insertions.clone(), - deletions: vec![], - }; - let key_prefix_deletions = vec![vec![1, 2]]; - let mut unordered_batch = UnorderedBatch { - simple_unordered_batch, - key_prefix_deletions, - }; - unordered_batch - .expand_colliding_prefix_deletions(&context) - .await - .unwrap(); - assert!(unordered_batch.simple_unordered_batch.deletions.is_empty()); - assert_eq!( - unordered_batch.simple_unordered_batch.insertions, - insertions - ); - assert!(unordered_batch.key_prefix_deletions.is_empty()); - } } From 5442b1ff7d969a8f6117cf9e95bc90a262f480fd Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Mon, 18 May 2026 22:48:47 -0400 Subject: [PATCH 02/12] fix formatting --- linera-views/src/backends/scylla_db.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 7782650cde54..81f2dcf8b0b2 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -510,7 +510,11 @@ impl ScyllaDbClient { batch_query.append_statement(q_insertion.clone()); } if write_sentinel { - batch_values.push(vec![root_key.to_vec(), WRITETIME_SENTINEL_KEY.to_vec(), Vec::new()]); + batch_values.push(vec![ + root_key.to_vec(), + WRITETIME_SENTINEL_KEY.to_vec(), + Vec::new(), + ]); batch_query.append_statement(q_insertion.clone()); } session @@ -816,7 +820,13 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { .write_batch_prefix_deletes(&self.root_key, key_prefix_deletions, t_prefix) .await?; store - .write_batch_data(&self.root_key, deletions, insertions, t_data, write_sentinel) + .write_batch_data( + &self.root_key, + deletions, + insertions, + t_data, + write_sentinel, + ) .await?; Ok(()) } From 4c2e5dcd0ec594819692130e3d18738f109f1477 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Tue, 19 May 2026 18:30:41 -0400 Subject: [PATCH 03/12] nit: simplify writetime arithmetic --- linera-views/src/backends/scylla_db.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 81f2dcf8b0b2..a3e1d743c306 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -851,9 +851,9 @@ impl ScyllaDbStoreInternal { .duration_since(UNIX_EPOCH) .map(|d| d.as_micros() as i64) .unwrap_or(0); - // Reserve room for `T` and `T+1` of the upcoming first batch. If - // another caller raced ahead and set a higher floor already, leave it. - let seed = now_us.max(writetime + 2); + // `writetime` is the last batch's `T + 1`, i.e. the highest timestamp it + // consumed; that is exactly what `ts_floor` tracks, so seed it directly. + let seed = now_us.max(writetime); if self .ts_floor .compare_exchange(0, seed, Ordering::Relaxed, Ordering::Relaxed) @@ -875,14 +875,15 @@ impl ScyllaDbStoreInternal { .duration_since(UNIX_EPOCH) .map(|d| d.as_micros() as i64) .unwrap_or(prev); - let next = std::cmp::max(now_us, prev + 2); + let next = std::cmp::max(now_us, prev + 1); + // The batch uses `next` (`T`) and `next + 1` (`T + 1`); store the latter + // so the following batch starts strictly above both. if self .ts_floor - .compare_exchange_weak(prev, next, Ordering::Relaxed, Ordering::Relaxed) + .compare_exchange_weak(prev, next + 1, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { - // `next` is `T + 1`; the caller's T+1 == next, base T == next - 1. - return Ok(next - 1); + return Ok(next); } } } From ad9c75afaae48871a6d0993b64661b450ed24700 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Tue, 19 May 2026 19:00:40 -0400 Subject: [PATCH 04/12] add tests --- linera-views/tests/store_tests.rs | 65 +++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/linera-views/tests/store_tests.rs b/linera-views/tests/store_tests.rs index 8ea1ab421d73..a76fd5aeee94 100644 --- a/linera-views/tests/store_tests.rs +++ b/linera-views/tests/store_tests.rs @@ -339,6 +339,71 @@ async fn test_scylla_db_writes_from_state() { run_writes_from_state(&store).await; } +// `new_test_store` opens a shared store, so the test above only exercises the +// coordinator-timestamp path. This variant opens an exclusive store, routing the +// same batches (including the `DeletePrefix`-overlapping-`Put` cases in +// `generate_specific_state_batch`) through the explicit `T`/`T + 1` timestamps. +#[cfg(with_scylladb)] +#[tokio::test] +async fn test_scylla_db_writes_from_state_exclusive() { + use linera_views::store::KeyValueDatabase as _; + + let database = linera_views::scylla_db::ScyllaDbDatabase::connect_test_namespace() + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + run_writes_from_state(&store).await; +} + +// Exclusive mode keeps timestamps monotonic across batches via an in-memory +// floor seeded, on first write, from the `WRITETIME` of a persisted sentinel. +// A reconnect resets that floor to 0, so this test checks that a "restarted" +// store resumes strictly above the previously persisted data: the second +// process's `DeletePrefix` + `Put` must win over the first process's value. +#[cfg(with_scylladb)] +#[tokio::test] +async fn test_scylla_db_exclusive_seed_after_restart() { + use linera_views::{ + random::generate_test_namespace, scylla_db::ScyllaDbDatabase, store::KeyValueDatabase as _, + }; + + let config = ScyllaDbDatabase::new_test_config().await.unwrap(); + let namespace = generate_test_namespace(); + let key = vec![42]; + + // First process: write an initial value through an exclusive store. + { + let database = ScyllaDbDatabase::recreate_and_connect(&config, &namespace) + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.clone(), vec![1]); + store.write_batch(batch).await.unwrap(); + } + + // Second process: a fresh database + store resets the timestamp floor to 0, + // forcing the seed to read the persisted sentinel and resume above it. + { + let database = ScyllaDbDatabase::connect(&config, &namespace) + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + let mut batch = Batch::new(); + batch.delete_key_prefix(key.clone()); + batch.put_key_value_bytes(key.clone(), vec![2]); + store.write_batch(batch).await.unwrap(); + } + + // Third process: a fresh connection (empty cache) reads from the database and + // must observe the second write, proving it was resolved as the later one. + let database = ScyllaDbDatabase::connect(&config, &namespace) + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + assert_eq!(store.read_value_bytes(&key).await.unwrap(), Some(vec![2])); +} + #[cfg(with_scylladb)] #[tokio::test] async fn test_scylladb_access() { From 8ab833894dd54cf32da7294f99b2b4a4058da272 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Wed, 20 May 2026 10:10:28 -0400 Subject: [PATCH 05/12] ScyllaDB: make exclusive-mode write_batch a single atomic batch In exclusive mode we own the write timestamps, so there is no need to split write_batch into two sequential CQL batches (the split exists only for shared mode, where the .await between phases lets the coordinator stamp the data later). Issuing two batches also broke the atomicity that write_batch callers rely on: if the prefix-delete batch committed and the data batch failed, the partition was left with prefixes deleted but their replacement data missing. Issue the whole exclusive write as one unlogged batch with explicit per-statement USING TIMESTAMP: prefix-deletions at T, single-key deletions, insertions, and the sentinel at T+1. The higher data timestamp keeps a range tombstone from shadowing a same-batch insert, and ordering is pinned by the timestamps rather than by send order, so atomicity is preserved. Shared mode keeps its two-phase, coordinator-timestamped path. --- linera-views/src/backends/scylla_db.rs | 194 +++++++++++++++++++------ 1 file changed, 151 insertions(+), 43 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index a3e1d743c306..c4965c6ad353 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -37,6 +37,7 @@ use scylla::{ }, response::PagingState, statement::{batch::BatchType, prepared::PreparedStatement, Consistency}, + value::CqlValue, }; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -114,6 +115,12 @@ struct ScyllaDbClient { write_batch_delete_prefix_bounded: PreparedStatement, write_batch_deletion: PreparedStatement, write_batch_insertion: PreparedStatement, + // Variants carrying an explicit `USING TIMESTAMP ?` marker, used by the + // single-batch exclusive-mode write path (`write_batch_exclusive`). + write_batch_delete_prefix_unbounded_ts: PreparedStatement, + write_batch_delete_prefix_bounded_ts: PreparedStatement, + write_batch_deletion_ts: PreparedStatement, + write_batch_insertion_ts: PreparedStatement, find_keys_by_prefix_unbounded: PreparedStatement, find_keys_by_prefix_bounded: PreparedStatement, find_key_values_by_prefix_unbounded: PreparedStatement, @@ -167,6 +174,36 @@ impl ScyllaDbClient { )) .await?; + // Timestamped variants used by the single-batch exclusive-mode path. The + // explicit `USING TIMESTAMP ?` lets prefix-deletions (`T`) and the + // insertions/deletions (`T + 1`) share one atomic batch without the range + // tombstone shadowing the inserts. + let write_batch_delete_prefix_unbounded_ts = session + .prepare(format!( + "DELETE FROM {KEYSPACE}.{namespace} USING TIMESTAMP ? WHERE root_key = ? AND k >= ?" + )) + .await?; + + let write_batch_delete_prefix_bounded_ts = session + .prepare(format!( + "DELETE FROM {KEYSPACE}.{namespace} USING TIMESTAMP ? \ + WHERE root_key = ? AND k >= ? AND k < ?" + )) + .await?; + + let write_batch_deletion_ts = session + .prepare(format!( + "DELETE FROM {KEYSPACE}.{namespace} USING TIMESTAMP ? WHERE root_key = ? AND k = ?" + )) + .await?; + + let write_batch_insertion_ts = session + .prepare(format!( + "INSERT INTO {KEYSPACE}.{namespace} (root_key, k, v) VALUES (?, ?, ?) \ + USING TIMESTAMP ?" + )) + .await?; + let find_keys_by_prefix_unbounded = session .prepare(format!( "SELECT k FROM {KEYSPACE}.{namespace} WHERE root_key = ? AND k >= ?" @@ -201,6 +238,10 @@ impl ScyllaDbClient { write_batch_delete_prefix_bounded, write_batch_deletion, write_batch_insertion, + write_batch_delete_prefix_unbounded_ts, + write_batch_delete_prefix_bounded_ts, + write_batch_deletion_ts, + write_batch_insertion_ts, find_keys_by_prefix_unbounded, find_keys_by_prefix_bounded, find_key_values_by_prefix_unbounded, @@ -433,23 +474,19 @@ impl ScyllaDbClient { }) } - /// Issues an unlogged batch that contains only prefix-delete statements. - /// Phase 1 of the two-phase write. If `timestamp` is `Some(t)`, the batch - /// carries that explicit timestamp (exclusive mode); otherwise the - /// coordinator picks one (shared mode). No-op when `key_prefix_deletions` - /// is empty. + /// Issues an unlogged batch that contains only prefix-delete statements, + /// letting the coordinator assign the write timestamp. Phase 1 of the + /// shared-mode two-phase write. No-op when `key_prefix_deletions` is empty. async fn write_batch_prefix_deletes( &self, root_key: &[u8], key_prefix_deletions: Vec>, - timestamp: Option, ) -> Result<(), ScyllaDbStoreInternalError> { if key_prefix_deletions.is_empty() { return Ok(()); } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - batch_query.set_timestamp(timestamp); let mut batch_values: Vec>> = Vec::new(); let q_unbounded = &self.write_batch_delete_prefix_unbounded; let q_bounded = &self.write_batch_delete_prefix_bounded; @@ -474,27 +511,21 @@ impl ScyllaDbClient { } /// Issues an unlogged batch containing the single-key deletions and the - /// insertions. Phase 2 of the two-phase write. If `timestamp` is - /// `Some(t)`, the batch carries that explicit timestamp (exclusive mode); - /// otherwise the coordinator picks one (shared mode). When - /// `write_sentinel` is true, the batch additionally writes an empty - /// value at the reserved sentinel clustering key (`WRITETIME_SENTINEL_KEY`) in the - /// same partition — used in exclusive mode so future seeding reads can - /// recover the maximum timestamp issued by this store. + /// insertions, letting the coordinator assign the write timestamp. Phase 2 + /// of the shared-mode two-phase write; the `.await` between the phases lets + /// the coordinator give this batch a strictly later timestamp than the + /// prefix-deletes, so a range tombstone never shadows these insertions. async fn write_batch_data( &self, root_key: &[u8], deletions: Vec>, insertions: Vec<(Vec, Vec)>, - timestamp: Option, - write_sentinel: bool, ) -> Result<(), ScyllaDbStoreInternalError> { - if deletions.is_empty() && insertions.is_empty() && !write_sentinel { + if deletions.is_empty() && insertions.is_empty() { return Ok(()); } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - batch_query.set_timestamp(timestamp); let mut batch_values: Vec>> = Vec::new(); let q_deletion = &self.write_batch_deletion; for key in deletions { @@ -509,14 +540,97 @@ impl ScyllaDbClient { batch_values.push(vec![root_key.to_vec(), key, value]); batch_query.append_statement(q_insertion.clone()); } - if write_sentinel { + session + .batch(&batch_query, batch_values) + .await + .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?; + Ok(()) + } + + /// Issues the whole write as a single atomic unlogged batch, used in + /// exclusive mode. Every statement carries an explicit `USING TIMESTAMP`: + /// the prefix-deletions use `t`, while the single-key deletions, the + /// insertions, and the sentinel write use `t + 1`. The higher timestamp on + /// the data ensures a range tombstone never shadows an insertion belonging + /// to the same logical batch (at equal timestamps, dead cells win over live + /// cells). Because the intended ordering is fixed by these timestamps rather + /// than by send order, the prefix-deletions and the data can — and must — + /// share one batch, preserving the atomicity that `write_batch` callers rely + /// on. The sentinel write at `WRITETIME_SENTINEL_KEY` lets a future process + /// recover this store's timestamp floor (see `ensure_ts_seeded`). + async fn write_batch_exclusive( + &self, + root_key: &[u8], + batch: UnorderedBatch, + t: i64, + ) -> Result<(), ScyllaDbStoreInternalError> { + let UnorderedBatch { + key_prefix_deletions, + simple_unordered_batch: + SimpleUnorderedBatch { + deletions, + insertions, + }, + } = batch; + let session = &self.session; + let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); + let mut batch_values: Vec> = Vec::new(); + + // Prefix-deletions at timestamp `t`. + for key_prefix in key_prefix_deletions { + Self::check_key_size(&key_prefix)?; + match get_upper_bound_option(&key_prefix) { + None => { + batch_values.push(vec![ + CqlValue::BigInt(t), + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key_prefix), + ]); + batch_query + .append_statement(self.write_batch_delete_prefix_unbounded_ts.clone()); + } + Some(upper_bound) => { + batch_values.push(vec![ + CqlValue::BigInt(t), + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key_prefix), + CqlValue::Blob(upper_bound), + ]); + batch_query.append_statement(self.write_batch_delete_prefix_bounded_ts.clone()); + } + } + } + + // Single-key deletions, insertions, and the sentinel at timestamp `t + 1`. + let t_data = t + 1; + for key in deletions { + Self::check_key_size(&key)?; batch_values.push(vec![ - root_key.to_vec(), - WRITETIME_SENTINEL_KEY.to_vec(), - Vec::new(), + CqlValue::BigInt(t_data), + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key), ]); - batch_query.append_statement(q_insertion.clone()); + batch_query.append_statement(self.write_batch_deletion_ts.clone()); } + for (key, value) in insertions { + Self::check_key_size(&key)?; + Self::check_value_size(&value)?; + batch_values.push(vec![ + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key), + CqlValue::Blob(value), + CqlValue::BigInt(t_data), + ]); + batch_query.append_statement(self.write_batch_insertion_ts.clone()); + } + batch_values.push(vec![ + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(WRITETIME_SENTINEL_KEY.to_vec()), + CqlValue::Blob(Vec::new()), + CqlValue::BigInt(t_data), + ]); + batch_query.append_statement(self.write_batch_insertion_ts.clone()); + session .batch(&batch_query, batch_values) .await @@ -790,18 +904,24 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { // win over live cells" from // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md // - // Instead, we split the batch into two CQL batches: - // * Phase 1: the prefix-deletions. - // * Phase 2: the single-key deletions and insertions. - // In exclusive mode we attach explicit client timestamps `T` and `T+1`; in - // shared mode we let the coordinator pick timestamps and rely on token-aware - // routing plus the `.await` between phases to keep them ordered. + // We therefore order the prefix-deletions strictly before the insertions: + // * In exclusive mode we own the timestamps, so we issue a single atomic CQL + // batch with explicit per-statement `USING TIMESTAMP` (`T` for the + // prefix-deletions, `T + 1` for the data). See `write_batch_exclusive`. + // * In shared mode the coordinator owns the timestamps, so we split the write + // into two sequential CQL batches; the `.await` between them, together with + // token-aware routing, gives the data batch a strictly later timestamp. type Batch = UnorderedBatch; async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> { let store = self.store.deref(); let _guard = self.acquire().await; ScyllaDbClient::check_batch_len(&batch)?; + if self.is_exclusive { + // A single atomic batch; ordering is pinned by the explicit timestamps. + let t = self.next_batch_ts().await?; + return store.write_batch_exclusive(&self.root_key, batch, t).await; + } let UnorderedBatch { key_prefix_deletions, simple_unordered_batch: @@ -810,23 +930,11 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { insertions, }, } = batch; - let (t_prefix, t_data, write_sentinel) = if self.is_exclusive { - let t = self.next_batch_ts().await?; - (Some(t), Some(t + 1), true) - } else { - (None, None, false) - }; store - .write_batch_prefix_deletes(&self.root_key, key_prefix_deletions, t_prefix) + .write_batch_prefix_deletes(&self.root_key, key_prefix_deletions) .await?; store - .write_batch_data( - &self.root_key, - deletions, - insertions, - t_data, - write_sentinel, - ) + .write_batch_data(&self.root_key, deletions, insertions) .await?; Ok(()) } From c9d8f906736b4fac87b10cfd0129587048b2b629 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> Date: Wed, 20 May 2026 12:31:32 -0400 Subject: [PATCH 06/12] Update linera-views/src/backends/scylla_db.rs Co-authored-by: Andreas Fackler Signed-off-by: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> --- linera-views/src/backends/scylla_db.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index c4965c6ad353..74318441ab29 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -487,7 +487,7 @@ impl ScyllaDbClient { } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - let mut batch_values: Vec>> = Vec::new(); + let mut batch_values = Vec::>>::new(); let q_unbounded = &self.write_batch_delete_prefix_unbounded; let q_bounded = &self.write_batch_delete_prefix_bounded; for key_prefix in key_prefix_deletions { @@ -526,7 +526,7 @@ impl ScyllaDbClient { } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - let mut batch_values: Vec>> = Vec::new(); + let mut batch_values = Vec::new(); let q_deletion = &self.write_batch_deletion; for key in deletions { Self::check_key_size(&key)?; @@ -574,7 +574,7 @@ impl ScyllaDbClient { } = batch; let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - let mut batch_values: Vec> = Vec::new(); + let mut batch_values = Vec::new(); // Prefix-deletions at timestamp `t`. for key_prefix in key_prefix_deletions { From 0a47a02271569f15256b8a0eed4d40d245491224 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Wed, 20 May 2026 12:43:35 -0400 Subject: [PATCH 07/12] more nits --- linera-views/src/backends/scylla_db.rs | 47 ++++++++++---------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 74318441ab29..b882505bc6cd 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -475,8 +475,7 @@ impl ScyllaDbClient { } /// Issues an unlogged batch that contains only prefix-delete statements, - /// letting the coordinator assign the write timestamp. Phase 1 of the - /// shared-mode two-phase write. No-op when `key_prefix_deletions` is empty. + /// letting the coordinator assign the write timestamp. async fn write_batch_prefix_deletes( &self, root_key: &[u8], @@ -511,30 +510,26 @@ impl ScyllaDbClient { } /// Issues an unlogged batch containing the single-key deletions and the - /// insertions, letting the coordinator assign the write timestamp. Phase 2 - /// of the shared-mode two-phase write; the `.await` between the phases lets - /// the coordinator give this batch a strictly later timestamp than the - /// prefix-deletes, so a range tombstone never shadows these insertions. - async fn write_batch_data( + /// insertions, letting the coordinator assign the write timestamp. + async fn write_simple_batch( &self, root_key: &[u8], - deletions: Vec>, - insertions: Vec<(Vec, Vec)>, + batch: SimpleUnorderedBatch, ) -> Result<(), ScyllaDbStoreInternalError> { - if deletions.is_empty() && insertions.is_empty() { + if batch.deletions.is_empty() && batch.insertions.is_empty() { return Ok(()); } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); let mut batch_values = Vec::new(); let q_deletion = &self.write_batch_deletion; - for key in deletions { + for key in batch.deletions { Self::check_key_size(&key)?; batch_values.push(vec![root_key.to_vec(), key]); batch_query.append_statement(q_deletion.clone()); } let q_insertion = &self.write_batch_insertion; - for (key, value) in insertions { + for (key, value) in batch.insertions { Self::check_key_size(&key)?; Self::check_value_size(&value)?; batch_values.push(vec![root_key.to_vec(), key, value]); @@ -909,8 +904,7 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { // batch with explicit per-statement `USING TIMESTAMP` (`T` for the // prefix-deletions, `T + 1` for the data). See `write_batch_exclusive`. // * In shared mode the coordinator owns the timestamps, so we split the write - // into two sequential CQL batches; the `.await` between them, together with - // token-aware routing, gives the data batch a strictly later timestamp. + // into two sequential CQL batches. type Batch = UnorderedBatch; async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> { @@ -920,23 +914,16 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { if self.is_exclusive { // A single atomic batch; ordering is pinned by the explicit timestamps. let t = self.next_batch_ts().await?; - return store.write_batch_exclusive(&self.root_key, batch, t).await; + store.write_batch_exclusive(&self.root_key, batch, t).await + } else { + store + .write_batch_prefix_deletes(&self.root_key, batch.key_prefix_deletions) + .await?; + store + .write_simple_batch(&self.root_key, batch.simple_unordered_batch) + .await?; + Ok(()) } - let UnorderedBatch { - key_prefix_deletions, - simple_unordered_batch: - SimpleUnorderedBatch { - deletions, - insertions, - }, - } = batch; - store - .write_batch_prefix_deletes(&self.root_key, key_prefix_deletions) - .await?; - store - .write_batch_data(&self.root_key, deletions, insertions) - .await?; - Ok(()) } } From 8c0b8a750f77744a2baa17476523f7b51d2219d4 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Wed, 20 May 2026 21:27:02 -0400 Subject: [PATCH 08/12] ScyllaDB: hide the timestamp sentinel from prefix scans In exclusive mode, `write_batch` writes a per-store timestamp sentinel at the reserved empty clustering key. That row is an internal implementation detail, but `find_keys_by_prefix` / `find_key_values_by_prefix` did not hide it: an empty-prefix scan (`k >= ''`) matches everything, including the sentinel. The row then reached the `ValueSplittingDatabase` wrapper, whose `read_index_from_key` requires a >=4-byte chunk-index suffix and failed the 0-byte sentinel key with `TooShortKey` (seen as a `test_reads_scylla_db` panic in CI). Skip the sentinel row in both prefix-scan helpers. It can only ever match an empty-prefix scan, since for any non-empty prefix `p`, `k >= p` excludes the empty key. --- linera-views/src/backends/scylla_db.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index b882505bc6cd..1f0d9d944f37 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -658,6 +658,12 @@ impl ScyllaDbClient { let mut keys = Vec::new(); while let Some(row) = rows.next().await { let (key,) = row?; + // Skip the reserved timestamp sentinel (exclusive mode writes it at the + // empty clustering key). It is an internal implementation detail and must + // not surface to callers; it can only match an empty-prefix scan. + if key == WRITETIME_SENTINEL_KEY { + continue; + } let short_key = key[len..].to_vec(); keys.push(short_key); } @@ -689,6 +695,10 @@ impl ScyllaDbClient { let mut key_values = Vec::new(); while let Some(row) = rows.next().await { let (key, value) = row?; + // Skip the reserved timestamp sentinel; see `find_keys_by_prefix_internal`. + if key == WRITETIME_SENTINEL_KEY { + continue; + } let short_key = key[len..].to_vec(); key_values.push((short_key, value)); } From c41416210c6eae047e941caaa1ebcf9a9ce7fa6c Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Wed, 20 May 2026 21:46:56 -0400 Subject: [PATCH 09/12] ScyllaDB: reject zero-length keys in incoming batches Exclusive mode reserves the empty clustering key (WRITETIME_SENTINEL_KEY) for the per-store timestamp sentinel, and prefix scans now deliberately hide that key. So any caller content written at the empty key would be silently invisible to reads (besides colliding with the sentinel row). Enforce the reservation: `write_batch` rejects any insertion or single-key deletion with a zero-length key via a new `ZeroLengthKey` error, so a violation fails loudly. This also matches DynamoDB, which already forbids zero-length keys. Prefix deletions are left untouched: an empty prefix is a legitimate range operation, and in exclusive mode the sentinel is rewritten at T+1 within the same batch. --- linera-views/src/backends/scylla_db.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 1f0d9d944f37..590187026364 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -342,6 +342,18 @@ impl ScyllaDbClient { Ok(()) } + /// Validates a key supplied by a caller's batch. Besides the size limit, the + /// key must be non-empty: the empty (zero-length) key is `WRITETIME_SENTINEL_KEY`, + /// reserved for the per-store timestamp sentinel that exclusive mode writes + /// internally. Prefix scans now deliberately hide that key, so any caller + /// content stored there would be silently invisible to reads. DynamoDB + /// likewise forbids zero-length keys. + fn check_batch_key(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> { + Self::check_key_size(key)?; + ensure!(!key.is_empty(), ScyllaDbStoreInternalError::ZeroLengthKey); + Ok(()) + } + fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> { ensure!( batch.len() <= MAX_BATCH_SIZE, @@ -524,13 +536,13 @@ impl ScyllaDbClient { let mut batch_values = Vec::new(); let q_deletion = &self.write_batch_deletion; for key in batch.deletions { - Self::check_key_size(&key)?; + Self::check_batch_key(&key)?; batch_values.push(vec![root_key.to_vec(), key]); batch_query.append_statement(q_deletion.clone()); } let q_insertion = &self.write_batch_insertion; for (key, value) in batch.insertions { - Self::check_key_size(&key)?; + Self::check_batch_key(&key)?; Self::check_value_size(&value)?; batch_values.push(vec![root_key.to_vec(), key, value]); batch_query.append_statement(q_insertion.clone()); @@ -599,7 +611,7 @@ impl ScyllaDbClient { // Single-key deletions, insertions, and the sentinel at timestamp `t + 1`. let t_data = t + 1; for key in deletions { - Self::check_key_size(&key)?; + Self::check_batch_key(&key)?; batch_values.push(vec![ CqlValue::BigInt(t_data), CqlValue::Blob(root_key.to_vec()), @@ -608,7 +620,7 @@ impl ScyllaDbClient { batch_query.append_statement(self.write_batch_deletion_ts.clone()); } for (key, value) in insertions { - Self::check_key_size(&key)?; + Self::check_batch_key(&key)?; Self::check_value_size(&value)?; batch_values.push(vec![ CqlValue::Blob(root_key.to_vec()), @@ -798,6 +810,11 @@ pub enum ScyllaDbStoreInternalError { /// The batch is too long to be written #[error("The batch is too long to be written")] BatchTooLong, + + /// Keys have to be of nonzero length (the empty key is reserved for the + /// timestamp sentinel). + #[error("The key must be of nonzero length")] + ZeroLengthKey, } impl KeyValueStoreError for ScyllaDbStoreInternalError { From 6358fea59018a4b2c7a619d4667e35bb2121502c Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Wed, 20 May 2026 21:56:12 -0400 Subject: [PATCH 10/12] ScyllaDB: fix u128->i64 truncation lint; simplify batch_values `d.as_micros()` returns u128; the `as i64` casts in the timestamp-floor seeding tripped `clippy::cast_possible_truncation` (denied in CI). Convert via `i64::try_from(..).ok()`, falling back to the existing default. Also drop the redundant turbofish on `batch_values` (the element type is inferred from the pushes), matching the other two batch helpers. --- linera-views/src/backends/scylla_db.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 590187026364..0a3f27c80284 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -498,7 +498,7 @@ impl ScyllaDbClient { } let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); - let mut batch_values = Vec::>>::new(); + let mut batch_values = Vec::new(); let q_unbounded = &self.write_batch_delete_prefix_unbounded; let q_bounded = &self.write_batch_delete_prefix_bounded; for key_prefix in key_prefix_deletions { @@ -971,7 +971,8 @@ impl ScyllaDbStoreInternal { .unwrap_or(0); let now_us = SystemTime::now() .duration_since(UNIX_EPOCH) - .map(|d| d.as_micros() as i64) + .ok() + .and_then(|d| i64::try_from(d.as_micros()).ok()) .unwrap_or(0); // `writetime` is the last batch's `T + 1`, i.e. the highest timestamp it // consumed; that is exactly what `ts_floor` tracks, so seed it directly. @@ -995,7 +996,8 @@ impl ScyllaDbStoreInternal { let prev = self.ts_floor.load(Ordering::Relaxed); let now_us = SystemTime::now() .duration_since(UNIX_EPOCH) - .map(|d| d.as_micros() as i64) + .ok() + .and_then(|d| i64::try_from(d.as_micros()).ok()) .unwrap_or(prev); let next = std::cmp::max(now_us, prev + 1); // The batch uses `next` (`T`) and `next + 1` (`T + 1`); store the latter From 07dc71efeecd4aa33ea6a99d9906a5eea8a2cf91 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Wed, 20 May 2026 21:46:56 -0400 Subject: [PATCH 11/12] test_utils: scan the empty prefix in run_reads Align run_reads with `main`: iterate prefixes of length 0..=len (was 1..=len) so the empty-prefix scan is exercised. This covers `find_keys_by_prefix(&[])` against the exclusive ScyllaDB store, which must not surface the timestamp sentinel row. --- linera-views/src/test_utils/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 08aa65d27acc..3c29f0f89b87 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -176,7 +176,7 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec store.write_batch(batch).await.unwrap(); for key_prefix in keys .iter() - .flat_map(|key| (0..key.len()).map(|u| &key[..=u])) + .flat_map(|key| (0..=key.len()).map(|u| &key[..u])) { // Getting the find_keys_by_prefix / find_key_values_by_prefix let len_prefix = key_prefix.len(); From ada748d84d7b15d0ecab668dfa9e15a6a0fe36bd Mon Sep 17 00:00:00 2001 From: Mathieu Baudet Date: Thu, 21 May 2026 11:57:40 -0400 Subject: [PATCH 12/12] Revert "test_utils: scan the empty prefix in run_reads" This reverts commit 07dc71efeecd4aa33ea6a99d9906a5eea8a2cf91. --- linera-views/src/test_utils/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 3c29f0f89b87..08aa65d27acc 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -176,7 +176,7 @@ pub async fn run_reads(store: S, key_values: Vec<(Vec, Vec store.write_batch(batch).await.unwrap(); for key_prefix in keys .iter() - .flat_map(|key| (0..=key.len()).map(|u| &key[..u])) + .flat_map(|key| (0..key.len()).map(|u| &key[..=u])) { // Getting the find_keys_by_prefix / find_key_values_by_prefix let len_prefix = key_prefix.len();