Skip to content

Commit 257cc35

Browse files
authored
rust(chore): add tracing for unique flows seen (#301)
1 parent d06c44f commit 257cc35

1 file changed

Lines changed: 15 additions & 1 deletion

File tree

rust/crates/sift_stream/src/stream/mode/ingestion_config.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use sift_rs::{
1919
wrappers::ingestion_configs::{IngestionConfigServiceWrapper, new_ingestion_config_service},
2020
};
2121
use std::{
22-
collections::HashMap,
22+
collections::{HashMap, HashSet},
2323
ops::Drop,
2424
pin::Pin,
2525
sync::{
@@ -56,6 +56,7 @@ pub struct IngestionConfigMode {
5656
pub(crate) run: Option<Run>,
5757
ingestion_config: IngestionConfig,
5858
flows_by_name: HashMap<String, Vec<FlowConfig>>,
59+
flows_seen: HashSet<String>,
5960
checkpoint_interval: Duration,
6061
streaming_task: Option<DataStreamTask>,
6162
retry_policy: Option<RetryPolicy>,
@@ -184,6 +185,7 @@ impl SiftStream<IngestionConfigMode> {
184185
mode: IngestionConfigMode {
185186
ingestion_config,
186187
flows_by_name,
188+
flows_seen: HashSet::new(),
187189
sift_stream_id,
188190
run,
189191
streaming_task: Some(streaming_task),
@@ -267,6 +269,18 @@ impl SiftStream<IngestionConfigMode> {
267269
/// Concerned with sending the actual ingest request to [DataStream] which will then write it
268270
/// to the gRPC stream. If backups are enabled, the request will be backed up as well.
269271
async fn send_impl(&mut self, req: IngestWithConfigDataStreamRequest) -> Result<()> {
272+
#[cfg(feature = "tracing")]
273+
{
274+
if !self.mode.flows_seen.contains(&req.flow) {
275+
self.mode.flows_seen.insert(req.flow.clone());
276+
tracing::info!(
277+
sift_stream_id = self.mode.sift_stream_id.to_string(),
278+
"flow '{}' being ingested for the first time",
279+
&req.flow,
280+
);
281+
}
282+
}
283+
270284
if self
271285
.backup_data(&req)
272286
.await

0 commit comments

Comments
 (0)