Skip to content

Commit 2c88ca7

Browse files
authored
rust(feat): Improve backup directory names and structure (#326)
1 parent bf1c268 commit 2c88ca7

7 files changed

Lines changed: 541 additions & 94 deletions

File tree

rust/crates/sift_stream/src/backup/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,19 @@ enum Message<T> {
3333
/// Force the backup task to flush its contents to the target data container.
3434
Flush,
3535
}
36+
37+
/// Sanitize a name so it can be used as a single file-system path component.
///
/// Each character that is illegal or awkward in a file or directory name is replaced
/// with an underscore:
///
/// - the reserved punctuation `:` `/` `\` `*` `?` `"` `<` `>` `|` `.`
/// - any Unicode whitespace character
/// - any control character (for example NUL, which cannot appear in a path)
///
/// Every other character is passed through unchanged, so the returned string always
/// contains the same number of characters as `name`. An empty `name` yields an empty
/// string.
pub(crate) fn sanitize_name(name: &str) -> String {
    name.chars()
        .map(|c| match c {
            ':' | '/' | '\\' | '*' | '?' | '"' | '<' | '>' | '|' | '.' => '_',
            // Whitespace is awkward in paths, and control characters are rejected by
            // Windows (0x00-0x1F) while NUL is rejected on Unix as well.
            _ if c.is_whitespace() || c.is_control() => '_',
            _ => c,
        })
        .collect()
}

rust/crates/sift_stream/src/backup/test.rs

Lines changed: 15 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,106 +1,29 @@
11
use super::{BackupsManager, DiskBackupPolicy, DiskBackupsManager, InMemoryBackupsManager};
2-
use crate::SiftChannel;
3-
use crate::TimeValue;
42
use crate::backup::disk::AsyncBackupsManager;
5-
use hyper_util::rt::TokioIo;
6-
use sift_connect::grpc::interceptor::AuthInterceptor;
3+
use crate::{TimeValue, backup::sanitize_name};
74
use sift_error::ErrorKind;
85
use sift_rs::ingest::v1::{
96
IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest,
107
ingest_with_config_data_channel_value::Type,
118
};
12-
use sift_rs::ingest::v1::{
13-
IngestWithConfigDataStreamResponse,
14-
ingest_service_server::{IngestService, IngestServiceServer},
15-
};
169
use std::fs;
17-
use std::sync::{Arc, Mutex};
1810
use tempdir::TempDir;
19-
use tonic::transport::{Endpoint, Server, Uri};
20-
use tonic::{Request, Response, Status};
21-
use tower::{ServiceBuilder, service_fn};
22-
23-
#[derive(Debug, Clone)]
24-
struct MockIngestService {
25-
captured_data: Arc<Mutex<Vec<IngestWithConfigDataStreamRequest>>>,
26-
}
27-
28-
impl MockIngestService {
29-
fn new() -> Self {
30-
Self {
31-
captured_data: Arc::new(Mutex::new(Vec::new())),
32-
}
33-
}
3411

35-
fn get_captured_data(&self) -> Vec<IngestWithConfigDataStreamRequest> {
36-
self.captured_data.lock().unwrap().clone()
12+
/// Each reserved punctuation or whitespace character embedded in a name must come back
/// from `sanitize_name` as a single underscore.
#[test]
fn test_sanitize_name_with_illegal_chars() {
    // A fixed-size array is sufficient here; nothing needs to live on the heap.
    let illegal_chars = [
        ':', '/', '\\', '*', '?', '"', '<', '>', '|', '.', ' ', '\t', '\n', '\r',
    ];
    for illegal in illegal_chars {
        assert_eq!(
            sanitize_name(&format!("test{illegal}test")),
            "test_test",
            // Name the offending character so a failure is immediately actionable.
            "{illegal:?} was not replaced with an underscore",
        );
    }
}
3921

40-
#[tonic::async_trait]
41-
impl IngestService for MockIngestService {
42-
async fn ingest_with_config_data_stream(
43-
&self,
44-
request: Request<tonic::Streaming<IngestWithConfigDataStreamRequest>>,
45-
) -> Result<Response<IngestWithConfigDataStreamResponse>, Status> {
46-
let mut stream = request.into_inner();
47-
48-
while let Some(data) = stream.message().await? {
49-
self.captured_data.lock().unwrap().push(data);
50-
}
51-
52-
Ok(Response::new(IngestWithConfigDataStreamResponse {}))
53-
}
54-
55-
async fn ingest_arbitrary_protobuf_data_stream(
56-
&self,
57-
_request: Request<
58-
tonic::Streaming<sift_rs::ingest::v1::IngestArbitraryProtobufDataStreamRequest>,
59-
>,
60-
) -> Result<Response<sift_rs::ingest::v1::IngestArbitraryProtobufDataStreamResponse>, Status>
61-
{
62-
Err(Status::unimplemented("Not implemented for test"))
63-
}
64-
}
65-
66-
async fn create_mock_grpc_channel_with_service() -> (SiftChannel, Arc<MockIngestService>) {
67-
let mock_service = Arc::new(MockIngestService::new());
68-
let service_clone = mock_service.clone();
69-
70-
let (client, server) = tokio::io::duplex(1024);
71-
72-
tokio::spawn(async move {
73-
Server::builder()
74-
.add_service(IngestServiceServer::new(service_clone.as_ref().clone()))
75-
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
76-
.await
77-
.unwrap();
78-
});
79-
80-
let mut client = Some(client);
81-
let channel = Endpoint::try_from("http://[::]:50051")
82-
.unwrap()
83-
.connect_with_connector(service_fn(move |_: Uri| {
84-
let client = client.take();
85-
86-
async move {
87-
if let Some(client) = client {
88-
Ok(TokioIo::new(client))
89-
} else {
90-
Err(std::io::Error::other("Client already taken"))
91-
}
92-
}
93-
}))
94-
.await
95-
.unwrap();
96-
97-
let sift_channel = ServiceBuilder::new()
98-
.layer(tonic::service::interceptor(AuthInterceptor {
99-
apikey: "test-api-key".to_string(),
100-
}))
101-
.service(channel);
102-
103-
(sift_channel, mock_service)
22+
/// Names made up only of letters, underscores and hyphens must be returned unchanged.
#[test]
fn test_sanitize_name_with_legal_chars() {
    for legal in ["test", "test_test", "test-test"] {
        assert_eq!(sanitize_name(legal), legal);
    }
}
10528

10629
#[tokio::test]
@@ -229,7 +152,7 @@ async fn test_async_backups_manager_retrieve_data_with_graceful_termination() {
229152
..Default::default()
230153
};
231154
let backup_retry_policy = crate::RetryPolicy::default();
232-
let (grpc_channel, mock_service) = create_mock_grpc_channel_with_service().await;
155+
let (grpc_channel, mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
233156

234157
let mut backups_manager = AsyncBackupsManager::<IngestWithConfigDataStreamRequest>::new(
235158
&backups_dir,
@@ -308,7 +231,7 @@ async fn test_async_backups_manager_discard_data_with_graceful_termination() {
308231
..Default::default()
309232
};
310233
let backup_retry_policy = crate::RetryPolicy::default();
311-
let (grpc_channel, mock_service) = create_mock_grpc_channel_with_service().await;
234+
let (grpc_channel, mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
312235

313236
let mut backups_manager = AsyncBackupsManager::<IngestWithConfigDataStreamRequest>::new(
314237
&backups_dir,

rust/crates/sift_stream/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,3 +423,6 @@ pub mod backup;
423423
pub use backup::DiskBackupPolicy;
424424

425425
pub use sift_connect::grpc::{Credentials, SiftChannel};
426+
427+
#[cfg(test)]
428+
mod test;

rust/crates/sift_stream/src/stream/builder.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use super::{
88
use crate::backup::{
99
DiskBackupsManager, InMemoryBackupsManager,
1010
disk::{AsyncBackupsManager, DiskBackupPolicy},
11+
sanitize_name,
1112
};
1213
use sift_connect::{Credentials, SiftChannel, SiftChannelBuilder};
1314
use sift_error::prelude::*;
@@ -325,6 +326,8 @@ impl SiftStreamBuilder<IngestionConfigMode> {
325326
}
326327
};
327328

329+
let asset_name = asset.name.clone();
330+
328331
// Try updating tags or metadata. Update only occurs if either asset_tags or asset_metadata is Some
329332
Self::update_asset_tags_and_metadata(
330333
asset,
@@ -376,10 +379,14 @@ impl SiftStreamBuilder<IngestionConfigMode> {
376379
retry_policy,
377380
disk_backup_policy,
378381
} => {
382+
let mut dir_name = sanitize_name(&asset_name);
383+
if let Some(run) = run.as_ref() {
384+
dir_name.push_str(&format!("/{}", sanitize_name(&run.name)));
385+
}
379386
policy = Some(retry_policy.clone());
380387
let manager = AsyncBackupsManager::new(
381-
&ingestion_config.asset_id,
382-
&ingestion_config.ingestion_config_id,
388+
&dir_name,
389+
&ingestion_config.client_key,
383390
disk_backup_policy,
384391
retry_policy,
385392
channel.clone(),

rust/crates/sift_stream/src/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub mod time;
2525
/// in a manner that isn't backwards compatible.
2626
pub(crate) mod flow;
2727

28+
#[cfg(test)]
29+
mod test;
30+
2831
/// [SiftStream] is a smart wrapper over an actual gRPC stream that makes it robust and more
2932
/// ergonomic to work with. Some additional behaviors that [SiftStream] supports are:
3033
/// - Checkpointing
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use std::fs;
2+
3+
use crate::TimeValue;
4+
use crate::backup::DiskBackupPolicy;
5+
use crate::{
6+
ChannelValue, Flow, IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder,
7+
};
8+
use tempdir::TempDir;
9+
10+
#[tokio::test]
11+
async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() {
12+
let backups_dir = uuid::Uuid::new_v4().to_string();
13+
14+
let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir");
15+
let tmp_dir_path = tmp_dir.path();
16+
17+
let ingestion_config = IngestionConfigForm {
18+
asset_name: "test_asset".to_string(),
19+
client_key: "test_client_key".to_string(),
20+
flows: vec![],
21+
};
22+
let run = RunForm {
23+
name: "test_run".to_string(),
24+
client_key: "test_client_key".to_string(),
25+
description: None,
26+
tags: None,
27+
metadata: None,
28+
};
29+
30+
let disk_backup_policy = DiskBackupPolicy {
31+
backups_dir: Some(tmp_dir_path.to_path_buf()),
32+
..Default::default()
33+
};
34+
let retry_policy = crate::RetryPolicy::default();
35+
let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
36+
37+
let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel)
38+
.ingestion_config(ingestion_config)
39+
.attach_run(run)
40+
.recovery_strategy(RecoveryStrategy::RetryWithBackups {
41+
retry_policy,
42+
disk_backup_policy,
43+
})
44+
.build()
45+
.await
46+
.expect("failed to build sift stream");
47+
48+
for data in 0..100 {
49+
sift_stream
50+
.send(Flow::new(
51+
"some_flow",
52+
TimeValue::now(),
53+
&[ChannelValue::new("some_channel", data)],
54+
))
55+
.await
56+
.expect("failed to send data to backup task");
57+
}
58+
59+
let test_dir = fs::read_dir(tmp_dir_path)
60+
.expect("failed to read backups directory")
61+
.collect::<Vec<_>>();
62+
assert_eq!(test_dir.len(), 1);
63+
64+
// The first subdirectory should be the asset name.
65+
let asset_dir = test_dir[0].as_ref().expect("failed to get file");
66+
assert!(asset_dir.path().is_dir(), "expected file to be a directory");
67+
68+
let asset_dir_path = asset_dir.path();
69+
let asset_dir_file_name = asset_dir_path.file_name().expect("failed to get file name");
70+
assert_eq!(asset_dir_file_name, "test_asset");
71+
72+
// The next subdirectory in the asset directory should be the run name.
73+
let asset_dir_contents = fs::read_dir(asset_dir_path)
74+
.expect("failed to read asset directory")
75+
.collect::<Vec<_>>();
76+
assert_eq!(asset_dir_contents.len(), 1);
77+
78+
let run_dir = asset_dir_contents[0].as_ref().expect("failed to get file");
79+
assert!(run_dir.path().is_dir(), "expected file to be a directory");
80+
81+
let run_dir_path = run_dir.path();
82+
let run_dir_name = run_dir_path.file_name().expect("failed to get file name");
83+
assert_eq!(run_dir_name, "test_run");
84+
}
85+
86+
#[tokio::test]
87+
async fn test_sift_stream_builder_backup_manager_directory_naming_no_run() {
88+
let backups_dir = uuid::Uuid::new_v4().to_string();
89+
90+
let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir");
91+
let tmp_dir_path = tmp_dir.path();
92+
93+
let ingestion_config = IngestionConfigForm {
94+
asset_name: "test_asset".to_string(),
95+
client_key: "test_client_key".to_string(),
96+
flows: vec![],
97+
};
98+
let disk_backup_policy = DiskBackupPolicy {
99+
backups_dir: Some(tmp_dir_path.to_path_buf()),
100+
..Default::default()
101+
};
102+
let retry_policy = crate::RetryPolicy::default();
103+
let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
104+
105+
let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel)
106+
.ingestion_config(ingestion_config)
107+
.recovery_strategy(RecoveryStrategy::RetryWithBackups {
108+
retry_policy,
109+
disk_backup_policy,
110+
})
111+
.build()
112+
.await
113+
.expect("failed to build sift stream");
114+
115+
for data in 0..100 {
116+
sift_stream
117+
.send(Flow::new(
118+
"some_flow",
119+
TimeValue::now(),
120+
&[ChannelValue::new("some_channel", data)],
121+
))
122+
.await
123+
.expect("failed to send data to backup task");
124+
}
125+
126+
let test_dir = fs::read_dir(tmp_dir_path)
127+
.expect("failed to read backups directory")
128+
.collect::<Vec<_>>();
129+
assert_eq!(test_dir.len(), 1);
130+
131+
// The first subdirectory should be the asset name.
132+
let asset_dir = test_dir[0].as_ref().expect("failed to get file");
133+
assert!(asset_dir.path().is_dir(), "expected file to be a directory");
134+
135+
let asset_dir_path = asset_dir.path();
136+
let asset_dir_file_name = asset_dir_path.file_name().expect("failed to get file name");
137+
assert_eq!(asset_dir_file_name, "test_asset");
138+
139+
// Since there was no run provided, there are no subdirectories in the asset directory.
140+
let asset_dir_contents = fs::read_dir(asset_dir_path)
141+
.expect("failed to read asset directory")
142+
.collect::<Vec<_>>();
143+
assert_eq!(asset_dir_contents.len(), 1);
144+
assert!(
145+
asset_dir_contents[0]
146+
.as_ref()
147+
.expect("failed to get file")
148+
.path()
149+
.is_file(),
150+
"expected to be a file",
151+
);
152+
}

0 commit comments

Comments
 (0)