Skip to content

Commit daeb2f0

Browse files
Merge remote-tracking branch 'origin/main' into chore/merge-to-release
# Conflicts: # CHANGELOG.md # Cargo.toml # VERSION
2 parents 3004e5c + 3d29d33 commit daeb2f0

3 files changed

Lines changed: 90 additions & 12 deletions

File tree

.claude/.ai-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ab5d1f2b20a273bd54199cee85161d16fb8f76b7
1+
75e0e4174de4cf59756546c07ef277439cba9812

hyperi-ai

src/transport/grpc/mod.rs

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ pub struct GrpcTransport {
6969

7070
/// Receive timeout (milliseconds).
7171
recv_timeout_ms: u64,
72+
73+
/// In-flight send count (for metrics).
74+
#[cfg(feature = "metrics")]
75+
inflight: AtomicU64,
7276
}
7377

7478
impl GrpcTransport {
@@ -173,6 +177,8 @@ impl GrpcTransport {
173177
_server_handle: server_handle,
174178
closed: AtomicBool::new(false),
175179
recv_timeout_ms: config.recv_timeout_ms,
180+
#[cfg(feature = "metrics")]
181+
inflight: AtomicU64::new(0),
176182
})
177183
}
178184
}
@@ -202,16 +208,54 @@ impl Transport for GrpcTransport {
202208
metadata,
203209
};
204210

211+
#[cfg(feature = "metrics")]
212+
let start = std::time::Instant::now();
213+
214+
#[cfg(feature = "metrics")]
215+
self.inflight.fetch_add(1, Ordering::Relaxed);
216+
205217
// tonic clients are cheaply cloneable (shared channel)
206-
match client.clone().push(request).await {
207-
Ok(_) => SendResult::Ok,
218+
let result = match client.clone().push(request).await {
219+
Ok(_) => {
220+
#[cfg(feature = "metrics")]
221+
metrics::counter!("dfe_transport_sent_total", "transport" => "grpc").increment(1);
222+
SendResult::Ok
223+
}
208224
Err(status) => match status.code() {
209225
tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
226+
#[cfg(feature = "metrics")]
227+
metrics::counter!(
228+
"dfe_transport_backpressured_total",
229+
"transport" => "grpc"
230+
)
231+
.increment(1);
210232
SendResult::Backpressured
211233
}
212-
_ => SendResult::Fatal(TransportError::Send(status.message().to_string())),
234+
_ => {
235+
#[cfg(feature = "metrics")]
236+
metrics::counter!(
237+
"dfe_transport_send_errors_total",
238+
"transport" => "grpc"
239+
)
240+
.increment(1);
241+
SendResult::Fatal(TransportError::Send(status.message().to_string()))
242+
}
213243
},
244+
};
245+
246+
#[cfg(feature = "metrics")]
247+
{
248+
self.inflight.fetch_sub(1, Ordering::Relaxed);
249+
metrics::gauge!("dfe_transport_inflight", "transport" => "grpc")
250+
.set(self.inflight.load(Ordering::Relaxed) as f64);
251+
metrics::histogram!(
252+
"dfe_transport_send_duration_seconds",
253+
"transport" => "grpc"
254+
)
255+
.record(start.elapsed().as_secs_f64());
214256
}
257+
258+
result
215259
}
216260

217261
async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
@@ -282,7 +326,14 @@ impl Transport for GrpcTransport {
282326
}
283327

284328
fn is_healthy(&self) -> bool {
285-
!self.closed.load(Ordering::Relaxed)
329+
let healthy = !self.closed.load(Ordering::Relaxed);
330+
#[cfg(feature = "metrics")]
331+
metrics::gauge!("dfe_transport_healthy", "transport" => "grpc").set(if healthy {
332+
1.0
333+
} else {
334+
0.0
335+
});
336+
healthy
286337
}
287338

288339
fn name(&self) -> &'static str {
@@ -329,12 +380,39 @@ impl proto::dfe_transport_server::DfeTransport for DfeTransportServiceImpl {
329380
format,
330381
};
331382

332-
self.sender
333-
.send(msg)
334-
.await
335-
.map_err(|_| Status::unavailable("receiver buffer full"))?;
336-
337-
Ok(Response::new(proto::PushResponse { accepted: 1 }))
383+
match self.sender.try_send(msg) {
384+
Ok(()) => {
385+
#[cfg(feature = "metrics")]
386+
{
387+
metrics::counter!("dfe_transport_sent_total", "transport" => "grpc")
388+
.increment(1);
389+
metrics::gauge!("dfe_transport_queue_size", "transport" => "grpc").set(
390+
self.sender
391+
.max_capacity()
392+
.saturating_sub(self.sender.capacity()) as f64,
393+
);
394+
}
395+
Ok(Response::new(proto::PushResponse { accepted: 1 }))
396+
}
397+
Err(mpsc::error::TrySendError::Full(_)) => {
398+
#[cfg(feature = "metrics")]
399+
metrics::counter!(
400+
"dfe_transport_backpressured_total",
401+
"transport" => "grpc"
402+
)
403+
.increment(1);
404+
Err(Status::resource_exhausted("receiver buffer full"))
405+
}
406+
Err(mpsc::error::TrySendError::Closed(_)) => {
407+
#[cfg(feature = "metrics")]
408+
metrics::counter!(
409+
"dfe_transport_refused_total",
410+
"transport" => "grpc"
411+
)
412+
.increment(1);
413+
Err(Status::unavailable("receiver closed"))
414+
}
415+
}
338416
}
339417

340418
async fn health_check(

0 commit comments

Comments
 (0)