1- use super :: { BackupsManager , Message } ;
2- use bytesize:: ByteSize ;
3- use chrono:: Utc ;
1+ use super :: Message ;
42use prost:: Message as PbMessage ;
53use sift_error:: prelude:: * ;
6- use std:: {
7- fs:: { self , File } ,
8- io:: { BufReader , Error as IoError , ErrorKind as IoErrorKind , Write } ,
9- path:: { Path , PathBuf } ,
10- sync:: Arc ,
11- } ;
12- use tokio:: {
13- sync:: {
14- Notify ,
15- mpsc:: { Receiver , Sender , channel} ,
16- } ,
17- task:: JoinHandle ,
18- } ;
4+ use std:: { fs:: File , io:: BufReader , path:: Path } ;
195
206mod async_manager;
217pub ( crate ) use async_manager:: AsyncBackupsManager ;
@@ -25,17 +11,7 @@ pub use policy::{DiskBackupPolicy, RollingFilePolicy};
2511
2612/// Concerned with writing/reading protobuf from disk.
2713mod pbfs;
28- use pbfs:: {
29- BackupsDecoder ,
30- chunk:: { BATCH_SIZE_LEN , CHECKSUM_HEADER_LEN , MESSAGE_LENGTH_PREFIX_LEN , PbfsChunk } ,
31- } ;
32-
/// Maximum size of a backup file when the caller does not configure one: 100 MiB.
pub const DEFAULT_MAX_BACKUP_SIZE: usize = 100 * (1 << 20);

