Skip to content

Commit 282edd4

Browse files
authored
Merge pull request #17 from boringSQL/filesystem-store
feat: Filesystem based pull/push of the snapshots
2 parents 96ebaf4 + f2c95ce commit 282edd4

31 files changed

Lines changed: 3050 additions & 387 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.6.1"
6+
version = "0.7.0"
77
edition = "2024"
88
repository = "https://github.com/boringSQL/dryrun"
99

README.md

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,9 @@ dryrun lint
141141

142142
All commands work offline from the schema file. Each project has its own `dryrun.toml` and `.dryrun/`, there is no global state. Add `.dryrun/` to your `.gitignore`.
143143

144-
Snapshots live in `~/.dryrun/history.db`, keyed by `(project_id, database_id)`. The MCP server reads from the history db first and falls back to `.dryrun/schema.json` for first-run or shared snapshots. After `dryrun snapshot take` it will switch to DB.
144+
Snapshots live in `.dryrun/history.db`, keyed by `(project_id, database_id)`. The MCP server reads from the history db first and falls back to `.dryrun/schema.json` for first-run or shared snapshots. After `dryrun snapshot take`, it switches to the history db.
145+
146+
The static `schema.json` file will be deprecated in a future release.
145147

146148
### Multi-node: capture activity from replicas
147149

