Skip to content

Commit e7089bb

Browse files
committed
Add metrics to monitor cluster throughput
1 parent 6593712 commit e7089bb

4 files changed

Lines changed: 191 additions & 11 deletions

File tree

rust/crates/scheduler/src/cluster.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
cluster_key::{Tag, TagType},
3434
config::CONFIG,
3535
dao::{helpers::parse_uuid, ClusterDao},
36-
metrics::observe_cluster_round_trip,
36+
metrics,
3737
};
3838

3939
pub static CLUSTER_ROUNDS: AtomicUsize = AtomicUsize::new(0);
@@ -73,6 +73,20 @@ impl Cluster {
7373
tags: tags.into_iter().collect(),
7474
}
7575
}
76+
77+
/// Bounded metric label for the cluster's tag class. Clusters are built from
78+
/// a single `TagType` (alloc clusters are one-tag; manual/hostname/hardware
79+
/// clusters chunk tags of one type), so the first tag determines the class.
80+
/// Returns a `&'static str` so Prometheus label cardinality stays bounded: the four tag classes, plus `unknown` for a cluster with no tags.
81+
pub fn cluster_type(&self) -> &'static str {
82+
match self.tags.iter().next().map(|t| &t.ttype) {
83+
Some(TagType::Alloc) => "alloc",
84+
Some(TagType::Manual) => "manual",
85+
Some(TagType::HostName) => "hostname",
86+
Some(TagType::Hardware) => "hardware",
87+
None => "unknown",
88+
}
89+
}
7690
}
7791