/// Capacity shared by the channel feeding the backup task and by the task's in-memory message
/// buffer, which is flushed to disk once it holds this many messages.
const CHANNEL_BUFFER_SIZE: usize = 10_000;
14+ use pbfs:: BackupsDecoder ;
3915
4016/// Takes in a path to a backup file and returns an instance of [BackupsDecoder] which is an
4117/// iterator over the protobuf messages found in the backup file. The iterator will terminate when
@@ -53,214 +29,3 @@ where
5329 . context ( "failed to open backup" )
5430 . help ( "contact Sift" )
5531}
56-
57- /// Disk-based backup strategy implementation.
58- #[ derive( Debug ) ]
59- pub struct DiskBackupsManager < T > {
60- pub ( crate ) backups_root : PathBuf ,
61- pub ( crate ) new_dir_name : String ,
62- pub ( crate ) backup_prefix : String ,
63- /// Max allowed backup size in bytes.
64- pub ( crate ) max_backup_size : usize ,
65-
66- pub ( crate ) backup_file : PathBuf ,
67- backup_task : Option < JoinHandle < Result < ( ) > > > ,
68- flush_and_sync_notification : Arc < Notify > ,
69- data_tx : Sender < Message < T > > ,
70- }
71-
72- impl < T > DiskBackupsManager < T >
73- where
74- T : PbMessage + Default + ' static ,
75- {
76- /// Users shouldn't have to call interact with [DiskBackupsManager::new] directly.
77- ///
78- /// `backups_root` is the directory that stores data backups. If it doesn't exist then there
79- /// will be an attempt to create it. If `None`, then the user's [data
80- /// directory](https://docs.rs/dirs/6.0.0/dirs/fn.data_dir.html) will be used as a default.
81- pub fn new (
82- backups_root : Option < PathBuf > ,
83- new_dir_name : & str ,
84- backup_prefix : & str ,
85- max_backup_size : Option < usize > ,
86- ) -> Result < Self > {
87- let ( data_tx, data_rx) = channel :: < Message < T > > ( CHANNEL_BUFFER_SIZE ) ;
88-
89- let Some ( backups_root) = backups_root. or_else ( dirs:: data_dir) else {
90- return Err (
91- IoError :: new ( IoErrorKind :: NotFound , "user data directory not found" ) . into ( ) ,
92- ) ;
93- } ;
94- let backups_dir = backups_root. join ( new_dir_name) ;
95-
96- match fs:: create_dir_all ( & backups_dir) {
97- Err ( err) if err. kind ( ) != IoErrorKind :: AlreadyExists => {
98- return Err ( Error :: new ( ErrorKind :: BackupsError , err) )
99- . with_context ( || format ! ( "failed to create directory for backups at {}" , backups_dir. display( ) ) )
100- . help ( "if using a custom path for backups directory ensure that it's valid with proper permissions, otherwise contact Sift" )
101- }
102- _ => ( )
103- }
104-
105- let backup_file =
106- backups_dir. join ( format ! ( "{backup_prefix}-{}" , Utc :: now( ) . timestamp_millis( ) ) ) ;
107-
108- let max_backup_size = max_backup_size. unwrap_or ( DEFAULT_MAX_BACKUP_SIZE ) ;
109-
110- #[ cfg( feature = "tracing" ) ]
111- tracing:: info!(
112- backup_file = format!( "{}" , backup_file. display( ) ) ,
113- max_backup_size = format!( "{}" , ByteSize :: b( max_backup_size as u64 ) ) ,
114- "backup file initialized"
115- ) ;
116-
117- let backup = File :: create ( & backup_file)
118- . map_err ( |e| Error :: new ( ErrorKind :: BackupsError , e) )
119- . context ( "failed generate backup file" )
120- . help ( "please contact Sift" ) ?;
121-
122- let flush_and_sync_notification = Arc :: new ( Notify :: new ( ) ) ;
123-
124- let backup_task = Self :: init_backup_task (
125- data_rx,
126- max_backup_size,
127- backup,
128- flush_and_sync_notification. clone ( ) ,
129- )
130- . context ( "failed to start backup task" ) ?;
131-
132- Ok ( Self {
133- backups_root,
134- new_dir_name : new_dir_name. into ( ) ,
135- backup_prefix : backup_prefix. into ( ) ,
136- backup_task : Some ( backup_task) ,
137- backup_file,
138- data_tx,
139- max_backup_size,
140- flush_and_sync_notification,
141- } )
142- }
143-
144- fn init_backup_task (
145- mut data_rx : Receiver < Message < T > > ,
146- max_backup_size : usize ,
147- mut backup : File ,
148- flush_and_sync_notifier : Arc < Notify > ,
149- ) -> Result < JoinHandle < Result < ( ) > > > {
150- let join_handle = tokio:: task:: spawn_blocking ( move || -> Result < ( ) > {
151- let mut message_buffer = Vec :: with_capacity ( CHANNEL_BUFFER_SIZE ) ;
152- let mut bytes_processed = 0 ;
153-
154- while let Some ( message) = data_rx. blocking_recv ( ) {
155- match message {
156- Message :: Data ( val) => {
157- bytes_processed += val. encoded_len ( ) + MESSAGE_LENGTH_PREFIX_LEN ;
158- message_buffer. push ( val) ;
159-
160- if bytes_processed >= max_backup_size
161- || message_buffer. len ( ) >= CHANNEL_BUFFER_SIZE
162- {
163- let chunk = PbfsChunk :: new ( & message_buffer) ?;
164- backup. write_all ( & chunk) ?;
165- backup. sync_all ( ) ?;
166- bytes_processed += CHECKSUM_HEADER_LEN + BATCH_SIZE_LEN ;
167- message_buffer. clear ( ) ;
168-
169- if bytes_processed >= max_backup_size {
170- #[ cfg( feature = "tracing" ) ]
171- tracing:: debug!( "backup size exceeded max configured" ) ;
172-
173- flush_and_sync_notifier. notify_one ( ) ;
174- break ;
175- }
176- }
177- }
178- Message :: Flush => {
179- #[ cfg( feature = "tracing" ) ]
180- tracing:: debug!( "backup task received flush and sync signal" ) ;
181-
182- if !message_buffer. is_empty ( ) {
183- let chunk = PbfsChunk :: new ( & message_buffer) ?;
184- backup. write_all ( & chunk) ?;
185- backup. sync_all ( ) ?;
186- }
187- flush_and_sync_notifier. notify_one ( ) ;
188- break ;
189- }
190- Message :: Complete => {
191- #[ cfg( feature = "tracing" ) ]
192- tracing:: debug!( "backup task complete - shutting down" ) ;
193-
194- break ;
195- }
196- }
197- }
198- Ok ( ( ) )
199- } ) ;
200- Ok ( join_handle)
201- }
202- }
203-
204- impl < T > BackupsManager < T > for DiskBackupsManager < T >
205- where
206- T : PbMessage + Default + ' static ,
207- {
208- /// Send data point to be backed up.
209- async fn send ( & mut self , msg : T ) -> Result < ( ) > {
210- match self . data_tx . send ( Message :: Data ( msg) ) . await {
211- Ok ( _) => Ok ( ( ) ) ,
212-
213- // Backup task has shutdown due to max bytes being reached.
214- Err ( _) => {
215- let Some ( backup_task) = self . backup_task . take ( ) else {
216- return Ok ( ( ) ) ;
217- } ;
218- match backup_task. await {
219- Ok ( res) => match res {
220- Ok ( _) => Err ( Error :: new_msg (
221- ErrorKind :: BackupLimitReached ,
222- "backup limit reached" ,
223- ) ) ,
224- Err ( err) => Err ( Error :: new ( ErrorKind :: BackupsError , err) )
225- . context ( "backup task encountered an error" )
226- . help ( "please notify Sift" ) ,
227- } ,
228- Err ( err) => Err ( Error :: new ( ErrorKind :: BackupsError , err) )
229- . context ( "error waiting for backup task to finish" )
230- . help ( "please notify Sift" ) ,
231- }
232- }
233- }
234- }
235-
236- /// Use for graceful termination. This will clean up the backup file.
237- async fn finish ( mut self ) -> Result < ( ) > {
238- let _ = self . data_tx . send ( Message :: Complete ) . await ;
239-
240- if let Some ( backup_task) = self . backup_task . take ( ) {
241- backup_task
242- . await
243- . map_err ( |e| Error :: new ( ErrorKind :: BackupsError , e) )
244- . context ( "failed to join disk backup task" )
245- . help ( "please contact Sift" ) ??;
246- }
247- Ok ( ( ) )
248- }
249-
250- /// Sends the a message to the backup task to flush and sync if there's any buffered data
251- /// before creating an iterator over the backup's contents.
252- async fn get_backup_data ( & mut self ) -> Result < impl Iterator < Item = Result < T > > > {
253- let _ = self . data_tx . send ( Message :: Flush ) . await ;
254- self . flush_and_sync_notification . notified ( ) . await ;
255-
256- let backups_decoder = decode_backup ( & self . backup_file ) ?;
257-
258- Ok ( backups_decoder. into_iter ( ) )
259- }
260- }
261-
262- impl < T > Drop for DiskBackupsManager < T > {
263- fn drop ( & mut self ) {
264- let _ = fs:: remove_file ( & self . backup_file ) ;
265- }
266- }
0 commit comments