Skip to content

Commit 2f8b560

Browse files
committed
feat: FilesystemStore
1 parent f12a3e9 commit 2f8b560

2 files changed

Lines changed: 244 additions & 0 deletions

File tree

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
use std::path::{Path, PathBuf};
2+
use std::sync::Arc;
3+
4+
use async_trait::async_trait;
5+
use chrono::{DateTime, Utc};
6+
use tracing::{debug, info};
7+
8+
use crate::error::{Error, Result};
9+
use crate::history::{
10+
DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, SnapshotSummary,
11+
TimeRange, parse_snapshot_filename, snapshot_path,
12+
};
13+
use crate::schema::SchemaSnapshot;
14+
15+
pub struct FilesystemStore {
16+
root: Arc<PathBuf>,
17+
}
18+
19+
impl FilesystemStore {
20+
pub fn new(root: impl Into<PathBuf>) -> Self {
21+
Self {
22+
root: Arc::new(root.into()),
23+
}
24+
}
25+
26+
pub fn list_keys(&self) -> Result<Vec<SnapshotKey>> {
27+
list_keys_sync(&self.root)
28+
}
29+
}
30+
31+
#[async_trait]
32+
impl SnapshotStore for FilesystemStore {
33+
async fn put(&self, key: &SnapshotKey, snap: &SchemaSnapshot) -> Result<PutOutcome> {
34+
let root = self.root.clone();
35+
let key = key.clone();
36+
let snap = snap.clone();
37+
run_blocking(move || {
38+
let stream_dir = stream_dir(&root, &key);
39+
if let Some(latest) = read_latest_hash(&stream_dir)?
40+
&& latest == snap.content_hash
41+
{
42+
debug!(hash = %snap.content_hash, "schema unchanged, skipping put");
43+
return Ok(PutOutcome::Deduped);
44+
}
45+
46+
let path = snapshot_path(&root, &key, snap.timestamp, &snap.content_hash);
47+
if let Some(parent) = path.parent() {
48+
std::fs::create_dir_all(parent).map_err(|e| {
49+
Error::History(format!("create_dir_all {}: {e}", parent.display()))
50+
})?;
51+
}
52+
53+
// atomic write: tmp + rename
54+
let tmp = path.with_extension("zst.tmp");
55+
let json = serde_json::to_vec(&snap)
56+
.map_err(|e| Error::History(format!("cannot serialize snapshot: {e}")))?;
57+
let compressed = zstd::encode_all(json.as_slice(), 3)
58+
.map_err(|e| Error::History(format!("zstd encode: {e}")))?;
59+
std::fs::write(&tmp, compressed)
60+
.map_err(|e| Error::History(format!("write {}: {e}", tmp.display())))?;
61+
std::fs::rename(&tmp, &path)
62+
.map_err(|e| Error::History(format!("rename to {}: {e}", path.display())))?;
63+
64+
info!(
65+
hash = %snap.content_hash,
66+
project = %key.project_id.0,
67+
database = %key.database_id.0,
68+
"snapshot put (fs)",
69+
);
70+
Ok(PutOutcome::Inserted)
71+
})
72+
.await
73+
}
74+
75+
async fn get(&self, key: &SnapshotKey, at: SnapshotRef) -> Result<SchemaSnapshot> {
76+
let root = self.root.clone();
77+
let key = key.clone();
78+
run_blocking(move || {
79+
let entries = read_stream_entries(&stream_dir(&root, &key))?;
80+
81+
let chosen = match &at {
82+
SnapshotRef::Latest => entries.into_iter().max_by_key(|(ts, _, _)| *ts),
83+
SnapshotRef::At(target) => entries
84+
.into_iter()
85+
.filter(|(ts, _, _)| *ts <= *target)
86+
.max_by_key(|(ts, _, _)| *ts),
87+
SnapshotRef::Hash(h) => entries.into_iter().find(|(_, hash, _)| hash == h),
88+
};
89+
90+
let (_, _, path) = chosen.ok_or_else(|| {
91+
let detail = match &at {
92+
SnapshotRef::Latest => "latest".to_string(),
93+
SnapshotRef::At(ts) => format!("at-or-before {ts}"),
94+
SnapshotRef::Hash(h) => format!("hash {h}"),
95+
};
96+
Error::History(format!("snapshot not found ({detail})"))
97+
})?;
98+
99+
let bytes = std::fs::read(&path)
100+
.map_err(|e| Error::History(format!("read {}: {e}", path.display())))?;
101+
let json = zstd::decode_all(bytes.as_slice())
102+
.map_err(|e| Error::History(format!("zstd decode: {e}")))?;
103+
serde_json::from_slice(&json)
104+
.map_err(|e| Error::History(format!("corrupt snapshot JSON: {e}")))
105+
})
106+
.await
107+
}
108+
109+
async fn list(&self, key: &SnapshotKey, range: TimeRange) -> Result<Vec<SnapshotSummary>> {
110+
let root = self.root.clone();
111+
let key = key.clone();
112+
run_blocking(move || {
113+
let entries = read_stream_entries(&stream_dir(&root, &key))?;
114+
let mut summaries: Vec<SnapshotSummary> = entries
115+
.into_iter()
116+
.filter(|(ts, _, _)| {
117+
range.from.is_none_or(|from| *ts >= from) && range.to.is_none_or(|to| *ts < to)
118+
})
119+
.map(|(ts, hash, _)| SnapshotSummary {
120+
id: 0,
121+
timestamp: ts,
122+
content_hash: hash,
123+
database: key.database_id.0.clone(),
124+
project_id: Some(key.project_id.0.clone()),
125+
database_id: Some(key.database_id.0.clone()),
126+
})
127+
.collect();
128+
summaries.sort_by_key(|s| std::cmp::Reverse(s.timestamp));
129+
Ok(summaries)
130+
})
131+
.await
132+
}
133+
134+
async fn latest(&self, key: &SnapshotKey) -> Result<Option<SnapshotSummary>> {
135+
Ok(self
136+
.list(key, TimeRange::default())
137+
.await?
138+
.into_iter()
139+
.next())
140+
}
141+
142+
async fn delete_before(&self, key: &SnapshotKey, cutoff: DateTime<Utc>) -> Result<usize> {
143+
let root = self.root.clone();
144+
let key = key.clone();
145+
run_blocking(move || {
146+
let entries = read_stream_entries(&stream_dir(&root, &key))?;
147+
let mut deleted = 0usize;
148+
for (ts, _, path) in entries {
149+
if ts < cutoff {
150+
std::fs::remove_file(&path)
151+
.map_err(|e| Error::History(format!("remove {}: {e}", path.display())))?;
152+
deleted += 1;
153+
}
154+
}
155+
Ok(deleted)
156+
})
157+
.await
158+
}
159+
}
160+
161+
fn stream_dir(root: &Path, key: &SnapshotKey) -> PathBuf {
162+
root.join(&key.project_id.0).join(&key.database_id.0)
163+
}
164+
165+
fn read_stream_entries(dir: &Path) -> Result<Vec<(DateTime<Utc>, String, PathBuf)>> {
166+
if !dir.is_dir() {
167+
return Ok(Vec::new());
168+
}
169+
let mut entries = Vec::new();
170+
for entry in std::fs::read_dir(dir)
171+
.map_err(|e| Error::History(format!("read_dir {}: {e}", dir.display())))?
172+
{
173+
let entry = entry.map_err(|e| Error::History(format!("dirent: {e}")))?;
174+
let path = entry.path();
175+
let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
176+
continue;
177+
};
178+
if let Some((ts, hash)) = parse_snapshot_filename(name) {
179+
entries.push((ts, hash, path));
180+
}
181+
}
182+
Ok(entries)
183+
}
184+
185+
fn read_latest_hash(dir: &Path) -> Result<Option<String>> {
186+
Ok(read_stream_entries(dir)?
187+
.into_iter()
188+
.max_by_key(|(ts, _, _)| *ts)
189+
.map(|(_, hash, _)| hash))
190+
}
191+
192+
fn list_keys_sync(root: &Path) -> Result<Vec<SnapshotKey>> {
193+
let mut keys = Vec::new();
194+
if !root.is_dir() {
195+
return Ok(keys);
196+
}
197+
for proj_entry in std::fs::read_dir(root)
198+
.map_err(|e| Error::History(format!("read_dir {}: {e}", root.display())))?
199+
{
200+
let proj_entry = proj_entry.map_err(|e| Error::History(format!("dirent: {e}")))?;
201+
let proj_path = proj_entry.path();
202+
if !proj_path.is_dir() {
203+
continue;
204+
}
205+
let Some(project_id) = proj_path.file_name().and_then(|n| n.to_str()) else {
206+
continue;
207+
};
208+
for db_entry in std::fs::read_dir(&proj_path)
209+
.map_err(|e| Error::History(format!("read_dir {}: {e}", proj_path.display())))?
210+
{
211+
let db_entry = db_entry.map_err(|e| Error::History(format!("dirent: {e}")))?;
212+
let db_path = db_entry.path();
213+
if !db_path.is_dir() {
214+
continue;
215+
}
216+
let Some(database_id) = db_path.file_name().and_then(|n| n.to_str()) else {
217+
continue;
218+
};
219+
keys.push(SnapshotKey {
220+
project_id: ProjectId(project_id.to_string()),
221+
database_id: DatabaseId(database_id.to_string()),
222+
});
223+
}
224+
}
225+
keys.sort_by(|a, b| {
226+
a.project_id
227+
.0
228+
.cmp(&b.project_id.0)
229+
.then_with(|| a.database_id.0.cmp(&b.database_id.0))
230+
});
231+
Ok(keys)
232+
}
233+
234+
async fn run_blocking<F, T>(f: F) -> Result<T>
235+
where
236+
F: FnOnce() -> Result<T> + Send + 'static,
237+
T: Send + 'static,
238+
{
239+
tokio::task::spawn_blocking(f)
240+
.await
241+
.map_err(|e| Error::History(format!("blocking task failed: {e}")))?
242+
}

crates/dry_run_core/src/history/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
pub mod filesystem_layout;
2+
mod filesystem_store;
23
mod snapshot_store;
34
mod store;
45

56
pub use filesystem_layout::{SNAPSHOT_EXTENSION, parse_snapshot_filename, snapshot_path};
7+
pub use filesystem_store::FilesystemStore;
68
pub use snapshot_store::{
79
DatabaseId, ProjectId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange,
810
};

0 commit comments

Comments
 (0)