Skip to content

Commit d83cee1

Browse files
committed
test(depot-client): stabilize fault harness
1 parent e650129 commit d83cee1

2 files changed

Lines changed: 42 additions & 10 deletions

File tree

engine/packages/depot-client/tests/inline/fault/scenario.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use std::future::Future;
33
use std::path::Path;
44
use std::pin::Pin;
55
use std::ptr;
6-
use std::sync::Arc;
6+
use std::sync::{Arc, LazyLock};
7+
use std::time::Duration;
78

89
use anyhow::{Context, Result, bail, ensure};
910
use depot::{
@@ -55,6 +56,8 @@ type StageFuture = Pin<Box<dyn Future<Output = Result<()>>>>;
5556
type Stage = Box<dyn FnOnce(FaultScenarioCtx) -> StageFuture>;
5657
type FaultSetup = Box<dyn FnOnce(&DepotFaultController) -> Result<()>>;
5758

59+
static FAULT_SCENARIO_RUN_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
60+
5861
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
5962
pub(crate) enum FaultProfile {
6063
Simple,
@@ -190,6 +193,15 @@ impl FaultScenario {
190193
}
191194

192195
pub(crate) fn run(self) -> Result<()> {
196+
// Fault scenarios install process-global workflow hooks for compaction
197+
// workflows, then spin and shut down workflow workers. Running multiple
198+
// scenarios in the same test process can make one scenario observe another
199+
// scenario's worker/debug lifecycle instead of its own force-compaction ack.
200+
let Some(_run_guard) = FAULT_SCENARIO_RUN_LOCK.try_lock() else {
201+
bail!(
202+
"depot-client fault scenarios cannot run in parallel; rerun with `cargo test -p depot-client fault -- --test-threads=1`"
203+
);
204+
};
193205
let runtime = Builder::new_multi_thread()
194206
.worker_threads(2)
195207
.enable_all()
@@ -436,10 +448,18 @@ impl FaultScenarioCtx {
436448
let manager_workflow_id = self.manager_workflow_id(database_branch_id).await?;
437449
let test_ctx = self.inner.test_ctx.lock().await;
438450
DepotCompactionTestDriver::new(&test_ctx)
451+
.with_wait_timeout(self.force_compaction_wait_timeout())
439452
.force_compaction(manager_workflow_id, database_branch_id, work)
440453
.await
441454
}
442455

456+
fn force_compaction_wait_timeout(&self) -> Duration {
457+
match self.inner.profile {
458+
FaultProfile::Simple => Duration::from_secs(30),
459+
FaultProfile::Chaos => Duration::from_secs(120),
460+
}
461+
}
462+
443463
pub(crate) async fn verify_sqlite_integrity(&self) -> Result<()> {
444464
self.with_database_blocking(|db| NativeSqliteOracle::verify_integrity(db.as_ptr()))?;
445465
self.inner.oracle.lock().verify_oracle_integrity()?;

engine/packages/depot-client/tests/inline/fault/verify.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@ use std::sync::Arc;
44
use anyhow::{Context, Result, anyhow, bail, ensure};
55
use depot::{
66
cold_tier::ColdTier,
7+
conveyer::branch::resolve_database_branch,
78
keys,
89
ltx::{DecodedLtx, decode_ltx_v3},
910
types::{
10-
BranchState, ColdShardRef, CommitRow, DatabaseBranchId, decode_cold_shard_ref,
11+
BranchState, BucketId, ColdShardRef, CommitRow, DatabaseBranchId, decode_cold_shard_ref,
1112
decode_commit_row, decode_compaction_root, decode_database_branch_record,
1213
decode_database_pointer, decode_db_head, decode_db_history_pin,
1314
decode_pitr_interval_coverage, decode_retired_cold_object, decode_sqlite_cmp_dirty,
1415
},
1516
};
1617
use futures_util::TryStreamExt;
18+
use rivet_pools::__rivet_util::Id;
1719
use sha2::{Digest, Sha256};
1820
use universaldb::{
1921
RangeOption,
@@ -119,15 +121,22 @@ impl<'a> InvariantScan<'a> {
119121
}
120122

121123
async fn check_database_pointer(&mut self) -> Result<Option<DatabaseBranchId>> {
122-
let mut current = None;
124+
let resolved = resolve_database_branch(
125+
self.tx,
126+
BucketId::from_gas_id(Id::nil()),
127+
&self.database_id,
128+
Serializable,
129+
)
130+
.await?;
131+
let mut scanned_current = None;
123132
for (key, value) in scan_prefix(self.tx, keys::database_pointer_cur_prefix()).await? {
124133
let decoded_key = keys::decode_database_pointer_cur_key(&key);
125134
let pointer = decode_database_pointer(&value);
126135
match (decoded_key, pointer) {
127136
(Ok((_bucket_branch_id, database_id)), Ok(pointer))
128137
if database_id == self.database_id =>
129138
{
130-
if current.replace(pointer.current_branch).is_some() {
139+
if scanned_current.replace(pointer.current_branch).is_some() {
131140
self.violate("database pointer appeared more than once");
132141
}
133142
}
@@ -141,13 +150,16 @@ impl<'a> InvariantScan<'a> {
141150
}
142151
}
143152

144-
if current.is_none() {
145-
self.violate(format!(
146-
"database pointer for {} is missing",
147-
self.database_id
148-
));
153+
let Some(current) = resolved else {
154+
self.violate(format!("database pointer for {} is missing", self.database_id));
155+
return Ok(None);
156+
};
157+
if let Some(scanned_current) = scanned_current
158+
&& scanned_current != current
159+
{
160+
self.violate("database pointer scan disagreed with branch resolution");
149161
}
150-
Ok(current)
162+
Ok(Some(current))
151163
}
152164

153165
async fn check_branch_record(&mut self, branch_id: DatabaseBranchId) -> Result<()> {

0 commit comments

Comments
 (0)