@@ -193,6 +195,35 @@ Every DB-related command (`init`, `import`, `probe`, `dump-schema`, `lint`, `dri
193195

194196
> **Note:** the MCP server is currently single-database. Using the default profile. Or the option is to run one `dryrun mcp-serve` process per database. Native multi-database support inside one MCP process is tracked in [#7](https://github.com/boringSQL/dryrun/issues/7).
195197
198+
### Sharing snapshots across a team
199+
200+
DryRun becomes more valuable in a team setup: multiple developers can pull snapshots from any POSIX-compliant directory.
201+
202+
To publish the snapshots, run:
203+
204+
```sh
205+
cd project_name
206+
207+
# Capture from the live DB (use cwd name for project name)
208+
dryrun init --db "$DATABASE_URL"
209+
dryrun snapshot take --db "$DATABASE_URL"
210+
dryrun snapshot push --to-path ./snapshots --all
211+
```
212+
213+
Developers can then import the snapshots into their local history:
214+
215+
```sh
216+
dryrun snapshot pull --from-path ./shared/snapshots --all
217+
```
218+
219+
Snapshots are content-addressed (`{project}/{database}/{ts}-{hash}.json.zst`) and idempotent: pushing the same snapshot twice won't change it.
220+
221+
The simplest deployment is a dedicated git repo. Create the snapshots repo and add `*.json.zst binary` to `.gitattributes` so git stops trying to diff bundles.
222+
223+
Offline tools (`lint`, `check_migration`, `drift`) work immediately after the pull.
224+
225+
No server, no credentials. Same promise as before.
226+
196227
## MCP server
197228

198229
Add `dryrun` to your AI assistant. If you installed via Homebrew, `dryrun` is already on your PATH:

crates/dry_run_cli/src/main.rs

Lines changed: 184 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::path::PathBuf;
55

66
use clap::{Parser, Subcommand};
77
use dry_run_core::history::{
8-
DatabaseId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange,
8+
DatabaseId, FilesystemStore, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore,
9+
TimeRange,
910
};
1011
use dry_run_core::{DryRun, HistoryStore, ProjectConfig};
1112
use rmcp::ServiceExt;
@@ -140,6 +141,26 @@ enum SnapshotAction {
140141
#[arg(long)]
141142
history_db: Option<PathBuf>,
142143
},
144+
Push {
145+
#[arg(long)]
146+
to_path: PathBuf,
147+
#[arg(long)]
148+
all: bool,
149+
#[arg(long, env = "DATABASE_URL")]
150+
db: Option<String>,
151+
#[arg(long)]
152+
history_db: Option<PathBuf>,
153+
},
154+
Pull {
155+
#[arg(long)]
156+
from_path: PathBuf,
157+
#[arg(long)]
158+
all: bool,
159+
#[arg(long, env = "DATABASE_URL")]
160+
db: Option<String>,
161+
#[arg(long)]
162+
history_db: Option<PathBuf>,
163+
},
143164
}
144165

145166
#[derive(Subcommand)]
@@ -342,7 +363,7 @@ schema_file = ".dryrun/schema.json"
342363
.unwrap_or_else(|| ProjectConfig::parse(""))?;
343364
let resolved = config.resolve_profile(Some(db_url), None, None, &cwd)?;
344365
let key = complete_key(&resolved, &snapshot.database);
345-
store.put(&key, &snapshot).await?;
366+
store.put_schema(&key, &snapshot).await?;
346367

347368
let planner = ctx.introspect_planner_stats(&snapshot.content_hash).await?;
348369
store.put_planner_stats(&key, &planner).await?;
@@ -478,7 +499,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
478499
let resolved = config.resolve_profile(Some(db_url), None, profile, &cwd)?;
479500
let key = complete_key(&resolved, &snapshot.database);
480501

481-
let schema_outcome = store.put(&key, &snapshot).await?;
502+
let schema_outcome = store.put_schema(&key, &snapshot).await?;
482503
match schema_outcome {
483504
PutOutcome::Inserted => {
484505
println!("Snapshot saved: {}", snapshot.content_hash);
@@ -603,7 +624,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
603624
SnapshotAction::List { db, history_db } => {
604625
let store = open_history_store(history_db.as_deref())?;
605626
let key = resolve_read_key(db.as_deref(), profile).await?;
606-
let rows = store.list(&key, TimeRange::default()).await?;
627+
let rows = store.list_schema(&key, TimeRange::default()).await?;
607628

608629
if rows.is_empty() {
609630
println!(
@@ -642,15 +663,19 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
642663
let key = resolve_read_key(Some(db_url), profile).await?;
643664

644665
let from_snapshot = if let Some(hash) = &from {
645-
store.get(&key, SnapshotRef::Hash(hash.clone())).await?
666+
store
667+
.get_schema(&key, SnapshotRef::Hash(hash.clone()))
668+
.await?
646669
} else if *latest {
647-
store.get(&key, SnapshotRef::Latest).await?
670+
store.get_schema(&key, SnapshotRef::Latest).await?
648671
} else {
649672
anyhow::bail!("specify --from <hash> or --latest");
650673
};
651674

652675
let to_snapshot = if let Some(hash) = &to {
653-
store.get(&key, SnapshotRef::Hash(hash.clone())).await?
676+
store
677+
.get_schema(&key, SnapshotRef::Hash(hash.clone()))
678+
.await?
654679
} else {
655680
ctx.introspect_schema().await?
656681
};
@@ -664,6 +689,52 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
664689
println!("{json}");
665690
Ok(())
666691
}
692+
SnapshotAction::Push {
693+
to_path,
694+
all,
695+
db,
696+
history_db,
697+
} => {
698+
let store = open_history_store(history_db.as_deref())?;
699+
let fs = FilesystemStore::new(to_path.clone());
700+
701+
let keys = if *all {
702+
store.list_keys()?
703+
} else {
704+
vec![resolve_read_key(db.as_deref(), profile).await?]
705+
};
706+
if keys.is_empty() {
707+
println!("No snapshots in history.db to push.");
708+
return Ok(());
709+
}
710+
711+
let outcomes = sync_keys(&store, &fs, &keys).await?;
712+
print_sync_outcomes("push", &outcomes, to_path);
713+
Ok(())
714+
}
715+
SnapshotAction::Pull {
716+
from_path,
717+
all,
718+
db,
719+
history_db,
720+
} => {
721+
let fs = FilesystemStore::new(from_path.clone());
722+
let store = open_history_store(history_db.as_deref())?;
723+
724+
let keys = if *all {
725+
fs.list_keys()?
726+
} else {
727+
vec![resolve_read_key(db.as_deref(), profile).await?]
728+
};
729+
if keys.is_empty() {
730+
println!("No snapshots at {} to pull.", from_path.display());
731+
return Ok(());
732+
}
733+
734+
let outcomes = sync_keys(&fs, &store, &keys).await?;
735+
print_sync_outcomes("pull", &outcomes, from_path);
736+
Ok(())
737+
}
667738
SnapshotAction::Export { out, history_db } => {
668739
let store = open_history_store(history_db.as_deref())?;
669740
let out_root = out.clone().unwrap_or_else(|| {
@@ -675,10 +746,10 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
675746
let keys = store.list_keys()?;
676747
let mut written = 0usize;
677748
for key in &keys {
678-
let summaries = store.list(key, TimeRange::default()).await?;
749+
let summaries = store.list_schema(key, TimeRange::default()).await?;
679750
for s in &summaries {
680751
let snap = store
681-
.get(key, SnapshotRef::Hash(s.content_hash.clone()))
752+
.get_schema(key, SnapshotRef::Hash(s.content_hash.clone()))
682753
.await?;
683754
write_snapshot_export(&out_root, key, &snap)?;
684755
written += 1;
@@ -836,6 +907,108 @@ async fn cmd_drift(
836907
Ok(())
837908
}
838909

910+
#[derive(Debug, Default)]
911+
struct KindCount {
912+
copied: usize,
913+
up_to_date: usize,
914+
}
915+
916+
#[derive(Debug)]
917+
struct SyncOutcome {
918+
key: SnapshotKey,
919+
schema: KindCount,
920+
planner: KindCount,
921+
activity: KindCount,
922+
}
923+
924+
fn kind_order(k: &SnapshotKind) -> u8 {
925+
match k {
926+
SnapshotKind::Schema => 0,
927+
SnapshotKind::Planner => 1,
928+
SnapshotKind::Activity { .. } => 2,
929+
}
930+
}
931+
932+
async fn sync_keys(
933+
src: &dyn SnapshotStore,
934+
dst: &dyn SnapshotStore,
935+
keys: &[SnapshotKey],
936+
) -> anyhow::Result<Vec<SyncOutcome>> {
937+
let mut outcomes = Vec::with_capacity(keys.len());
938+
for key in keys {
939+
let mut outcome = SyncOutcome {
940+
key: key.clone(),
941+
schema: KindCount::default(),
942+
planner: KindCount::default(),
943+
activity: KindCount::default(),
944+
};
945+
946+
let mut kinds = src.list_kinds(key).await?;
947+
// schema first so FilesystemStore's orphan rule is satisfied
948+
kinds.sort_by_key(kind_order);
949+
950+
for kind in &kinds {
951+
let src_summaries = src.list(key, kind, TimeRange::default()).await?;
952+
let dst_hashes: std::collections::HashSet<String> = dst
953+
.list(key, kind, TimeRange::default())
954+
.await?
955+
.into_iter()
956+
.map(|s| s.content_hash)
957+
.collect();
958+
959+
let counter = match kind {
960+
SnapshotKind::Schema => &mut outcome.schema,
961+
SnapshotKind::Planner => &mut outcome.planner,
962+
SnapshotKind::Activity { .. } => &mut outcome.activity,
963+
};
964+
965+
for s in src_summaries {
966+
if dst_hashes.contains(&s.content_hash) {
967+
counter.up_to_date += 1;
968+
continue;
969+
}
970+
let stored = src
971+
.get(key, kind, SnapshotRef::Hash(s.content_hash.clone()))
972+
.await?;
973+
match dst.put(key, &stored).await? {
974+
PutOutcome::Inserted => counter.copied += 1,
975+
PutOutcome::Deduped => counter.up_to_date += 1,
976+
}
977+
}
978+
}
979+
980+
outcomes.push(outcome);
981+
}
982+
Ok(outcomes)
983+
}
984+
985+
fn print_sync_outcomes(verb: &str, outcomes: &[SyncOutcome], path: &std::path::Path) {
986+
let mut total = (0usize, 0usize, 0usize, 0usize);
987+
for o in outcomes {
988+
println!(
989+
" project={} database={}: {} schema, {} planner, {} activity copied ({} up-to-date)",
990+
o.key.project_id.0,
991+
o.key.database_id.0,
992+
o.schema.copied,
993+
o.planner.copied,
994+
o.activity.copied,
995+
o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date,
996+
);
997+
total.0 += o.schema.copied;
998+
total.1 += o.planner.copied;
999+
total.2 += o.activity.copied;
1000+
total.3 += o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date;
1001+
}
1002+
println!(
1003+
"{verb}: {} schema, {} planner, {} activity copied / {} up-to-date ({})",
1004+
total.0,
1005+
total.1,
1006+
total.2,
1007+
total.3,
1008+
path.display(),
1009+
);
1010+
}
1011+
8391012
// helpers
8401013

8411014
fn require_db_url(db: Option<&str>) -> anyhow::Result<&str> {
@@ -934,14 +1107,8 @@ fn write_snapshot_export(
9341107
key: &SnapshotKey,
9351108
snap: &dry_run_core::SchemaSnapshot,
9361109
) -> anyhow::Result<PathBuf> {
937-
let path = out_root
938-
.join(&key.project_id.0)
939-
.join(&key.database_id.0)
940-
.join(format!(
941-
"{}-{}.json.zst",
942-
snap.timestamp.format("%Y%m%dT%H%M%SZ"),
943-
snap.content_hash,
944-
));
1110+
let path =
1111+
dry_run_core::history::snapshot_path(out_root, key, snap.timestamp, &snap.content_hash);
9451112
if let Some(parent) = path.parent() {
9461113
std::fs::create_dir_all(parent)?;
9471114
}

crates/dry_run_cli/src/mcp/server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async fn persist_refresh(
2828
planner: Option<&dry_run_core::PlannerStatsSnapshot>,
2929
activity_by_node: &std::collections::BTreeMap<String, dry_run_core::ActivityStatsSnapshot>,
3030
) {
31-
if let Err(e) = store.put(key, schema).await {
31+
if let Err(e) = store.put_schema(key, schema).await {
3232
tracing::warn!(error = %e, "failed to persist schema");
3333
}
3434
if let Some(p) = planner
@@ -834,18 +834,18 @@ impl DryRunServer {
834834

835835
let from_snapshot = match &params.from {
836836
Some(hash) => store
837-
.get(key, SnapshotRef::Hash(hash.clone()))
837+
.get_schema(key, SnapshotRef::Hash(hash.clone()))
838838
.await
839839
.map_err(to_mcp_err)?,
840840
None => store
841-
.get(key, SnapshotRef::Latest)
841+
.get_schema(key, SnapshotRef::Latest)
842842
.await
843843
.map_err(to_mcp_err)?,
844844
};
845845

846846
let to_snapshot = match &params.to {
847847
Some(hash) => store
848-
.get(key, SnapshotRef::Hash(hash.clone()))
848+
.get_schema(key, SnapshotRef::Hash(hash.clone()))
849849
.await
850850
.map_err(to_mcp_err)?,
851851
None => self.get_schema().await?,

crates/dry_run_cli/src/mcp/server_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ async fn rebuild_after_refresh_preserves_replica_activity() {
384384
let schema = test_snapshot();
385385
let schema_hash = schema.content_hash.clone();
386386

387-
SnapshotStore::put(&store, &key, &schema)
387+
SnapshotStore::put_schema(&store, &key, &schema)
388388
.await
389389
.expect("seed schema");
390390
let replica = make_activity_row(&schema_hash, "replica1", "replica-h1");
@@ -436,7 +436,7 @@ async fn reload_schema_prefers_history_over_json() {
436436

437437
let schema = test_snapshot();
438438
let schema_hash = schema.content_hash.clone();
439-
SnapshotStore::put(&store, &key, &schema)
439+
SnapshotStore::put_schema(&store, &key, &schema)
440440
.await
441441
.expect("seed schema");
442442
store

crates/dry_run_cli/tests/init_e2e.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,10 @@ async fn init_full_capture_writes_schema_planner_and_activity() {
139139
database_id: DatabaseId("postgres".into()),
140140
};
141141

142-
let summaries = store.list(&key, TimeRange::default()).await.expect("list");
142+
let summaries = store
143+
.list_schema(&key, TimeRange::default())
144+
.await
145+
.expect("list");
143146
assert_eq!(
144147
summaries.len(),
145148
1,

0 commit comments

Comments
 (0)