Skip to content

Commit 9d7e03e

Browse files
authored
rust(feat/bug): Improve add flow config, prevent send_impl error (#409)
1 parent d57788b commit 9d7e03e

6 files changed

Lines changed: 180 additions & 20 deletions

File tree

rust/crates/sift_error/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ impl Error {
9898
/// Various categories of errors that can occur throughout Sift crates.
9999
#[derive(Debug, PartialEq, Copy, Clone)]
100100
pub enum ErrorKind {
101+
/// Indicates that the error is due to a resource already existing.
102+
AlreadyExistsError,
101103
/// Indicates user-error having to do with bad arguments.
102104
ArgumentValidationError,
103105
/// Indicates that the program is unable to grab credentials from a user's `sift.toml` file.
@@ -194,6 +196,7 @@ where
194196
impl fmt::Display for ErrorKind {
195197
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196198
match self {
199+
Self::AlreadyExistsError => write!(f, "AlreadyExistsError"),
197200
Self::GrpcConnectError => write!(f, "GrpcConnectError"),
198201
Self::RetriesExhausted => write!(f, "RetriesExhausted"),
199202
Self::RetrieveAssetError => write!(f, "RetrieveAssetError"),

rust/crates/sift_rs/src/wrappers/ingestion_configs.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,13 @@ impl IngestionConfigServiceWrapper for IngestionConfigServiceImpl {
129129
flows: configs.into(),
130130
})
131131
.await
132-
.map_err(|e| Error::new(ErrorKind::CreateFlowError, e))?;
132+
.map_err(|e| {
133+
if e.code() == tonic::Code::AlreadyExists {
134+
Error::new(ErrorKind::AlreadyExistsError, e)
135+
} else {
136+
Error::new(ErrorKind::CreateFlowError, e)
137+
}
138+
})?;
133139
Ok(())
134140
}
135141

rust/crates/sift_stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ features = ["metrics-unstable"]
1616
rustdoc-args = ["--cfg", "docsrs"]
1717

1818
[dependencies]
19+
futures = { version = "0.3", default-features = false, features = ["alloc"] }
1920
sift_connect = { workspace = true }
2021
sift_error = { workspace = true }
2122
sift_rs = { workspace = true }

rust/crates/sift_stream/src/metrics/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ pub struct SiftStreamMetricsSnapshot {
118118
pub messages_sent_to_backup: u64,
119119
/// Total messages dropped for ingestion
120120
pub old_messages_dropped_for_ingestion: u64,
121+
/// Total messages dropped for ingestion that failed to be added to backup
122+
pub old_messages_failed_adding_to_backup: u64,
121123
/// Current retry attempt count
122124
pub cur_retry_count: u64,
123125
/// Depth of the ingestion channel
@@ -382,6 +384,7 @@ pub struct SiftStreamMetrics {
382384
pub(crate) bytes_sent: U64Counter,
383385
pub(crate) messages_sent_to_backup: U64Counter,
384386
pub(crate) old_messages_dropped_for_ingestion: U64Counter,
387+
pub(crate) old_messages_failed_adding_to_backup: U64Counter,
385388
pub(crate) cur_retry_count: U64Signal,
386389
pub(crate) ingestion_channel_depth: U64Signal,
387390
pub(crate) backup_channel_depth: U64Signal,
@@ -407,6 +410,7 @@ impl SiftStreamMetrics {
407410
let bytes_sent = self.bytes_sent.get();
408411
let messages_sent_to_backup = self.messages_sent_to_backup.get();
409412
let old_messages_dropped_for_ingestion = self.old_messages_dropped_for_ingestion.get();
413+
let old_messages_failed_adding_to_backup = self.old_messages_failed_adding_to_backup.get();
410414
let cur_retry_count = self.cur_retry_count.get();
411415
let ingestion_channel_depth = self.ingestion_channel_depth.get();
412416
let backup_channel_depth = self.backup_channel_depth.get();
@@ -424,6 +428,7 @@ impl SiftStreamMetrics {
424428
byte_rate: stats.byte_rate,
425429
messages_sent_to_backup,
426430
old_messages_dropped_for_ingestion,
431+
old_messages_failed_adding_to_backup,
427432
cur_retry_count,
428433
ingestion_channel_depth,
429434
backup_channel_depth,
@@ -497,6 +502,12 @@ impl SiftStreamMetricsSnapshot {
497502
data_type: ChannelDataType::Uint64.into(),
498503
..Default::default()
499504
},
505+
ChannelConfig {
506+
name: format!("{channel_prefix}.old_messages_failed_adding_to_backup"),
507+
description: "Old messages failed to add to backup".into(),
508+
data_type: ChannelDataType::Uint64.into(),
509+
..Default::default()
510+
},
500511
ChannelConfig {
501512
name: format!("{channel_prefix}.cur_retry_count"),
502513
description: "Current retry count".into(),
@@ -659,6 +670,10 @@ impl SiftStreamMetricsSnapshot {
659670
&format!("{channel_prefix}.old_messages_dropped_for_ingestion"),
660671
self.old_messages_dropped_for_ingestion,
661672
),
673+
ChannelValue::new(
674+
&format!("{channel_prefix}.old_messages_failed_adding_to_backup"),
675+
self.old_messages_failed_adding_to_backup,
676+
),
662677
ChannelValue::new(
663678
&format!("{channel_prefix}.cur_retry_count"),
664679
self.cur_retry_count,
@@ -762,6 +777,7 @@ impl Default for SiftStreamMetrics {
762777
bytes_sent: U64Counter::default(),
763778
messages_sent_to_backup: U64Counter::default(),
764779
old_messages_dropped_for_ingestion: U64Counter::default(),
780+
old_messages_failed_adding_to_backup: U64Counter::default(),
765781
cur_retry_count: U64Signal::default(),
766782
ingestion_channel_depth: U64Signal::default(),
767783
backup_channel_depth: U64Signal::default(),

rust/crates/sift_stream/src/stream/mode/ingestion_config.rs

Lines changed: 76 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -307,12 +307,30 @@ impl SiftStream<IngestionConfigMode> {
307307

308308
// Re-send the oldest message to the backup to ensure it is re-ingested later despite being
309309
// dropped from the ingestion channel.
310-
self.mode
310+
//
311+
// On failure, rely on metrics to track occurrences. Logging can quickly become spammy as
312+
// the system works through bursts of messages so logs are reduced to the debug level.
313+
if let Err(e) = self
314+
.mode
311315
.stream_system
312316
.backup_tx
313317
.try_send(oldest_message)
314318
.map_err(|e| Error::new(ErrorKind::StreamError, e))
315319
.context("failed to send data to backup task system")
320+
{
321+
self.metrics
322+
.old_messages_failed_adding_to_backup
323+
.increment();
324+
325+
#[cfg(feature = "tracing")]
326+
tracing::debug!(
327+
sift_stream_id = self.mode.sift_stream_id.to_string(),
328+
"failed to send oldest data to backup task system: {e}"
329+
);
330+
}
331+
332+
// Do not interrupt ingestion.
333+
Ok(())
316334
}
317335
Err(e) => Err(Error::new_msg(
318336
ErrorKind::StreamError,
@@ -346,34 +364,74 @@ impl SiftStream<IngestionConfigMode> {
346364
"adding new flows to ingestion config"
347365
);
348366

349-
new_ingestion_config_service(self.grpc_channel.clone())
350-
.try_create_flows(
351-
&self.mode.ingestion_config.ingestion_config_id,
352-
filtered
353-
.iter()
354-
.map(|f| (*f).clone())
355-
.collect::<Vec<FlowConfig>>(),
356-
)
357-
.await
358-
.context("SiftStream::add_new_flows")?;
359-
360-
self.metrics.loaded_flows.add(filtered.len() as u64);
361-
362-
for flow_config in filtered {
363-
let flow_name = flow_config.name.clone();
367+
let mut calls = Vec::with_capacity(filtered.len());
368+
let create_flows = filtered.into_iter().cloned().collect::<Vec<FlowConfig>>();
369+
for flow_config in create_flows.iter() {
370+
let channel = self.grpc_channel.clone();
371+
let ingestion_config_id = self.mode.ingestion_config.ingestion_config_id.clone();
372+
let flow_config = flow_config.clone();
373+
374+
calls.push(tokio::spawn(async move {
375+
new_ingestion_config_service(channel)
376+
.try_create_flows(&ingestion_config_id, vec![flow_config])
377+
.await
378+
.context("SiftStream::add_new_flows")
379+
}));
380+
}
381+
382+
// Wait for all the gRPC calls to complete.
383+
let results = futures::future::join_all(calls).await;
384+
385+
let mut add_config = |config: &FlowConfig| -> Result<()> {
386+
let flow_name = config.name.clone();
364387
let flow_descriptor = FlowDescriptor::try_from((
365388
self.mode.ingestion_config.ingestion_config_id.clone(),
366-
flow_config,
389+
config,
367390
))?;
368391
self.mode.flows_by_name.insert(flow_name, flow_descriptor);
369392

370393
#[cfg(feature = "tracing")]
371394
tracing::info!(
372395
sift_stream_id = self.mode.sift_stream_id.to_string(),
373-
flow = flow_config.name,
396+
flow = config.name,
374397
"successfully registered new flow"
375398
);
399+
400+
Ok(())
401+
};
402+
403+
// Iterate over the results and update the flow cache for the successfully created flows.
404+
for (config, result) in create_flows.iter().zip(results.into_iter()) {
405+
match result {
406+
Ok(Ok(())) => {
407+
add_config(config)?;
408+
}
409+
Ok(Err(e)) if e.kind() == ErrorKind::AlreadyExistsError => {
410+
add_config(config)?;
411+
}
412+
Ok(Err(e)) => {
413+
#[cfg(feature = "tracing")]
414+
tracing::error!(
415+
sift_stream_id = self.mode.sift_stream_id.to_string(),
416+
"failed to create flow {}: {e}",
417+
config.name,
418+
);
419+
}
420+
Err(e) => {
421+
#[cfg(feature = "tracing")]
422+
tracing::error!(
423+
sift_stream_id = self.mode.sift_stream_id.to_string(),
424+
"failed to create flow {}: {e}",
425+
config.name,
426+
);
427+
}
428+
}
376429
}
430+
431+
self.metrics
432+
.loaded_flows
433+
.add(self.mode.flows_by_name.len() as u64);
434+
377435
Ok(())
378436
}
379437

rust/crates/sift_stream/src/stream/test.rs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::time::Duration;
44
use crate::TimeValue;
55
use crate::backup::DiskBackupPolicy;
66
use crate::{
7-
ChannelValue, Flow, IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder,
7+
ChannelValue, Flow, FlowBuilder, IngestionConfigForm, RecoveryStrategy, RunForm,
8+
SiftStreamBuilder,
89
};
910
use sift_rs::common::r#type::v1::ChannelDataType;
1011
use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
@@ -399,3 +400,78 @@ async fn test_sift_stream_builder_load_ingestion_config_with_new_flows() {
399400
let flows = sift_stream.get_flows();
400401
assert_eq!(flows.len(), 2);
401402
}
403+
404+
#[tokio::test(flavor = "current_thread")]
405+
async fn test_sift_stream_ingestion_and_backup_channels_fill_up() {
406+
let backups_dir = uuid::Uuid::new_v4().to_string();
407+
408+
let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir");
409+
let tmp_dir_path = tmp_dir.path();
410+
411+
let existing_flow = FlowConfig {
412+
name: "already_exists_flow".to_string(),
413+
channels: vec![ChannelConfig {
414+
name: "channel1".to_string(),
415+
data_type: ChannelDataType::Double.into(),
416+
..Default::default()
417+
}],
418+
};
419+
420+
let ingestion_config = IngestionConfigForm {
421+
asset_name: "test_asset".to_string(),
422+
client_key: "test_client_key".to_string(),
423+
flows: vec![existing_flow],
424+
};
425+
let disk_backup_policy = DiskBackupPolicy {
426+
backups_dir: Some(tmp_dir_path.to_path_buf()),
427+
retain_backups: true,
428+
..Default::default()
429+
};
430+
let retry_policy = crate::RetryPolicy::default();
431+
let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await;
432+
433+
let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel)
434+
.ingestion_config(ingestion_config)
435+
.recovery_strategy(RecoveryStrategy::RetryWithBackups {
436+
retry_policy,
437+
disk_backup_policy,
438+
})
439+
.metrics_streaming_interval(None)
440+
.ingestion_data_channel_capacity(1)
441+
.backup_data_channel_capacity(1)
442+
.build()
443+
.await
444+
.expect("failed to build sift stream");
445+
446+
let descriptor = sift_stream
447+
.get_flow_descriptor("already_exists_flow")
448+
.expect("failed to get flow descriptor");
449+
450+
// Send a burst of messages that will cause the ingestion and backup channels to fill up.
451+
//
452+
// Since this test is running in single-threaded mode, and `send_requests_nonblocking` is not async,
453+
// sending all the messages should occur before the background tasks have a chance to run
454+
// and create space.
455+
for data in 0..100 {
456+
let mut builder = FlowBuilder::new(&descriptor);
457+
assert!(builder.set_with_key("channel1", data as f64).is_ok());
458+
459+
assert!(
460+
sift_stream
461+
.send_requests(vec![builder.request(TimeValue::now())])
462+
.await
463+
.is_ok(),
464+
"failed to send request"
465+
);
466+
}
467+
468+
// Finish the stream to ensure that the backup manager is shutdown and the backup files are processed.
469+
tokio::time::timeout(Duration::from_secs(10), async {
470+
assert!(
471+
sift_stream.finish().await.is_ok(),
472+
"failed to finish sift stream"
473+
);
474+
})
475+
.await
476+
.expect("timeout waiting for sift stream to finish");
477+
}

0 commit comments

Comments
 (0)