7892
/// Inputs retained by a DB-backed [`ClusterFeed`] so it can periodically reload
@@ -629,11 +643,16 @@ impl ClusterFeed {
629643
return ControlFlow::Break(());
630644
}
631645
let now = Instant::now();
646+
// Capture the type before `item` is moved into the map.
647+
let cluster_type = item.cluster_type();
632648
let mut last_sent_lock = last_sent_map_producer
633649
.lock()
634650
.unwrap_or_else(|p| p.into_inner());
635651
if let Some(prev) = last_sent_lock.insert(item, now) {
636-
observe_cluster_round_trip(now.duration_since(prev));
652+
metrics::observe_cluster_round_trip(
653+
cluster_type,
654+
now.duration_since(prev),
655+
);
637656
}
638657
} else if !completed_round {
639658
// Skipped a sleeping cluster mid-round; yield so we don't starve the runtime.
@@ -650,6 +669,25 @@ impl ClusterFeed {
650669
sleep_map.lock().unwrap_or_else(|p| p.into_inner());
651670
sleep_map_lock.len()
652671
};
672+
673+
// Sample fan-out gauges once per lap (cheap relative to a
674+
// full round-robin pass). CLUSTERS_TOTAL by type is the
675+
// primary fan-out signal; CLUSTERS_SLEEPING shows how much
676+
// of the set is backed off at any instant.
677+
{
678+
let mut by_type: HashMap<&'static str, i64> = HashMap::new();
679+
{
680+
let clusters =
681+
feed.read().unwrap_or_else(|p| p.into_inner());
682+
for c in clusters.iter() {
683+
*by_type.entry(c.cluster_type()).or_default() += 1;
684+
}
685+
}
686+
for (cluster_type, count) in by_type {
687+
metrics::set_clusters_total(cluster_type, count);
688+
}
689+
metrics::set_clusters_sleeping(sleeping_count as i64);
690+
}
653691
if sleeping_count >= cluster_size {
654692
// Ensure this doesn't loop forever when there's a limit configured
655693
all_sleeping_rounds += 1;

rust/crates/scheduler/src/metrics/mod.rs

Lines changed: 125 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
use axum::{response::IntoResponse, routing::get, Router};
1414
use lazy_static::lazy_static;
1515
use prometheus::{
16-
register_counter, register_counter_vec, register_histogram, Counter, CounterVec, Encoder,
17-
Histogram, TextEncoder,
16+
register_counter, register_counter_vec, register_gauge, register_gauge_vec,
17+
register_histogram, register_histogram_vec, Counter, CounterVec, Encoder, Gauge, GaugeVec,
18+
Histogram, HistogramVec, TextEncoder,
1819
};
1920
use std::sync::atomic::{AtomicU64, Ordering};
2021
use std::time::Duration;
@@ -87,13 +88,89 @@ lazy_static! {
8788
.expect("Failed to register job_query_duration_seconds histogram");
8889

8990
// Cluster feed metrics from cluster.rs
90-
pub static ref CLUSTER_ROUND_TRIP_SECONDS: Histogram = register_histogram!(
91+
//
92+
// Labeled by `cluster_type` (alloc / manual / hostname / hardware) so the
93+
// round-trip tail can be attributed to a tag class. A large fan-out of
94+
// chunked manual-tag clusters lengthens the round-robin lap and shows up
95+
// here as a worse tail on the `manual` series than on `alloc`. The label is
96+
// bounded to the four `TagType` variants — never per-tag, which would blow
97+
// up cardinality on farms with thousands of tags.
98+
pub static ref CLUSTER_ROUND_TRIP_SECONDS: HistogramVec = register_histogram_vec!(
9199
"scheduler_cluster_round_trip_seconds",
92-
"Time between successive emissions of the same active (non-sleeping) cluster",
100+
"Time between successive emissions of the same active (non-sleeping) cluster, by cluster type",
101+
&["cluster_type"],
93102
vec![0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0]
94103
)
95104
.expect("Failed to register cluster_round_trip_seconds histogram");
96105

106+
// Size of the live cluster set, by tag class, sampled once per round-robin
107+
// lap by the feed producer. This is the fan-out magnitude: with
108+
// `manual_tags_chunk_size = N` and T manual tags, the `manual` series is
109+
// ~T/N. A large value here is the prime suspect for slow per-cluster
110+
// revisits (each cluster only gets a turn once per full lap).
111+
pub static ref CLUSTERS_TOTAL: GaugeVec = register_gauge_vec!(
112+
"scheduler_clusters_total",
113+
"Number of clusters in the live feed set, by cluster type",
114+
&["cluster_type"]
115+
)
116+
.expect("Failed to register clusters_total gauge");
117+
118+
// Clusters currently sleeping (skipped this lap), sampled once per lap.
119+
// Sleeping clusters are invisible to dispatch until their backoff expires
120+
// (`cluster_empty_sleep` / `cluster_saturated_sleep`). A high value relative
121+
// to CLUSTERS_TOTAL means most of the set is backed off at any instant.
122+
pub static ref CLUSTERS_SLEEPING: Gauge = register_gauge!(
123+
"scheduler_clusters_sleeping",
124+
"Number of clusters currently sleeping (skipped each lap)"
125+
)
126+
.expect("Failed to register clusters_sleeping gauge");
127+
128+
// Frames booked in a single cluster pass (one emission off the feed),
129+
// labeled by `cluster_type`. If this pins at the per-pass ceiling
130+
// (`max_jobs_per_cluster_pass` x `dispatch_frames_per_layer_limit`) while a
131+
// backlog persists, throughput-per-turn x infrequent-turns explains a
132+
// growing queue: the farm has capacity but each cluster only drains a
133+
// bounded slice per visit.
134+
pub static ref FRAMES_DISPATCHED_PER_PASS: HistogramVec = register_histogram_vec!(
135+
"scheduler_frames_dispatched_per_pass",
136+
"Frames dispatched in a single cluster pass, by cluster type",
137+
&["cluster_type"],
138+
vec![0.0, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 250.0, 500.0]
139+
)
140+
.expect("Failed to register frames_dispatched_per_pass histogram");
141+
142+
// Why each cluster pass ended, labeled by `reason`:
143+
// booked - placed at least one frame
144+
// saturated - jobs were pending but nothing fit (farm full or gated)
145+
// no_jobs - the job query returned nothing eligible
146+
// query_error - the job query failed (pass backed off and will retry)
147+
// `saturated` dominating while hosts sit idle points at matching/tagging,
148+
// not capacity. `no_jobs` dominating means the cluster set is mostly empty
149+
// churn and the round-robin lap is paying for clusters with no work.
150+
pub static ref PASS_TERMINATED_REASON_TOTAL: CounterVec = register_counter_vec!(
151+
"scheduler_pass_terminated_reason_total",
152+
"Cluster passes by terminal reason",
153+
&["reason"]
154+
)
155+
.expect("Failed to register pass_terminated_reason_total counter");
156+
157+
// Outcome of each host-checkout attempt in the matcher loop, labeled by
158+
// `outcome`:
159+
// booked - a host was checked out and the frame dispatched
160+
// no_match - check_out returned NoCandidateAvailable
161+
// dispatch_error - host checked out but the dispatch transaction failed
162+
// The reported "hosts available but not booked" symptom shows up as a high
163+
// `no_match` rate; cross-reference with CLUSTERS_TOTAL / round-trip to tell
164+
// a genuine no-fit from a starved cluster that simply isn't getting visited.
165+
// A finer no_match breakdown (reserved / cas_lost / gate_rejected) lives
166+
// inside host_cache and is deferred to a later tier.
167+
pub static ref CHECKOUT_OUTCOME_TOTAL: CounterVec = register_counter_vec!(
168+
"scheduler_checkout_outcome_total",
169+
"Host checkout attempts by outcome",
170+
&["outcome"]
171+
)
172+
.expect("Failed to register checkout_outcome_total counter");
173+
97174
// E-PVM placement metrics from host_cache/cache.rs. Observed only on the
98175
// Epvm path (Saturation always scores 0.0). Buckets are dimensionless W3
99176
// fractional-layer-frames units; calibration may need adjustment after
@@ -255,10 +332,51 @@ pub fn observe_job_query_duration(duration: Duration) {
255332
JOB_QUERY_DURATION_SECONDS.observe(duration.as_secs_f64());
256333
}
257334

258-
/// Helper function to observe cluster round-trip duration
335+
/// Helper function to observe cluster round-trip duration, labeled by cluster type.
336+
#[inline]
337+
pub fn observe_cluster_round_trip(cluster_type: &str, duration: Duration) {
338+
CLUSTER_ROUND_TRIP_SECONDS
339+
.with_label_values(&[cluster_type])
340+
.observe(duration.as_secs_f64());
341+
}
342+
343+
/// Sets the live cluster-set size for a given cluster type. Sampled once per
344+
/// round-robin lap by the feed producer.
345+
#[inline]
346+
pub fn set_clusters_total(cluster_type: &str, count: i64) {
347+
CLUSTERS_TOTAL
348+
.with_label_values(&[cluster_type])
349+
.set(count as f64);
350+
}
351+
352+
/// Sets the number of clusters currently sleeping. Sampled once per lap.
353+
#[inline]
354+
pub fn set_clusters_sleeping(count: i64) {
355+
CLUSTERS_SLEEPING.set(count as f64);
356+
}
357+
358+
/// Records the number of frames booked in a single cluster pass.
359+
#[inline]
360+
pub fn observe_frames_dispatched_per_pass(cluster_type: &str, frames: usize) {
361+
FRAMES_DISPATCHED_PER_PASS
362+
.with_label_values(&[cluster_type])
363+
.observe(frames as f64);
364+
}
365+
366+
/// Records the terminal reason for a cluster pass
367+
/// (`booked` / `saturated` / `no_jobs` / `query_error`).
368+
#[inline]
369+
pub fn increment_pass_terminated_reason(reason: &str) {
370+
PASS_TERMINATED_REASON_TOTAL
371+
.with_label_values(&[reason])
372+
.inc();
373+
}
374+
375+
/// Records the outcome of a single host-checkout attempt
376+
/// (`booked` / `no_match` / `dispatch_error`).
259377
#[inline]
260-
pub fn observe_cluster_round_trip(duration: Duration) {
261-
CLUSTER_ROUND_TRIP_SECONDS.observe(duration.as_secs_f64());
378+
pub fn increment_checkout_outcome(outcome: &str) {
379+
CHECKOUT_OUTCOME_TOTAL.with_label_values(&[outcome]).inc();
262380
}
263381

264382
/// Records the E-PVM score of the host returned by `check_out_best`.

rust/crates/scheduler/src/pipeline/entrypoint.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> {
7070
let feed_sender = feed_sender.clone();
7171

7272
async move {
73+
// Bounded metric label captured before `cluster` is moved into a
74+
// FeedMessage::Sleep below.
75+
let cluster_type = cluster.cluster_type();
7376
let jobs = job_fetcher
7477
.query_pending_jobs_by_show_facility_and_tags(
7578
cluster.show_id,
@@ -98,16 +101,33 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> {
98101
},
99102
)
100103
.await;
104+
let processed = processed_jobs.load(Ordering::Relaxed);
105+
let dispatched = dispatched_frames.load(Ordering::Relaxed);
106+
107+
// Per-pass yield and terminal reason. Together these show
108+
// whether each cluster turn drains a bounded slice while a
109+
// backlog persists (cap-limited) versus genuinely finding
110+
// no work or no fit.
111+
metrics::observe_frames_dispatched_per_pass(cluster_type, dispatched);
112+
let reason = if processed == 0 {
113+
"no_jobs"
114+
} else if dispatched == 0 {
115+
"saturated"
116+
} else {
117+
"booked"
118+
};
119+
metrics::increment_pass_terminated_reason(reason);
120+
101121
// If no jobs got processed, sleep to prevent hammering the database with
102122
// queries with no outcome
103-
if processed_jobs.load(Ordering::Relaxed) == 0 {
123+
if processed == 0 {
104124
let _ = feed_sender
105125
.send(FeedMessage::Sleep(
106126
cluster,
107127
CONFIG.queue.cluster_empty_sleep,
108128
))
109129
.await;
110-
} else if dispatched_frames.load(Ordering::Relaxed) == 0 {
130+
} else if dispatched == 0 {
111131
// Jobs are pending but the whole pass placed nothing —
112132
// typically a saturated farm (no host fits anywhere).
113133
// Without a back-off this cluster would re-query its
@@ -144,6 +164,7 @@ pub async fn run(cluster_feed: ClusterFeed) -> miette::Result<()> {
144164
// would shut the scheduler down on the first hiccup; back
145165
// this cluster off instead and let the next pass retry.
146166
error!("Failed to fetch jobs for cluster {}: {}", cluster, err);
167+
metrics::increment_pass_terminated_reason("query_error");
147168
let _ = feed_sender
148169
.send(FeedMessage::Sleep(
149170
cluster,

rust/crates/scheduler/src/pipeline/matcher.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ impl MatchingService {
443443
updated_host,
444444
updated_layer,
445445
}) => {
446+
metrics::increment_checkout_outcome("booked");
446447
// Track cores actually consumed so the next iteration's
447448
// LayerProfile sees the local picture of usage. The same
448449
// delta applies to the (show, alloc) subscription burst
@@ -479,6 +480,7 @@ impl MatchingService {
479480
}
480481
}
481482
Err(err) => {
483+
metrics::increment_checkout_outcome("dispatch_error");
482484
// On error, we lost the layer since it was moved to DispatchLayerMessage
483485
// This means we can't continue with this layer
484486
Self::log_dispatch_error_with_info(
@@ -504,6 +506,7 @@ impl MatchingService {
504506

505507
match err {
506508
crate::host_cache::HostCacheError::NoCandidateAvailable => {
509+
metrics::increment_checkout_outcome("no_match");
507510
debug!(
508511
"No host candidate available for layer {}. {:?}",
509512
current_layer_version.as_ref().unwrap(),

0 commit comments

Comments
 (0)