diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 2c1fcec28dee..0a3f27c80284 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -10,7 +10,11 @@ use std::{ collections::{BTreeSet, HashMap}, ops::Deref, - sync::Arc, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, + time::{SystemTime, UNIX_EPOCH}, }; use async_lock::{Semaphore, SemaphoreGuard}; @@ -33,6 +37,7 @@ use scylla::{ }, response::PagingState, statement::{batch::BatchType, prepared::PreparedStatement, Consistency}, + value::CqlValue, }; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -42,7 +47,7 @@ use crate::metering::MeteredDatabase; #[cfg(with_testing)] use crate::store::TestKeyValueDatabase; use crate::{ - batch::UnorderedBatch, + batch::{SimpleUnorderedBatch, UnorderedBatch}, common::{get_uleb128_size, get_upper_bound_option}, journaling::{JournalingError, JournalingKeyValueDatabase}, lru_caching::{LruCachingConfig, LruCachingDatabase}, @@ -104,11 +109,18 @@ struct ScyllaDbClient { session: Session, namespace: String, read_value: PreparedStatement, + read_writetime: PreparedStatement, contains_key: PreparedStatement, write_batch_delete_prefix_unbounded: PreparedStatement, write_batch_delete_prefix_bounded: PreparedStatement, write_batch_deletion: PreparedStatement, write_batch_insertion: PreparedStatement, + // Variants carrying an explicit `USING TIMESTAMP ?` marker, used by the + // single-batch exclusive-mode write path (`write_batch_exclusive`). + write_batch_delete_prefix_unbounded_ts: PreparedStatement, + write_batch_delete_prefix_bounded_ts: PreparedStatement, + write_batch_deletion_ts: PreparedStatement, + write_batch_insertion_ts: PreparedStatement, find_keys_by_prefix_unbounded: PreparedStatement, find_keys_by_prefix_bounded: PreparedStatement, find_key_values_by_prefix_unbounded: PreparedStatement, @@ -126,6 +138,12 @@ impl ScyllaDbClient { )) .await?; + let read_writetime = session + .prepare(format!( + "SELECT WRITETIME(v) FROM {KEYSPACE}.{namespace} WHERE root_key = ? AND k = ?" + )) + .await?; + let contains_key = session .prepare(format!( "SELECT root_key FROM {KEYSPACE}.{namespace} WHERE root_key = ? AND k = ?" @@ -156,6 +174,36 @@ impl ScyllaDbClient { )) .await?; + // Timestamped variants used by the single-batch exclusive-mode path. The + // explicit `USING TIMESTAMP ?` lets prefix-deletions (`T`) and the + // insertions/deletions (`T + 1`) share one atomic batch without the range + // tombstone shadowing the inserts. + let write_batch_delete_prefix_unbounded_ts = session + .prepare(format!( + "DELETE FROM {KEYSPACE}.{namespace} USING TIMESTAMP ? WHERE root_key = ? AND k >= ?" + )) + .await?; + + let write_batch_delete_prefix_bounded_ts = session + .prepare(format!( + "DELETE FROM {KEYSPACE}.{namespace} USING TIMESTAMP ? \ + WHERE root_key = ? AND k >= ? AND k < ?" + )) + .await?; + + let write_batch_deletion_ts = session + .prepare(format!( + "DELETE FROM {KEYSPACE}.{namespace} USING TIMESTAMP ? WHERE root_key = ? AND k = ?" + )) + .await?; + + let write_batch_insertion_ts = session + .prepare(format!( + "INSERT INTO {KEYSPACE}.{namespace} (root_key, k, v) VALUES (?, ?, ?) \ + USING TIMESTAMP ?" + )) + .await?; + let find_keys_by_prefix_unbounded = session .prepare(format!( "SELECT k FROM {KEYSPACE}.{namespace} WHERE root_key = ? AND k >= ?" @@ -184,11 +232,16 @@ impl ScyllaDbClient { session, namespace, read_value, + read_writetime, contains_key, write_batch_delete_prefix_unbounded, write_batch_delete_prefix_bounded, write_batch_deletion, write_batch_insertion, + write_batch_delete_prefix_unbounded_ts, + write_batch_delete_prefix_bounded_ts, + write_batch_deletion_ts, + write_batch_insertion_ts, find_keys_by_prefix_unbounded, find_keys_by_prefix_bounded, find_key_values_by_prefix_unbounded, @@ -289,6 +342,18 @@ impl ScyllaDbClient { Ok(()) } + /// Validates a key supplied by a caller's batch. Besides the size limit, the + /// key must be non-empty: the empty (zero-length) key is `WRITETIME_SENTINEL_KEY`, + /// reserved for the per-store timestamp sentinel that exclusive mode writes + /// internally. Prefix scans now deliberately hide that key, so any caller + /// content stored there would be silently invisible to reads. DynamoDB + /// likewise forbids zero-length keys. + fn check_batch_key(key: &[u8]) -> Result<(), ScyllaDbStoreInternalError> { + Self::check_key_size(key)?; + ensure!(!key.is_empty(), ScyllaDbStoreInternalError::ZeroLengthKey); + Ok(()) + } + fn check_batch_len(batch: &UnorderedBatch) -> Result<(), ScyllaDbStoreInternalError> { ensure!( batch.len() <= MAX_BATCH_SIZE, @@ -399,47 +464,180 @@ impl ScyllaDbClient { Ok(rows.next().is_some()) } - async fn write_batch_internal( + /// Reads the write-time of a single row in microseconds since Unix epoch, + /// returning `None` if the row does not exist or carries no live value. + async fn read_writetime_internal( + &self, + root_key: &[u8], + key: Vec, + ) -> Result, ScyllaDbStoreInternalError> { + Self::check_key_size(&key)?; + let session = &self.session; + let values = (root_key.to_vec(), key); + let (result, _) = session + .execute_single_page(&self.read_writetime, &values, PagingState::start()) + .await + .map_err(ScyllaDbStoreInternalError::ExecutionError)?; + let rows = result.into_rows_result()?; + let mut rows = rows.rows::<(Option,)>()?; + Ok(match rows.next() { + Some(row) => row?.0, + None => None, + }) + } + + /// Issues an unlogged batch that contains only prefix-delete statements, + /// letting the coordinator assign the write timestamp. + async fn write_batch_prefix_deletes( + &self, + root_key: &[u8], + key_prefix_deletions: Vec>, + ) -> Result<(), ScyllaDbStoreInternalError> { + if key_prefix_deletions.is_empty() { + return Ok(()); + } + let session = &self.session; + let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::new(); + let q_unbounded = &self.write_batch_delete_prefix_unbounded; + let q_bounded = &self.write_batch_delete_prefix_bounded; + for key_prefix in key_prefix_deletions { + Self::check_key_size(&key_prefix)?; + match get_upper_bound_option(&key_prefix) { + None => { + batch_values.push(vec![root_key.to_vec(), key_prefix]); + batch_query.append_statement(q_unbounded.clone()); + } + Some(upper_bound) => { + batch_values.push(vec![root_key.to_vec(), key_prefix, upper_bound]); + batch_query.append_statement(q_bounded.clone()); + } + } + } + session + .batch(&batch_query, batch_values) + .await + .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?; + Ok(()) + } + + /// Issues an unlogged batch containing the single-key deletions and the + /// insertions, letting the coordinator assign the write timestamp. + async fn write_simple_batch( + &self, + root_key: &[u8], + batch: SimpleUnorderedBatch, + ) -> Result<(), ScyllaDbStoreInternalError> { + if batch.deletions.is_empty() && batch.insertions.is_empty() { + return Ok(()); + } + let session = &self.session; + let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); + let mut batch_values = Vec::new(); + let q_deletion = &self.write_batch_deletion; + for key in batch.deletions { + Self::check_batch_key(&key)?; + batch_values.push(vec![root_key.to_vec(), key]); + batch_query.append_statement(q_deletion.clone()); + } + let q_insertion = &self.write_batch_insertion; + for (key, value) in batch.insertions { + Self::check_batch_key(&key)?; + Self::check_value_size(&value)?; + batch_values.push(vec![root_key.to_vec(), key, value]); + batch_query.append_statement(q_insertion.clone()); + } + session + .batch(&batch_query, batch_values) + .await + .map_err(ScyllaDbStoreInternalError::WriteBatchExecutionError)?; + Ok(()) + } + + /// Issues the whole write as a single atomic unlogged batch, used in + /// exclusive mode. Every statement carries an explicit `USING TIMESTAMP`: + /// the prefix-deletions use `t`, while the single-key deletions, the + /// insertions, and the sentinel write use `t + 1`. The higher timestamp on + /// the data ensures a range tombstone never shadows an insertion belonging + /// to the same logical batch (at equal timestamps, dead cells win over live + /// cells). Because the intended ordering is fixed by these timestamps rather + /// than by send order, the prefix-deletions and the data can — and must — + /// share one batch, preserving the atomicity that `write_batch` callers rely + /// on. The sentinel write at `WRITETIME_SENTINEL_KEY` lets a future process + /// recover this store's timestamp floor (see `ensure_ts_seeded`). + async fn write_batch_exclusive( &self, root_key: &[u8], batch: UnorderedBatch, + t: i64, ) -> Result<(), ScyllaDbStoreInternalError> { + let UnorderedBatch { + key_prefix_deletions, + simple_unordered_batch: + SimpleUnorderedBatch { + deletions, + insertions, + }, + } = batch; let session = &self.session; let mut batch_query = scylla::statement::batch::Batch::new(BatchType::Unlogged); let mut batch_values = Vec::new(); - let query1 = &self.write_batch_delete_prefix_unbounded; - let query2 = &self.write_batch_delete_prefix_bounded; - Self::check_batch_len(&batch)?; - for key_prefix in batch.key_prefix_deletions { + + // Prefix-deletions at timestamp `t`. + for key_prefix in key_prefix_deletions { Self::check_key_size(&key_prefix)?; match get_upper_bound_option(&key_prefix) { None => { - let values = vec![root_key.to_vec(), key_prefix]; - batch_values.push(values); - batch_query.append_statement(query1.clone()); + batch_values.push(vec![ + CqlValue::BigInt(t), + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key_prefix), + ]); + batch_query + .append_statement(self.write_batch_delete_prefix_unbounded_ts.clone()); } Some(upper_bound) => { - let values = vec![root_key.to_vec(), key_prefix, upper_bound]; - batch_values.push(values); - batch_query.append_statement(query2.clone()); + batch_values.push(vec![ + CqlValue::BigInt(t), + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key_prefix), + CqlValue::Blob(upper_bound), + ]); + batch_query.append_statement(self.write_batch_delete_prefix_bounded_ts.clone()); } } } - let query3 = &self.write_batch_deletion; - for key in batch.simple_unordered_batch.deletions { - Self::check_key_size(&key)?; - let values = vec![root_key.to_vec(), key]; - batch_values.push(values); - batch_query.append_statement(query3.clone()); + + // Single-key deletions, insertions, and the sentinel at timestamp `t + 1`. + let t_data = t + 1; + for key in deletions { + Self::check_batch_key(&key)?; + batch_values.push(vec![ + CqlValue::BigInt(t_data), + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key), + ]); + batch_query.append_statement(self.write_batch_deletion_ts.clone()); } - let query4 = &self.write_batch_insertion; - for (key, value) in batch.simple_unordered_batch.insertions { - Self::check_key_size(&key)?; + for (key, value) in insertions { + Self::check_batch_key(&key)?; Self::check_value_size(&value)?; - let values = vec![root_key.to_vec(), key, value]; - batch_values.push(values); - batch_query.append_statement(query4.clone()); + batch_values.push(vec![ + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(key), + CqlValue::Blob(value), + CqlValue::BigInt(t_data), + ]); + batch_query.append_statement(self.write_batch_insertion_ts.clone()); } + batch_values.push(vec![ + CqlValue::Blob(root_key.to_vec()), + CqlValue::Blob(WRITETIME_SENTINEL_KEY.to_vec()), + CqlValue::Blob(Vec::new()), + CqlValue::BigInt(t_data), + ]); + batch_query.append_statement(self.write_batch_insertion_ts.clone()); + session .batch(&batch_query, batch_values) .await @@ -472,6 +670,12 @@ impl ScyllaDbClient { let mut keys = Vec::new(); while let Some(row) = rows.next().await { let (key,) = row?; + // Skip the reserved timestamp sentinel (exclusive mode writes it at the + // empty clustering key). It is an internal implementation detail and must + // not surface to callers; it can only match an empty-prefix scan. + if key == WRITETIME_SENTINEL_KEY { + continue; + } let short_key = key[len..].to_vec(); keys.push(short_key); } @@ -503,6 +707,10 @@ impl ScyllaDbClient { let mut key_values = Vec::new(); while let Some(row) = rows.next().await { let (key, value) = row?; + // Skip the reserved timestamp sentinel; see `find_keys_by_prefix_internal`. + if key == WRITETIME_SENTINEL_KEY { + continue; + } let short_key = key[len..].to_vec(); key_values.push((short_key, value)); } @@ -517,6 +725,15 @@ pub struct ScyllaDbStoreInternal { semaphore: Option>, max_stream_queries: usize, root_key: Vec, + /// Whether this store was opened with `open_exclusive`. When true, `write_batch` + /// resolves in-batch prefix/insert collisions via per-statement `USING TIMESTAMP`; + /// when false, it splits the batch into two sequential sub-batches with + /// server-side timestamps to preserve ordering across writers. + is_exclusive: bool, + /// Per-partition timestamp floor for exclusive-mode `USING TIMESTAMP` writes. + /// Value 0 means unseeded; populated lazily on first write by reading + /// `WRITETIME` of a sentinel row. Each batch reserves 2 µs (T and T+1). + ts_floor: Arc, } /// Database-level connection to ScyllaDB for managing namespaces and partitions. @@ -593,6 +810,11 @@ pub enum ScyllaDbStoreInternalError { /// The batch is too long to be written #[error("The batch is too long to be written")] BatchTooLong, + + /// Keys have to be of nonzero length (the empty key is reserved for the + /// timestamp sentinel). + #[error("The key must be of nonzero length")] + ZeroLengthKey, } impl KeyValueStoreError for ScyllaDbStoreInternalError { @@ -703,12 +925,91 @@ impl DirectWritableKeyValueStore for ScyllaDbStoreInternal { // tie-breaking rule when two cells have the same write timestamp is that dead cells // win over live cells" from // https://github.com/scylladb/scylladb/blob/master/docs/dev/timestamp-conflict-resolution.md + // + // We therefore order the prefix-deletions strictly before the insertions: + // * In exclusive mode we own the timestamps, so we issue a single atomic CQL + // batch with explicit per-statement `USING TIMESTAMP` (`T` for the + // prefix-deletions, `T + 1` for the data). See `write_batch_exclusive`. + // * In shared mode the coordinator owns the timestamps, so we split the write + // into two sequential CQL batches. type Batch = UnorderedBatch; async fn write_batch(&self, batch: Self::Batch) -> Result<(), ScyllaDbStoreInternalError> { let store = self.store.deref(); let _guard = self.acquire().await; - store.write_batch_internal(&self.root_key, batch).await + ScyllaDbClient::check_batch_len(&batch)?; + if self.is_exclusive { + // A single atomic batch; ordering is pinned by the explicit timestamps. + let t = self.next_batch_ts().await?; + store.write_batch_exclusive(&self.root_key, batch, t).await + } else { + store + .write_batch_prefix_deletes(&self.root_key, batch.key_prefix_deletions) + .await?; + store + .write_simple_batch(&self.root_key, batch.simple_unordered_batch) + .await?; + Ok(()) + } + } +} + +impl ScyllaDbStoreInternal { + /// Seeds the per-store timestamp floor on first write in exclusive mode. + /// Reads `WRITETIME` of this chain's row in the reserved sentinel + /// partition (written by every prior exclusive batch). Falls back to the + /// current wall clock if the row does not yet exist. Idempotent — only + /// the first caller wins the compare-exchange. + async fn ensure_ts_seeded(&self) -> Result<(), ScyllaDbStoreInternalError> { + if self.ts_floor.load(Ordering::Relaxed) > 0 { + return Ok(()); + } + let writetime = self + .store + .read_writetime_internal(&self.root_key, WRITETIME_SENTINEL_KEY.to_vec()) + .await? + .unwrap_or(0); + let now_us = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_micros()).ok()) + .unwrap_or(0); + // `writetime` is the last batch's `T + 1`, i.e. the highest timestamp it + // consumed; that is exactly what `ts_floor` tracks, so seed it directly. + let seed = now_us.max(writetime); + if self + .ts_floor + .compare_exchange(0, seed, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + // Another caller seeded first; their value wins. + } + Ok(()) + } + + /// Returns the base timestamp `T` for the next batch in exclusive mode. + /// The batch may also use `T + 1`; the generator advances by 2 per call, + /// preserving monotonicity across batches in this process. + async fn next_batch_ts(&self) -> Result { + self.ensure_ts_seeded().await?; + loop { + let prev = self.ts_floor.load(Ordering::Relaxed); + let now_us = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|d| i64::try_from(d.as_micros()).ok()) + .unwrap_or(prev); + let next = std::cmp::max(now_us, prev + 1); + // The batch uses `next` (`T`) and `next + 1` (`T + 1`); store the latter + // so the following batch starts strictly above both. + if self + .ts_floor + .compare_exchange_weak(prev, next + 1, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + return Ok(next); + } + } } } @@ -719,6 +1020,13 @@ fn get_big_root_key(root_key: &[u8]) -> Vec { big_key } +/// Reserved clustering key inside each chain's partition that holds the +/// timestamp sentinel used to seed the per-store client timestamp generator +/// in exclusive mode. The empty clustering key is unused by any caller: +/// views always write keys prefixed with a tag byte (>= `MIN_VIEW_TAG`), +/// and the journaling layer writes 6-byte keys starting with `[0, ...]`. +const WRITETIME_SENTINEL_KEY: &[u8] = &[]; + /// The type for building a new ScyllaDB Key Value Store #[derive(Clone, Debug, Deserialize, Serialize)] pub struct ScyllaDbStoreInternalConfig { @@ -769,11 +1077,24 @@ impl KeyValueDatabase for ScyllaDbDatabaseInternal { semaphore, max_stream_queries, root_key, + is_exclusive: false, + ts_floor: Arc::new(AtomicI64::new(0)), }) } fn open_exclusive(&self, root_key: &[u8]) -> Result { - self.open_shared(root_key) + let store = self.store.clone(); + let semaphore = self.semaphore.clone(); + let max_stream_queries = self.max_stream_queries; + let root_key = get_big_root_key(root_key); + Ok(ScyllaDbStoreInternal { + store, + semaphore, + max_stream_queries, + root_key, + is_exclusive: true, + ts_floor: Arc::new(AtomicI64::new(0)), + }) } async fn list_all(config: &Self::Config) -> Result, ScyllaDbStoreInternalError> { diff --git a/linera-views/src/batch.rs b/linera-views/src/batch.rs index 17ab58b85473..4f2eeaee6111 100644 --- a/linera-views/src/batch.rs +++ b/linera-views/src/batch.rs @@ -82,7 +82,12 @@ pub struct SimpleUnorderedBatch { } /// An unordered batch of deletions and insertions, together with a set of key-prefixes to -/// delete. Key-prefix deletions must happen before the insertions and the deletions. +/// delete. +/// +/// The batch may contain collisions between `key_prefix_deletions` and `insertions`: a +/// prefix in `key_prefix_deletions` may cover one or more keys in `insertions`. Backends +/// are responsible for ordering execution so that insertions "happen after" the prefix +/// deletions semantically. #[derive(Default, Serialize, Deserialize)] pub struct UnorderedBatch { /// The key-prefix deletions. @@ -119,44 +124,6 @@ impl UnorderedBatch { }) } - /// Modifies an [`UnorderedBatch`] so that the key-prefix deletions do not conflict - /// with subsequent insertions. This may require accessing the database to compute - /// lists of deleted keys. - pub async fn expand_colliding_prefix_deletions( - &mut self, - db: &DB, - ) -> Result<(), DB::Error> { - if self.key_prefix_deletions.is_empty() { - return Ok(()); - } - let inserted_keys = self - .simple_unordered_batch - .insertions - .iter() - .map(|x| x.0.clone()) - .collect::>(); - let mut key_prefix_deletions = Vec::new(); - for key_prefix in &self.key_prefix_deletions { - if inserted_keys - .range(get_key_range_for_prefix(key_prefix.clone())) - .next() - .is_some() - { - for short_key in &db.expand_delete_prefix(key_prefix).await? { - let mut key = key_prefix.clone(); - key.extend(short_key); - if !inserted_keys.contains(&key) { - self.simple_unordered_batch.deletions.push(key); - } - } - } else { - key_prefix_deletions.push(key_prefix.to_vec()); - } - } - self.key_prefix_deletions = key_prefix_deletions; - Ok(()) - } - /// The total number of entries of the batch. pub fn len(&self) -> usize { self.key_prefix_deletions.len() + self.simple_unordered_batch.len() @@ -541,7 +508,7 @@ impl BatchValueWriter for SimpleUnorderedBatchIter { } } -/// The iterator that corresponds to a `SimpleUnorderedBatch` +/// The iterator that corresponds to an [`UnorderedBatch`]. pub struct UnorderedBatchIter { delete_prefix_iter: Peekable>>, insert_deletion_iter: SimpleUnorderedBatchIter, @@ -584,12 +551,11 @@ impl SimplifiedBatch for UnorderedBatch { self.simple_unordered_batch.add_insert(key, value) } - async fn from_batch(store: S, batch: Batch) -> Result { - let mut unordered_batch = batch.simplify(); - unordered_batch - .expand_colliding_prefix_deletions(&store) - .await?; - Ok(unordered_batch) + async fn from_batch( + _store: S, + batch: Batch, + ) -> Result { + Ok(batch.simplify()) } } @@ -637,7 +603,7 @@ impl BatchValueWriter for UnorderedBatchIter { #[cfg(test)] mod tests { use linera_views::{ - batch::{Batch, SimpleUnorderedBatch, UnorderedBatch}, + batch::Batch, context::{Context, MemoryContext}, store::WritableKeyValueStore as _, }; @@ -716,29 +682,4 @@ mod tests { ); assert!(simple_unordered_batch.insertions.is_empty()); } - - #[tokio::test] - async fn test_simplify_batch6() { - let context = MemoryContext::new_for_testing(()); - let insertions = vec![(vec![1, 2, 3], vec![])]; - let simple_unordered_batch = SimpleUnorderedBatch { - insertions: insertions.clone(), - deletions: vec![], - }; - let key_prefix_deletions = vec![vec![1, 2]]; - let mut unordered_batch = UnorderedBatch { - simple_unordered_batch, - key_prefix_deletions, - }; - unordered_batch - .expand_colliding_prefix_deletions(&context) - .await - .unwrap(); - assert!(unordered_batch.simple_unordered_batch.deletions.is_empty()); - assert_eq!( - unordered_batch.simple_unordered_batch.insertions, - insertions - ); - assert!(unordered_batch.key_prefix_deletions.is_empty()); - } } diff --git a/linera-views/tests/store_tests.rs b/linera-views/tests/store_tests.rs index 8ea1ab421d73..a76fd5aeee94 100644 --- a/linera-views/tests/store_tests.rs +++ b/linera-views/tests/store_tests.rs @@ -339,6 +339,71 @@ async fn test_scylla_db_writes_from_state() { run_writes_from_state(&store).await; } +// `new_test_store` opens a shared store, so the test above only exercises the +// coordinator-timestamp path. This variant opens an exclusive store, routing the +// same batches (including the `DeletePrefix`-overlapping-`Put` cases in +// `generate_specific_state_batch`) through the explicit `T`/`T + 1` timestamps. +#[cfg(with_scylladb)] +#[tokio::test] +async fn test_scylla_db_writes_from_state_exclusive() { + use linera_views::store::KeyValueDatabase as _; + + let database = linera_views::scylla_db::ScyllaDbDatabase::connect_test_namespace() + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + run_writes_from_state(&store).await; +} + +// Exclusive mode keeps timestamps monotonic across batches via an in-memory +// floor seeded, on first write, from the `WRITETIME` of a persisted sentinel. +// A reconnect resets that floor to 0, so this test checks that a "restarted" +// store resumes strictly above the previously persisted data: the second +// process's `DeletePrefix` + `Put` must win over the first process's value. +#[cfg(with_scylladb)] +#[tokio::test] +async fn test_scylla_db_exclusive_seed_after_restart() { + use linera_views::{ + random::generate_test_namespace, scylla_db::ScyllaDbDatabase, store::KeyValueDatabase as _, + }; + + let config = ScyllaDbDatabase::new_test_config().await.unwrap(); + let namespace = generate_test_namespace(); + let key = vec![42]; + + // First process: write an initial value through an exclusive store. + { + let database = ScyllaDbDatabase::recreate_and_connect(&config, &namespace) + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + let mut batch = Batch::new(); + batch.put_key_value_bytes(key.clone(), vec![1]); + store.write_batch(batch).await.unwrap(); + } + + // Second process: a fresh database + store resets the timestamp floor to 0, + // forcing the seed to read the persisted sentinel and resume above it. + { + let database = ScyllaDbDatabase::connect(&config, &namespace) + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + let mut batch = Batch::new(); + batch.delete_key_prefix(key.clone()); + batch.put_key_value_bytes(key.clone(), vec![2]); + store.write_batch(batch).await.unwrap(); + } + + // Third process: a fresh connection (empty cache) reads from the database and + // must observe the second write, proving it was resolved as the later one. + let database = ScyllaDbDatabase::connect(&config, &namespace) + .await + .unwrap(); + let store = database.open_exclusive(&[]).unwrap(); + assert_eq!(store.read_value_bytes(&key).await.unwrap(), Some(vec![2])); +} + #[cfg(with_scylladb)] #[tokio::test] async fn test_scylladb_access() {