1- use std:: sync:: Arc ;
1+ use std:: { sync:: Arc , time :: Instant } ;
22
33use env_logger:: Env ;
4+ use log:: info;
45use pretty_assertions:: assert_matches;
56use spacetimedb:: {
67 db:: {
@@ -15,6 +16,7 @@ use spacetimedb::{
1516 Identity ,
1617} ;
1718use spacetimedb_durability:: { EmptyHistory , TxOffset } ;
19+ use spacetimedb_fs_utils:: dir_trie:: DirTrie ;
1820use spacetimedb_lib:: {
1921 bsatn,
2022 db:: raw_def:: v9:: { RawModuleDefV9Builder , RawTableDefBuilder } ,
@@ -32,38 +34,33 @@ use spacetimedb_snapshot::{
3234 Snapshot , SnapshotError , SnapshotRepository ,
3335} ;
3436use spacetimedb_table:: page_pool:: PagePool ;
35- use tempfile:: tempdir;
36- use tokio:: task:: spawn_blocking;
37+ use tempfile:: { tempdir, TempDir } ;
38+ use tokio:: { sync :: OnceCell , task:: spawn_blocking} ;
3739
3840// TODO: Happy path for compressed snapshot, pending #2034
3941#[ tokio:: test]
4042async fn can_sync_a_snapshot ( ) -> anyhow:: Result < ( ) > {
4143 enable_logging ( ) ;
4244 let tmp = tempdir ( ) ?;
45+ let src = SourceSnapshot :: get_or_create ( ) . await ?;
4346
44- let src_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) . join ( "src" ) ) ;
45- let dst_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) . join ( "dst" ) ) ;
46-
47- src_path. create ( ) ?;
47+ let dst_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) ) ;
4848 dst_path. create ( ) ?;
4949
50- let src_repo = SnapshotRepository :: open ( src_path, Identity :: ZERO , 0 ) . map ( Arc :: new) ?;
5150 let dst_repo = SnapshotRepository :: open ( dst_path. clone ( ) , Identity :: ZERO , 0 ) . map ( Arc :: new) ?;
5251
53- let snapshot_offset = create_snapshot ( src_repo. clone ( ) ) . await ?;
54- let src_snapshot_path = src_repo. snapshot_dir_path ( snapshot_offset) ;
55- let ( mut src_snapshot, _) = Snapshot :: read_from_file ( & src_snapshot_path. snapshot_file ( snapshot_offset) ) ?;
52+ let mut src_snapshot = src. meta . clone ( ) ;
5653 let total_objects = src_snapshot. total_objects ( ) as u64 ;
5754
58- let blob_provider = SnapshotRepository :: object_repo ( & src_snapshot_path ) . map ( Arc :: new ) ? ;
55+ let blob_provider = src . objects . clone ( ) ;
5956
6057 // This is the first snapshot in `dst_repo`, so all objects should be written.
6158 let stats = synchronize_snapshot ( blob_provider. clone ( ) , dst_path. clone ( ) , src_snapshot. clone ( ) ) . await ?;
6259 assert_eq ! ( stats. objects_written, total_objects) ;
6360
6461 // Assert that the copied snapshot is valid.
6562 let pool = PagePool :: default ( ) ;
66- let dst_snapshot_full = dst_repo. read_snapshot ( snapshot_offset , & pool) ?;
63+ let dst_snapshot_full = dst_repo. read_snapshot ( src . offset , & pool) ?;
6764 Locking :: restore_from_snapshot ( dst_snapshot_full, pool) ?;
6865
6966 // Check that `verify_snapshot` agrees.
@@ -88,38 +85,88 @@ async fn can_sync_a_snapshot() -> anyhow::Result<()> {
8885
8986#[ tokio:: test]
9087async fn rejects_overwrite ( ) -> anyhow:: Result < ( ) > {
88+ enable_logging ( ) ;
9189 let tmp = tempdir ( ) ?;
90+ let src = SourceSnapshot :: get_or_create ( ) . await ?;
9291
93- let src_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) . join ( "src" ) ) ;
94- let dst_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) . join ( "dst" ) ) ;
95-
96- src_path. create ( ) ?;
92+ let dst_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) ) ;
9793 dst_path. create ( ) ?;
9894
99- let src_repo = SnapshotRepository :: open ( src_path, Identity :: ZERO , 0 ) . map ( Arc :: new) ?;
100-
101- let snapshot_offset = create_snapshot ( src_repo. clone ( ) ) . await ?;
102- let src_snapshot_path = src_repo. snapshot_dir_path ( snapshot_offset) ;
103- let ( src_snapshot, _) = Snapshot :: read_from_file ( & src_snapshot_path. snapshot_file ( snapshot_offset) ) ?;
104-
105- let blob_provider = SnapshotRepository :: object_repo ( & src_snapshot_path) . map ( Arc :: new) ?;
95+ let src_snapshot = src. meta . clone ( ) ;
96+ let blob_provider = src. objects . clone ( ) ;
10697
10798 synchronize_snapshot ( blob_provider. clone ( ) , dst_path. clone ( ) , src_snapshot. clone ( ) ) . await ?;
10899
109100 // Try to overwrite with the previous snapshot.
110- let prev_offset = src_repo. latest_snapshot_older_than ( snapshot_offset - 1 ) ?. unwrap ( ) ;
111- let src_snapshot_path = src_repo. snapshot_dir_path ( prev_offset) ;
101+ // A previous snapshot exists because one is created immediately after
102+ // database initialization.
103+ let prev_offset = src. repo . latest_snapshot_older_than ( src. offset - 1 ) ?. unwrap ( ) ;
104+ let src_snapshot_path = src. repo . snapshot_dir_path ( prev_offset) ;
112105 let ( mut src_snapshot, _) = Snapshot :: read_from_file ( & src_snapshot_path. snapshot_file ( prev_offset) ) ?;
113106 // Pretend it's the current snapshot, thereby altering the preimage.
114- src_snapshot. tx_offset = snapshot_offset ;
107+ src_snapshot. tx_offset = src . offset ;
115108
116109 let res = synchronize_snapshot ( blob_provider, dst_path, src_snapshot) . await ;
117110 assert_matches ! ( res, Err ( SnapshotError :: HashMismatch { .. } ) ) ;
118111
119112 Ok ( ( ) )
120113}
121114
115+ /// Creating a snapshot takes a long time, because we need to commit
116+ /// `SNAPSHOT_FREQUENCY` transactions to trigger one.
117+ ///
118+ /// Until the snapshot frequency becomes configurable,
119+ /// avoid creating the source snapshot repeatedly.
120+ struct SourceSnapshot {
121+ offset : TxOffset ,
122+ meta : Snapshot ,
123+ objects : Arc < DirTrie > ,
124+ repo : Arc < SnapshotRepository > ,
125+
126+ #[ allow( unused) ]
127+ tmp : TempDir ,
128+ }
129+
130+ impl SourceSnapshot {
131+ async fn get_or_create ( ) -> anyhow:: Result < & ' static Self > {
132+ static SOURCE_SNAPSHOT : OnceCell < SourceSnapshot > = OnceCell :: const_new ( ) ;
133+ SOURCE_SNAPSHOT . get_or_try_init ( Self :: try_init) . await
134+ }
135+
136+ async fn try_init ( ) -> anyhow:: Result < Self > {
137+ let tmp = tempdir ( ) ?;
138+
139+ let repo_path = SnapshotsPath :: from_path_unchecked ( tmp. path ( ) ) ;
140+ let repo = spawn_blocking ( move || {
141+ repo_path. create ( ) ?;
142+ SnapshotRepository :: open ( repo_path, Identity :: ZERO , 0 ) . map ( Arc :: new)
143+ } )
144+ . await
145+ . unwrap ( ) ?;
146+ let offset = create_snapshot ( repo. clone ( ) ) . await ?;
147+
148+ let dir_path = repo. snapshot_dir_path ( offset) ;
149+ let ( meta, objects) = spawn_blocking ( move || {
150+ let meta = Snapshot :: read_from_file ( & dir_path. snapshot_file ( offset) ) . map ( |( file, _) | file) ?;
151+ let objects = SnapshotRepository :: object_repo ( & dir_path) . map ( Arc :: new) ?;
152+
153+ Ok :: < _ , SnapshotError > ( ( meta, objects) )
154+ } )
155+ . await
156+ . unwrap ( ) ?;
157+
158+ Ok ( SourceSnapshot {
159+ offset,
160+ meta,
161+ objects,
162+ repo,
163+ tmp,
164+ } )
165+ }
166+ }
167+
122168async fn create_snapshot ( repo : Arc < SnapshotRepository > ) -> anyhow:: Result < TxOffset > {
169+ let start = Instant :: now ( ) ;
123170 let mut watch = spawn_blocking ( || {
124171 let tmp = TempReplicaDir :: new ( ) ?;
125172 let db = TestDB :: open_db ( & tmp, EmptyHistory :: new ( ) , None , Some ( repo) , 0 ) ?;
@@ -148,6 +195,10 @@ async fn create_snapshot(repo: Arc<SnapshotRepository>) -> anyhow::Result<TxOffs
148195 snapshot_offset = * watch. borrow_and_update ( ) ;
149196 }
150197 assert ! ( snapshot_offset >= SNAPSHOT_FREQUENCY ) ;
198+ info ! (
199+ "snapshot creation took {}s" ,
200+ Instant :: now( ) . duration_since( start) . as_secs_f32( )
201+ ) ;
151202
152203 Ok ( snapshot_offset)
153204}
0 commit comments