@@ -19,7 +19,9 @@ use sift_rs::{
1919 runs:: v2:: Run ,
2020} ;
2121use std:: collections:: HashSet ;
22+ use std:: io:: ErrorKind as IoErrorKind ;
2223use std:: { collections:: HashMap , path:: PathBuf , sync:: Arc , time:: Duration } ;
24+ use tokio:: fs;
2325use tokio:: { sync:: broadcast, task:: JoinHandle } ;
2426use uuid:: Uuid ;
2527
@@ -276,7 +278,7 @@ impl SiftStream<IngestionConfigEncoder, FileBackup> {
276278 ///
277279 /// [`SiftStreamBuilder`]: crate::stream::builder::SiftStreamBuilder
278280 #[ allow( clippy:: too_many_arguments) ]
279- pub ( crate ) fn new_file_backup (
281+ pub ( crate ) async fn new_file_backup (
280282 grpc_channel : SiftChannel ,
281283 ingestion_config : IngestionConfig ,
282284 flows_by_name : HashMap < String , FlowDescriptor < String > > ,
@@ -292,6 +294,16 @@ impl SiftStream<IngestionConfigEncoder, FileBackup> {
292294 metrics : Arc < SiftStreamMetrics > ,
293295 ) -> Result < Self > {
294296 let full_backup_path = backups_directory. join ( output_directory) ;
297+
298+ // Ensure the output directory exists
299+ if let Err ( err) = fs:: create_dir_all ( & full_backup_path) . await
300+ && err. kind ( ) != IoErrorKind :: AlreadyExists
301+ {
302+ return Err ( Error :: new ( ErrorKind :: BackupsError , err) )
303+ . with_context ( || format ! ( "failed to create directory for backups at {}" , full_backup_path. display( ) ) )
304+ . help ( "if using a custom path for backups directory ensure that it's valid with proper permissions, otherwise contact Sift" ) ;
305+ }
306+
295307 let file_writer_config = FileWriterConfig {
296308 directory : full_backup_path,
297309 prefix : ingestion_config. client_key . clone ( ) ,
@@ -856,7 +868,8 @@ mod tests {
856868 sift_stream_id,
857869 metrics,
858870 )
859- . unwrap ( ) ;
871+ . await
872+ . expect ( "failed to create file backup stream" ) ;
860873
861874 // Finish should succeed
862875 stream. finish ( ) . await . unwrap ( ) ;
@@ -886,7 +899,8 @@ mod tests {
886899 sift_stream_id,
887900 metrics,
888901 )
889- . unwrap ( ) ;
902+ . await
903+ . expect ( "failed to create file backup stream" ) ;
890904
891905 // Write some data first
892906 let request = create_test_request ( "test_flow" , & ingestion_config. ingestion_config_id ) ;
0 commit comments