Skip to content

Commit 262248d

Browse files
committed
fix: very snapshot hash & unique tmp destinations to avoid conflicts
1 parent 54baf56 commit 262248d

1 file changed

Lines changed: 84 additions & 19 deletions

File tree

crates/dry_run_core/src/history/filesystem_store.rs

Lines changed: 84 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::history::{
1212
DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore,
1313
SnapshotSummary, StoredSnapshot, TimeRange, parse_snapshot_filename, snapshot_path,
1414
};
15-
use crate::schema::{ActivityStatsSnapshot, PlannerStatsSnapshot, SchemaSnapshot};
15+
use crate::schema::{
16+
ActivityStatsSnapshot, HashInput, PlannerStatsSnapshot, SchemaSnapshot, compute_content_hash,
17+
};
1618

1719
pub struct FilesystemStore {
1820
root: Arc<PathBuf>,
@@ -415,40 +417,103 @@ fn find_bundle_by_schema_hash(dir: &Path, schema_hash: &str) -> Result<Option<(P
415417
fn read_bundle(path: &Path) -> Result<Bundle> {
416418
let bytes =
417419
std::fs::read(path).map_err(|e| Error::History(format!("read {}: {e}", path.display())))?;
418-
let json = zstd::decode_all(bytes.as_slice())
419-
.map_err(|e| Error::History(format!("zstd decode: {e}")))?;
420-
if let Ok(b) = serde_json::from_slice::<Bundle>(&json) {
421-
return Ok(b);
422-
}
423-
424-
// handle original base snapshot
425-
// TODO: remove in about month time
426-
let schema: SchemaSnapshot = serde_json::from_slice(&json)
427-
.map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}")))?;
428-
Ok(Bundle {
429-
schema,
430-
planner: None,
431-
activity: BTreeMap::new(),
432-
})
420+
let json = zstd::decode_all(bytes.as_slice()).map_err(|e| {
421+
Error::History(format!(
422+
"corrupt snapshot {}: zstd decode: {e}",
423+
path.display()
424+
))
425+
})?;
426+
let bundle = if let Ok(b) = serde_json::from_slice::<Bundle>(&json) {
427+
b
428+
} else {
429+
// handle original base snapshot
430+
// TODO: remove in about month time
431+
let schema: SchemaSnapshot = serde_json::from_slice(&json).map_err(|e| {
432+
Error::History(format!("corrupt snapshot {}: JSON: {e}", path.display()))
433+
})?;
434+
Bundle {
435+
schema,
436+
planner: None,
437+
activity: BTreeMap::new(),
438+
}
439+
};
440+
441+
verify_bundle_hash(path, &bundle)?;
442+
Ok(bundle)
443+
}
444+
445+
// filename hash must match recomputed schema content_hash
446+
fn verify_bundle_hash(path: &Path, bundle: &Bundle) -> Result<()> {
447+
let fname = path
448+
.file_name()
449+
.and_then(|s| s.to_str())
450+
.ok_or_else(|| Error::History(format!("non-utf8 filename: {}", path.display())))?;
451+
let (_, expected) = parse_snapshot_filename(fname).ok_or_else(|| {
452+
Error::History(format!(
453+
"corrupt snapshot {}: filename does not match {{ts}}-{{hash}}.json.zst",
454+
path.display()
455+
))
456+
})?;
457+
458+
if bundle.schema.content_hash != expected {
459+
return Err(Error::History(format!(
460+
"corrupt snapshot {}: filename hash {} != stored schema.content_hash {}",
461+
path.display(),
462+
expected,
463+
bundle.schema.content_hash,
464+
)));
465+
}
466+
467+
let recomputed = compute_content_hash(&HashInput {
468+
pg_version: &bundle.schema.pg_version,
469+
tables: &bundle.schema.tables,
470+
enums: &bundle.schema.enums,
471+
domains: &bundle.schema.domains,
472+
composites: &bundle.schema.composites,
473+
views: &bundle.schema.views,
474+
functions: &bundle.schema.functions,
475+
extensions: &bundle.schema.extensions,
476+
});
477+
if recomputed != expected {
478+
return Err(Error::History(format!(
479+
"corrupt snapshot {}: filename hash {} != recomputed schema hash {}",
480+
path.display(),
481+
expected,
482+
recomputed,
483+
)));
484+
}
485+
Ok(())
433486
}
434487

435488
fn write_bundle(path: &Path, bundle: &Bundle) -> Result<()> {
436489
if let Some(parent) = path.parent() {
437490
std::fs::create_dir_all(parent)
438491
.map_err(|e| Error::History(format!("create_dir_all {}: {e}", parent.display())))?;
439492
}
440-
let tmp = path.with_extension("zst.tmp");
493+
// unique tmp path so concurrent same-hash writers don't collide
494+
let tmp = unique_tmp_path(path);
441495
let json = serde_json::to_vec(bundle)
442496
.map_err(|e| Error::History(format!("cannot serialize bundle: {e}")))?;
443497
let compressed = zstd::encode_all(json.as_slice(), 3)
444498
.map_err(|e| Error::History(format!("zstd encode: {e}")))?;
445499
std::fs::write(&tmp, compressed)
446500
.map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?;
447-
std::fs::rename(&tmp, path)
448-
.map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?;
501+
if let Err(e) = std::fs::rename(&tmp, path) {
502+
let _ = std::fs::remove_file(&tmp);
503+
return Err(Error::History(format!("rename to {}: {e}", path.display())));
504+
}
449505
Ok(())
450506
}
451507

508+
fn unique_tmp_path(path: &Path) -> PathBuf {
509+
use std::sync::atomic::{AtomicU64, Ordering};
510+
static COUNTER: AtomicU64 = AtomicU64::new(0);
511+
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
512+
let pid = std::process::id();
513+
let suffix = format!("zst.{pid}.{n}.tmp");
514+
path.with_extension(suffix)
515+
}
516+
452517
fn not_found_err(kind: &str, at: &SnapshotRef) -> Error {
453518
let detail = match at {
454519
SnapshotRef::Latest => "latest".to_string(),

0 commit comments

Comments
 (0)