Skip to content

Commit 5acdd3a

Browse files
authored
rust(feat): Add grpc status codes to sift-stream metrics, better logs (#530)
1 parent 7df185a commit 5acdd3a

3 files changed

Lines changed: 218 additions & 19 deletions

File tree

rust/crates/sift_rs/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,4 @@ pub mod retry;
5555
pub use retry::{DefaultGrpcRetry, RetryConfig, RetryDecider, RetryExt, Retrying};
5656
pub use sift_connect::{Credentials, SiftChannel, SiftChannelBuilder};
5757
pub use tonic::codec::CompressionEncoding;
58+
pub use tonic::{Code as GrpcCode, Status as GrpcStatus};

rust/crates/sift_stream/src/metrics/mod.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ pub struct SiftStreamMetricsSnapshot {
122122
pub old_messages_failed_adding_to_backup: u64,
123123
/// Current retry attempt count
124124
pub cur_retry_count: u64,
125+
/// Count of stream completions per gRPC status code, indexed by code value (0=Ok .. 16=Unauthenticated, 17=UnknownGrpcCode)
126+
pub grpc_status_counts: [u64; 18],
125127
/// Depth of the ingestion channel
126128
pub ingestion_channel_depth: u64,
127129
/// Depth of the backup channel
@@ -386,6 +388,7 @@ pub struct SiftStreamMetrics {
386388
pub(crate) old_messages_dropped_for_ingestion: U64Counter,
387389
pub(crate) old_messages_failed_adding_to_backup: U64Counter,
388390
pub(crate) cur_retry_count: U64Signal,
391+
pub(crate) grpc_status_counts: [U64Counter; 18],
389392
pub(crate) ingestion_channel_depth: U64Signal,
390393
pub(crate) backup_channel_depth: U64Signal,
391394
pub(crate) checkpoint: CheckpointMetrics,
@@ -430,6 +433,7 @@ impl SiftStreamMetrics {
430433
old_messages_dropped_for_ingestion,
431434
old_messages_failed_adding_to_backup,
432435
cur_retry_count,
436+
grpc_status_counts: std::array::from_fn(|i| self.grpc_status_counts[i].get()),
433437
ingestion_channel_depth,
434438
backup_channel_depth,
435439
checkpoint: self.checkpoint.snapshot(),
@@ -514,6 +518,114 @@ impl SiftStreamMetricsSnapshot {
514518
data_type: ChannelDataType::Uint64.into(),
515519
..Default::default()
516520
},
521+
ChannelConfig {
522+
name: format!("{channel_prefix}.grpc_status_counts.ok"),
523+
description: "gRPC status code Ok (0) count".into(),
524+
data_type: ChannelDataType::Uint64.into(),
525+
..Default::default()
526+
},
527+
ChannelConfig {
528+
name: format!("{channel_prefix}.grpc_status_counts.cancelled"),
529+
description: "gRPC status code Cancelled (1) count".into(),
530+
data_type: ChannelDataType::Uint64.into(),
531+
..Default::default()
532+
},
533+
ChannelConfig {
534+
name: format!("{channel_prefix}.grpc_status_counts.unknown"),
535+
description: "gRPC status code Unknown (2) count".into(),
536+
data_type: ChannelDataType::Uint64.into(),
537+
..Default::default()
538+
},
539+
ChannelConfig {
540+
name: format!("{channel_prefix}.grpc_status_counts.invalid_argument"),
541+
description: "gRPC status code InvalidArgument (3) count".into(),
542+
data_type: ChannelDataType::Uint64.into(),
543+
..Default::default()
544+
},
545+
ChannelConfig {
546+
name: format!("{channel_prefix}.grpc_status_counts.deadline_exceeded"),
547+
description: "gRPC status code DeadlineExceeded (4) count".into(),
548+
data_type: ChannelDataType::Uint64.into(),
549+
..Default::default()
550+
},
551+
ChannelConfig {
552+
name: format!("{channel_prefix}.grpc_status_counts.not_found"),
553+
description: "gRPC status code NotFound (5) count".into(),
554+
data_type: ChannelDataType::Uint64.into(),
555+
..Default::default()
556+
},
557+
ChannelConfig {
558+
name: format!("{channel_prefix}.grpc_status_counts.already_exists"),
559+
description: "gRPC status code AlreadyExists (6) count".into(),
560+
data_type: ChannelDataType::Uint64.into(),
561+
..Default::default()
562+
},
563+
ChannelConfig {
564+
name: format!("{channel_prefix}.grpc_status_counts.permission_denied"),
565+
description: "gRPC status code PermissionDenied (7) count".into(),
566+
data_type: ChannelDataType::Uint64.into(),
567+
..Default::default()
568+
},
569+
ChannelConfig {
570+
name: format!("{channel_prefix}.grpc_status_counts.resource_exhausted"),
571+
description: "gRPC status code ResourceExhausted (8) count".into(),
572+
data_type: ChannelDataType::Uint64.into(),
573+
..Default::default()
574+
},
575+
ChannelConfig {
576+
name: format!("{channel_prefix}.grpc_status_counts.failed_precondition"),
577+
description: "gRPC status code FailedPrecondition (9) count".into(),
578+
data_type: ChannelDataType::Uint64.into(),
579+
..Default::default()
580+
},
581+
ChannelConfig {
582+
name: format!("{channel_prefix}.grpc_status_counts.aborted"),
583+
description: "gRPC status code Aborted (10) count".into(),
584+
data_type: ChannelDataType::Uint64.into(),
585+
..Default::default()
586+
},
587+
ChannelConfig {
588+
name: format!("{channel_prefix}.grpc_status_counts.out_of_range"),
589+
description: "gRPC status code OutOfRange (11) count".into(),
590+
data_type: ChannelDataType::Uint64.into(),
591+
..Default::default()
592+
},
593+
ChannelConfig {
594+
name: format!("{channel_prefix}.grpc_status_counts.unimplemented"),
595+
description: "gRPC status code Unimplemented (12) count".into(),
596+
data_type: ChannelDataType::Uint64.into(),
597+
..Default::default()
598+
},
599+
ChannelConfig {
600+
name: format!("{channel_prefix}.grpc_status_counts.internal"),
601+
description: "gRPC status code Internal (13) count".into(),
602+
data_type: ChannelDataType::Uint64.into(),
603+
..Default::default()
604+
},
605+
ChannelConfig {
606+
name: format!("{channel_prefix}.grpc_status_counts.unavailable"),
607+
description: "gRPC status code Unavailable (14) count".into(),
608+
data_type: ChannelDataType::Uint64.into(),
609+
..Default::default()
610+
},
611+
ChannelConfig {
612+
name: format!("{channel_prefix}.grpc_status_counts.data_loss"),
613+
description: "gRPC status code DataLoss (15) count".into(),
614+
data_type: ChannelDataType::Uint64.into(),
615+
..Default::default()
616+
},
617+
ChannelConfig {
618+
name: format!("{channel_prefix}.grpc_status_counts.unauthenticated"),
619+
description: "gRPC status code Unauthenticated (16) count".into(),
620+
data_type: ChannelDataType::Uint64.into(),
621+
..Default::default()
622+
},
623+
ChannelConfig {
624+
name: format!("{channel_prefix}.grpc_status_counts.unknown_grpc_code"),
625+
description: "Unknown gRPC status code (>16) count".into(),
626+
data_type: ChannelDataType::Uint64.into(),
627+
..Default::default()
628+
},
517629
ChannelConfig {
518630
name: format!("{channel_prefix}.ingestion_channel_depth"),
519631
description: "Ingestion channel depth".into(),
@@ -678,6 +790,78 @@ impl SiftStreamMetricsSnapshot {
678790
&format!("{channel_prefix}.cur_retry_count"),
679791
self.cur_retry_count,
680792
),
793+
ChannelValue::new(
794+
&format!("{channel_prefix}.grpc_status_counts.ok"),
795+
self.grpc_status_counts[0],
796+
),
797+
ChannelValue::new(
798+
&format!("{channel_prefix}.grpc_status_counts.cancelled"),
799+
self.grpc_status_counts[1],
800+
),
801+
ChannelValue::new(
802+
&format!("{channel_prefix}.grpc_status_counts.unknown"),
803+
self.grpc_status_counts[2],
804+
),
805+
ChannelValue::new(
806+
&format!("{channel_prefix}.grpc_status_counts.invalid_argument"),
807+
self.grpc_status_counts[3],
808+
),
809+
ChannelValue::new(
810+
&format!("{channel_prefix}.grpc_status_counts.deadline_exceeded"),
811+
self.grpc_status_counts[4],
812+
),
813+
ChannelValue::new(
814+
&format!("{channel_prefix}.grpc_status_counts.not_found"),
815+
self.grpc_status_counts[5],
816+
),
817+
ChannelValue::new(
818+
&format!("{channel_prefix}.grpc_status_counts.already_exists"),
819+
self.grpc_status_counts[6],
820+
),
821+
ChannelValue::new(
822+
&format!("{channel_prefix}.grpc_status_counts.permission_denied"),
823+
self.grpc_status_counts[7],
824+
),
825+
ChannelValue::new(
826+
&format!("{channel_prefix}.grpc_status_counts.resource_exhausted"),
827+
self.grpc_status_counts[8],
828+
),
829+
ChannelValue::new(
830+
&format!("{channel_prefix}.grpc_status_counts.failed_precondition"),
831+
self.grpc_status_counts[9],
832+
),
833+
ChannelValue::new(
834+
&format!("{channel_prefix}.grpc_status_counts.aborted"),
835+
self.grpc_status_counts[10],
836+
),
837+
ChannelValue::new(
838+
&format!("{channel_prefix}.grpc_status_counts.out_of_range"),
839+
self.grpc_status_counts[11],
840+
),
841+
ChannelValue::new(
842+
&format!("{channel_prefix}.grpc_status_counts.unimplemented"),
843+
self.grpc_status_counts[12],
844+
),
845+
ChannelValue::new(
846+
&format!("{channel_prefix}.grpc_status_counts.internal"),
847+
self.grpc_status_counts[13],
848+
),
849+
ChannelValue::new(
850+
&format!("{channel_prefix}.grpc_status_counts.unavailable"),
851+
self.grpc_status_counts[14],
852+
),
853+
ChannelValue::new(
854+
&format!("{channel_prefix}.grpc_status_counts.data_loss"),
855+
self.grpc_status_counts[15],
856+
),
857+
ChannelValue::new(
858+
&format!("{channel_prefix}.grpc_status_counts.unauthenticated"),
859+
self.grpc_status_counts[16],
860+
),
861+
ChannelValue::new(
862+
&format!("{channel_prefix}.grpc_status_counts.unknown_grpc_code"),
863+
self.grpc_status_counts[17],
864+
),
681865
ChannelValue::new(
682866
&format!("{channel_prefix}.ingestion_channel_depth"),
683867
self.ingestion_channel_depth,
@@ -779,6 +963,7 @@ impl Default for SiftStreamMetrics {
779963
old_messages_dropped_for_ingestion: U64Counter::default(),
780964
old_messages_failed_adding_to_backup: U64Counter::default(),
781965
cur_retry_count: U64Signal::default(),
966+
grpc_status_counts: Default::default(),
782967
ingestion_channel_depth: U64Signal::default(),
783968
backup_channel_depth: U64Signal::default(),
784969
checkpoint: CheckpointMetrics::default(),

rust/crates/sift_stream/src/stream/tasks/ingestion.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use crate::{
88
};
99
use sift_connect::SiftChannel;
1010
use sift_error::prelude::*;
11-
use sift_rs::{CompressionEncoding, ingest::v1::ingest_service_client::IngestServiceClient};
11+
use sift_rs::{
12+
CompressionEncoding, GrpcCode, GrpcStatus,
13+
ingest::v1::ingest_service_client::IngestServiceClient,
14+
};
1215
use std::{
1316
future::Future,
1417
pin::Pin,
@@ -138,7 +141,6 @@ impl IngestionTask {
138141

139142
// Currently the stream result is not used, so to simplify we return a unit value.
140143
res.map(|_| ())
141-
.map_err(|e| Error::new(ErrorKind::StreamError, e))
142144
}));
143145

144146
#[cfg(feature = "tracing")]
@@ -153,6 +155,7 @@ impl IngestionTask {
153155
res = stream.as_mut().unwrap() => {
154156
match res {
155157
Ok(_) => {
158+
self.config.metrics.grpc_status_counts[0].increment();
156159
self.config.metrics.cur_retry_count.set(0);
157160
current_wait = Duration::ZERO;
158161
}
@@ -192,19 +195,15 @@ impl IngestionTask {
192195
sift_stream_id = %self.config.sift_stream_id,
193196
"checkpoint succeeded - data streamed to Sift successfully"
194197
);
198+
self.config.metrics.grpc_status_counts[0].increment();
195199
self.config.metrics.cur_retry_count.set(0);
196200
}
197201
Ok(Err(e)) => {
198202
current_wait = self.handle_failed_stream(&e, stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?;
199203
}
200-
Err(elapsed) => {
201-
#[cfg(feature = "tracing")]
202-
tracing::error!(
203-
sift_stream_id = %self.config.sift_stream_id,
204-
error = %elapsed,
205-
"timed out waiting for checkpoint completion from Sift"
206-
);
207-
current_wait = self.handle_failed_stream(&Error::new(ErrorKind::StreamError, elapsed), stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?;
204+
Err(_elapsed) => {
205+
let timeout_status = GrpcStatus::deadline_exceeded("checkpoint timed out waiting for Sift");
206+
current_wait = self.handle_failed_stream(&timeout_status, stream_created_at, current_wait, first_message_id.load(Ordering::Relaxed), last_message_id.load(Ordering::Relaxed))?;
208207
}
209208
}
210209

@@ -216,7 +215,7 @@ impl IngestionTask {
216215
match ctrl_msg {
217216
Ok(ControlMessage::BackupFull) => {
218217
#[cfg(feature = "tracing")]
219-
tracing::info!(
218+
tracing::trace!(
220219
sift_stream_id = %self.config.sift_stream_id,
221220
"backup full"
222221
);
@@ -243,19 +242,33 @@ impl IngestionTask {
243242
/// Handle a failed stream operation: send the re-ingest signal, log the error, and increment metrics.
244243
fn handle_failed_stream(
245244
&mut self,
246-
e: &Error,
245+
status: &GrpcStatus,
247246
stream_created_at: Instant,
248247
current_wait: Duration,
249248
first_message_id: u64,
250249
last_message_id: u64,
251250
) -> Result<Duration> {
251+
let code = i32::from(status.code());
252+
let code_idx = if (0..=16).contains(&code) {
253+
code as usize
254+
} else {
255+
17
256+
};
257+
self.config.metrics.grpc_status_counts[code_idx].increment();
258+
252259
#[cfg(feature = "tracing")]
253-
tracing::error!(
254-
sift_stream_id = %self.config.sift_stream_id,
255-
retry_counter = self.config.metrics.cur_retry_count.get(),
256-
error = %e,
257-
"stream failed - failed to ingest data to Sift - if backups are enabled, backup files will be re-ingested"
258-
);
260+
{
261+
let msg = match status.code() {
262+
GrpcCode::Cancelled => "stream connection went idle",
263+
_ => "stream connection is being reset",
264+
};
265+
tracing::warn!(
266+
sift_stream_id = %self.config.sift_stream_id,
267+
retry_counter = self.config.metrics.cur_retry_count.get(),
268+
grpc_status = ?status.code(),
269+
msg
270+
);
271+
}
259272

260273
self.config
261274
.metrics
@@ -285,7 +298,7 @@ impl IngestionTask {
285298
}
286299

287300
/// Shuts down the ingestion task by awaiting the stream one last time and sending the final checkpoint complete signal to the backup manager.
288-
async fn shutdown<T: Future<Output = Result<()>> + Send + 'static>(
301+
async fn shutdown<T: Future<Output = std::result::Result<(), GrpcStatus>> + Send + 'static>(
289302
&mut self,
290303
mut stream: Option<Pin<Box<T>>>,
291304
first_message_id: Arc<AtomicU64>,

0 commit comments

Comments
 (0)