Skip to content

Commit 79ed6f9

Browse files
authored
rust(bug): Improve how sift-stream handles the flow config cache (#401)
1 parent 833f092 commit 79ed6f9

5 files changed

Lines changed: 329 additions & 62 deletions

File tree

rust/crates/sift_rs/src/wrappers/ingestion_configs.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ pub trait IngestionConfigServiceWrapper:
4242

4343
/// Create [FlowConfig]s for a given ingestion config. If this function does not return an
4444
/// error, then it is safe to assume that all [FlowConfig]s in `configs` were created.
45-
async fn try_create_flows(
46-
&mut self,
47-
ingestion_config_id: &str,
48-
configs: &[FlowConfig],
49-
) -> Result<()>;
45+
async fn try_create_flows<I>(&mut self, ingestion_config_id: &str, configs: I) -> Result<()>
46+
where
47+
I: Into<Vec<FlowConfig>> + Send;
5048

5149
/// Retrieve all flows that satisfy the provided filter.
5250
async fn try_filter_flows(
@@ -121,15 +119,14 @@ impl IngestionConfigServiceWrapper for IngestionConfigServiceImpl {
121119

122120
/// Create [FlowConfig]s for a given ingestion config. If this function does not return an
123121
/// error, then it is safe to assume that all [FlowConfig]s in `configs` were created.
124-
async fn try_create_flows(
125-
&mut self,
126-
ingestion_config_id: &str,
127-
configs: &[FlowConfig],
128-
) -> Result<()> {
122+
async fn try_create_flows<I>(&mut self, ingestion_config_id: &str, configs: I) -> Result<()>
123+
where
124+
I: Into<Vec<FlowConfig>> + Send,
125+
{
129126
let _ = self
130127
.create_ingestion_config_flows(CreateIngestionConfigFlowsRequest {
131128
ingestion_config_id: ingestion_config_id.to_string(),
132-
flows: configs.to_vec(),
129+
flows: configs.into(),
133130
})
134131
.await
135132
.map_err(|e| Error::new(ErrorKind::CreateFlowError, e))?;

rust/crates/sift_stream/src/stream/builder.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,16 @@ impl SiftStreamBuilder<IngestionConfigMode> {
541541
.try_filter_flows(&ingestion_config.ingestion_config_id, &filter)
542542
.await?;
543543

544+
// If no flows are provided, use the existing flows in Sift to populate the local flow cache.
545+
if flows.is_empty() {
546+
#[cfg(feature = "tracing")]
547+
tracing::info!(
548+
ingestion_config_id = ingestion_config.ingestion_config_id,
549+
"no flows provided, using existing flows in Sift to populate the local flow cache"
550+
);
551+
return Ok((ingestion_config, existing_flows, asset));
552+
}
553+
544554
let mut flows_to_create: Vec<FlowConfig> = Vec::new();
545555

546556
for flow in &flows {
@@ -565,7 +575,10 @@ impl SiftStreamBuilder<IngestionConfigMode> {
565575

566576
if !flows_to_create.is_empty() {
567577
let _ = ingestion_config_service
568-
.try_create_flows(&ingestion_config.ingestion_config_id, &flows_to_create)
578+
.try_create_flows(
579+
&ingestion_config.ingestion_config_id,
580+
flows_to_create.as_slice(),
581+
)
569582
.await;
570583

571584
#[cfg(feature = "tracing")]

rust/crates/sift_stream/src/stream/mode/ingestion_config.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,17 +298,42 @@ impl SiftStream<IngestionConfigMode> {
298298
/// Modify the existing ingestion config by adding new flows that weren't accounted for during
299299
/// initialization.
300300
pub async fn add_new_flows(&mut self, flow_configs: &[FlowConfig]) -> Result<()> {
301+
// Filter out flows that already exist.
302+
let filtered = flow_configs
303+
.iter()
304+
.filter(|f| !self.mode.flows_by_name.contains_key(&f.name))
305+
.collect::<Vec<_>>();
306+
307+
// If no new flows are provided, return early.
308+
if filtered.is_empty() {
309+
return Ok(());
310+
}
311+
312+
#[cfg(feature = "tracing")]
313+
tracing::info!(
314+
ingestion_config_id = self.mode.ingestion_config.ingestion_config_id,
315+
new_flows = filtered
316+
.iter()
317+
.map(|f| f.name.as_str())
318+
.collect::<Vec<&str>>()
319+
.join(","),
320+
"adding new flows to ingestion config"
321+
);
322+
301323
new_ingestion_config_service(self.grpc_channel.clone())
302324
.try_create_flows(
303325
&self.mode.ingestion_config.ingestion_config_id,
304-
flow_configs,
326+
filtered
327+
.iter()
328+
.map(|f| (*f).clone())
329+
.collect::<Vec<FlowConfig>>(),
305330
)
306331
.await
307332
.context("SiftStream::add_new_flows")?;
308333

309-
self.metrics.loaded_flows.add(flow_configs.len() as u64);
334+
self.metrics.loaded_flows.add(filtered.len() as u64);
310335

311-
for flow_config in flow_configs {
336+
for flow_config in filtered {
312337
self.mode
313338
.flows_by_name
314339
.entry(flow_config.name.clone())

rust/crates/sift_stream/src/stream/test.rs

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use crate::backup::DiskBackupPolicy;
66
use crate::{
77
ChannelValue, Flow, IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder,
88
};
9+
use sift_rs::common::r#type::v1::ChannelDataType;
10+
use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
911
use tempdir::TempDir;
1012
use tracing_test::traced_test;
1113

@@ -44,6 +46,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() {
4446
retry_policy,
4547
disk_backup_policy,
4648
})
49+
.metrics_streaming_interval(None)
4750
.build()
4851
.await
4952
.expect("failed to build sift stream");
@@ -72,7 +75,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() {
7275
let test_dir = fs::read_dir(tmp_dir_path)
7376
.expect("failed to read backups directory")
7477
.collect::<Vec<_>>();
75-
assert_eq!(test_dir.len(), 1);
78+
assert_eq!(test_dir.len(), 1, "{:?}", test_dir);
7679

7780
// The first subdirectory should be the asset name.
7881
let asset_dir = test_dir[0].as_ref().expect("failed to get file");
@@ -122,6 +125,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_no_run() {
122125
retry_policy,
123126
disk_backup_policy,
124127
})
128+
.metrics_streaming_interval(None)
125129
.build()
126130
.await
127131
.expect("failed to build sift stream");
@@ -230,3 +234,168 @@ async fn test_sift_stream_drop_without_finish() {
230234
.await
231235
.expect("timeout waiting for tasks to shutdown");
232236
}
237+
238+
#[tokio::test]
239+
async fn test_sift_stream_builder_load_ingestion_config_with_no_flows() {
240+
let backups_dir = uuid::Uuid::new_v4().to_string();
241+
242+
let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir");
243+
let tmp_dir_path = tmp_dir.path();
244+
245+
let ingestion_config = IngestionConfigForm {
246+
asset_name: "already_exists_asset".to_string(),
247+
client_key: "already_exists_client_key".to_string(),
248+
flows: vec![],
249+
};
250+
let disk_backup_policy = DiskBackupPolicy {
251+
backups_dir: Some(tmp_dir_path.to_path_buf()),
252+
retain_backups: true,
253+
..Default::default()
254+
};
255+
let retry_policy = crate::RetryPolicy::default();
256+
let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
257+
258+
let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel)
259+
.ingestion_config(ingestion_config)
260+
.recovery_strategy(RecoveryStrategy::RetryWithBackups {
261+
retry_policy,
262+
disk_backup_policy,
263+
})
264+
.build()
265+
.await
266+
.expect("failed to build sift stream");
267+
268+
// The mock sift server should have returned 1 flow.
269+
let flows = sift_stream.get_flows();
270+
assert_eq!(flows.len(), 1);
271+
272+
let existing_flow = FlowConfig {
273+
name: "already_exists_flow".to_string(),
274+
channels: vec![ChannelConfig {
275+
name: "channel1".to_string(),
276+
data_type: ChannelDataType::Double.into(),
277+
..Default::default()
278+
}],
279+
};
280+
281+
// Add the existing flow again to ensure it is not added again.
282+
assert!(sift_stream.add_new_flows(&[existing_flow]).await.is_ok());
283+
let flows = sift_stream.get_flows();
284+
assert_eq!(flows.len(), 1);
285+
286+
sift_stream
287+
.finish()
288+
.await
289+
.expect("failed to finish sift stream");
290+
}
291+
292+
#[tokio::test]
293+
async fn test_sift_stream_builder_load_ingestion_config_with_flows() {
294+
let backups_dir = uuid::Uuid::new_v4().to_string();
295+
296+
let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir");
297+
let tmp_dir_path = tmp_dir.path();
298+
299+
let existing_flow = FlowConfig {
300+
name: "already_exists_flow".to_string(),
301+
channels: vec![ChannelConfig {
302+
name: "channel1".to_string(),
303+
data_type: ChannelDataType::Double.into(),
304+
..Default::default()
305+
}],
306+
};
307+
308+
let ingestion_config = IngestionConfigForm {
309+
asset_name: "test_asset".to_string(),
310+
client_key: "test_client_key".to_string(),
311+
flows: vec![existing_flow.clone()],
312+
};
313+
let disk_backup_policy = DiskBackupPolicy {
314+
backups_dir: Some(tmp_dir_path.to_path_buf()),
315+
retain_backups: true,
316+
..Default::default()
317+
};
318+
let retry_policy = crate::RetryPolicy::default();
319+
let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
320+
321+
let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel)
322+
.ingestion_config(ingestion_config)
323+
.recovery_strategy(RecoveryStrategy::RetryWithBackups {
324+
retry_policy,
325+
disk_backup_policy,
326+
})
327+
.build()
328+
.await
329+
.expect("failed to build sift stream");
330+
331+
// The mock sift server should have returned 1 flow.
332+
let flows = sift_stream.get_flows();
333+
assert_eq!(flows.len(), 1);
334+
335+
// Add the existing flow again to ensure it is not added again.
336+
assert!(sift_stream.add_new_flows(&[existing_flow]).await.is_ok());
337+
let flows = sift_stream.get_flows();
338+
assert_eq!(flows.len(), 1);
339+
}
340+
341+
#[tokio::test]
342+
async fn test_sift_stream_builder_load_ingestion_config_with_new_flows() {
343+
let backups_dir = uuid::Uuid::new_v4().to_string();
344+
345+
let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir");
346+
let tmp_dir_path = tmp_dir.path();
347+
348+
let new_flow = FlowConfig {
349+
name: "new_flow".to_string(),
350+
channels: vec![ChannelConfig {
351+
name: "channel-new".to_string(),
352+
data_type: ChannelDataType::Uint32.into(),
353+
..Default::default()
354+
}],
355+
};
356+
357+
let ingestion_config = IngestionConfigForm {
358+
asset_name: "test_asset".to_string(),
359+
client_key: "test_client_key".to_string(),
360+
flows: vec![new_flow.clone()],
361+
};
362+
let disk_backup_policy = DiskBackupPolicy {
363+
backups_dir: Some(tmp_dir_path.to_path_buf()),
364+
retain_backups: true,
365+
..Default::default()
366+
};
367+
let retry_policy = crate::RetryPolicy::default();
368+
let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
369+
370+
let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel)
371+
.ingestion_config(ingestion_config)
372+
.recovery_strategy(RecoveryStrategy::RetryWithBackups {
373+
retry_policy,
374+
disk_backup_policy,
375+
})
376+
.build()
377+
.await
378+
.expect("failed to build sift stream");
379+
380+
// The mock sift server should have returned 1 flow.
381+
let flows = sift_stream.get_flows();
382+
assert_eq!(flows.len(), 1);
383+
384+
// Re-add the flow that was supplied at build time to ensure it is not added a second time.
385+
assert!(sift_stream.add_new_flows(&[new_flow]).await.is_ok());
386+
let flows = sift_stream.get_flows();
387+
assert_eq!(flows.len(), 1);
388+
389+
// Add another new flow to ensure it is added.
390+
let new_flow2 = FlowConfig {
391+
name: "new_flow2".to_string(),
392+
channels: vec![ChannelConfig {
393+
name: "channel-new2".to_string(),
394+
data_type: ChannelDataType::Uint32.into(),
395+
..Default::default()
396+
}],
397+
};
398+
assert!(sift_stream.add_new_flows(&[new_flow2]).await.is_ok());
399+
let flows = sift_stream.get_flows();
400+
assert_eq!(flows.len(), 2);
401+
}

0 commit comments

Comments
 (0)