Skip to content

Commit 7f831d0

Browse files
committed
Use ObservableGauge for execution_queued_actions_count_observable.
1 parent bb0ec28 commit 7f831d0

2 files changed

Lines changed: 138 additions & 53 deletions

File tree

nativelink-scheduler/src/simple_scheduler_state_manager.rs

Lines changed: 95 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@ use super::awaited_action_db::{
1616
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CountableActionStage,
1717
SortedAwaitedActionState,
1818
};
19-
use crate::worker::ActionsState;
2019
use async_lock::Mutex;
2120
use async_trait::async_trait;
2221
use core::ops::Bound;
2322
use core::time::Duration;
24-
use futures::{StreamExt, TryFutureExt, TryStreamExt, stream};
25-
use nativelink_error::{Code, Error, ResultExt, make_err};
23+
use futures::{stream, StreamExt, TryStreamExt};
24+
use nativelink_error::{make_err, Code, Error, ResultExt};
2625
use nativelink_metric::MetricsComponent;
2726
use nativelink_util::action_messages::{
2827
ActionInfo, ActionResult, ActionStage, ActionState, ActionUniqueQualifier, ExecutionMetadata,
@@ -31,7 +30,8 @@ use nativelink_util::action_messages::{
3130
use nativelink_util::instant_wrapper::InstantWrapper;
3231
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
3332
use nativelink_util::metrics::{
34-
EXECUTION_INSTANCE, EXECUTION_METRICS, ExecutionMetricAttrs, ExecutionResult,
33+
register_queued_actions_callback, ExecutionMetricAttrs, ExecutionResult, EXECUTION_INSTANCE,
34+
EXECUTION_METRICS,
3535
};
3636
use nativelink_util::operation_state_manager::{
3737
ActionStateResult, ActionStateResultStream, ClientStateManager, MatchingEngineStateManager,
@@ -40,11 +40,10 @@ use nativelink_util::operation_state_manager::{
4040
use nativelink_util::origin_event::OriginMetadata;
4141
use opentelemetry::KeyValue;
4242
use std::collections::{BTreeMap, HashMap};
43-
use std::iter::Map;
4443
use std::string::ToString;
4544
use std::sync::{Arc, Weak};
4645
use std::{env, vec};
47-
use tracing::{info, warn};
46+
use tracing::{info, warn, Instrument};
4847

4948
/// Maximum number of times an update to the database
5049
/// can fail before giving up.
@@ -67,38 +66,6 @@ impl SchedulerMetrics {
6766
}
6867
}
6968

70-
pub fn record_queued_actions(&self, action_infos: Vec<Arc<AwaitedAction>>) {
71-
let count_by_properties = action_infos
72-
.iter()
73-
.map(|awaitedAction| {
74-
awaitedAction.action_info().platform_properties
75-
.clone()
76-
.into_iter()
77-
.collect::<BTreeMap<_, _>>()
78-
})
79-
.fold(HashMap::new(), |mut acc, platform_properties| {
80-
*acc.entry(platform_properties).or_insert(0) += 1;
81-
acc
82-
});
83-
84-
info!("Found {} queued actions with {} unique properties", action_infos.len(), count_by_properties.len());
85-
86-
for (platform_properties, count) in count_by_properties {
87-
let mut attrs = platform_properties
88-
.iter()
89-
.map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
90-
.collect::<Vec<KeyValue>>();
91-
92-
attrs.push(KeyValue::new(
93-
EXECUTION_INSTANCE,
94-
self.instance_name.clone(),
95-
));
96-
EXECUTION_METRICS
97-
.execution_queued_actions_count
98-
.record(count, &attrs);
99-
}
100-
}
101-
10269
pub fn record_stage_transition(&self, from_stage: Option<ActionStage>, to_stage: ActionStage) {
10370
if let Some(from) = from_stage {
10471
let from_attrs = self.attrs_for_stage(from);
@@ -458,6 +425,86 @@ where
458425
/// Provides pre-computed attributes and methods for recording metrics
459426
/// related to action execution lifecycle.
460427
scheduler_metrics: SchedulerMetrics,
428+
429+
queued_actions_tracker: Arc<QueuedActionsTracker<T, I, NowFn>>,
430+
}
431+
432+
#[derive(Debug)]
433+
struct QueuedActionsTracker<T, I, NowFn>
434+
where
435+
T: AwaitedActionDb,
436+
I: InstantWrapper,
437+
NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
438+
{
439+
simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>,
440+
queued_actions: Arc<tokio::sync::Mutex<Vec<(u64, Vec<KeyValue>)>>>,
441+
}
442+
443+
impl<T, I, NowFn> QueuedActionsTracker<T, I, NowFn>
444+
where
445+
T: AwaitedActionDb,
446+
I: InstantWrapper,
447+
NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 'static,
448+
{
449+
fn new(simple_scheduler_state_manager: Weak<SimpleSchedulerStateManager<T, I, NowFn>>) -> Self {
450+
let queued_actions = Arc::new(tokio::sync::Mutex::new(Vec::new()));
451+
452+
Self {
453+
simple_scheduler_state_manager,
454+
queued_actions,
455+
}
456+
}
457+
458+
fn dump_queued_actions(&self, observer: impl Fn(u64, &[KeyValue])) {
459+
if let Ok(queued_actions) = self.queued_actions.try_lock() {
460+
for (count, attrs) in queued_actions.iter() {
461+
observer(*count, attrs);
462+
}
463+
}
464+
}
465+
466+
async fn count_queued_actions(&self) {
467+
if let Some(manager) = self.simple_scheduler_state_manager.upgrade() {
468+
let action_infos = manager
469+
.action_db
470+
.get_queued_actions()
471+
.await
472+
.err_tip(|| "In SimpleSchedulerStateManager::record_actions_count")
473+
.unwrap_or_default();
474+
475+
let count_by_properties = action_infos
476+
.iter()
477+
.map(|awaitedAction| {
478+
awaitedAction
479+
.action_info()
480+
.platform_properties
481+
.clone()
482+
.into_iter()
483+
.collect::<BTreeMap<_, _>>()
484+
})
485+
.fold(HashMap::new(), |mut acc, platform_properties| {
486+
*acc.entry(platform_properties).or_insert(0) += 1;
487+
acc
488+
});
489+
490+
let mut queued_actions = self.queued_actions.lock().await;
491+
queued_actions.clear();
492+
493+
for (platform_properties, count) in count_by_properties {
494+
let mut attrs = platform_properties
495+
.iter()
496+
.map(|(key, value)| KeyValue::new(key.clone(), value.clone()))
497+
.collect::<Vec<KeyValue>>();
498+
499+
attrs.push(KeyValue::new(
500+
EXECUTION_INSTANCE,
501+
manager.scheduler_metrics.instance_name.clone(),
502+
));
503+
504+
queued_actions.push((count, attrs));
505+
}
506+
}
507+
}
461508
}
462509

463510
impl<T, I, NowFn> SimpleSchedulerStateManager<T, I, NowFn>
@@ -474,7 +521,7 @@ where
474521
now_fn: NowFn,
475522
instance_name: impl Into<String>,
476523
) -> Arc<Self> {
477-
Arc::new_cyclic(|weak_self| Self {
524+
let res = Arc::new_cyclic(|weak_self| Self {
478525
action_db,
479526
max_job_retries,
480527
no_event_action_timeout,
@@ -483,7 +530,14 @@ where
483530
weak_self: weak_self.clone(),
484531
now_fn,
485532
scheduler_metrics: SchedulerMetrics::new(instance_name),
486-
})
533+
queued_actions_tracker: Arc::new(QueuedActionsTracker::new(weak_self.clone())),
534+
});
535+
let queued_actions_tracker_clone = res.queued_actions_tracker.clone();
536+
register_queued_actions_callback(Box::new(move |observe| {
537+
queued_actions_tracker_clone.dump_queued_actions(observe);
538+
}));
539+
540+
res
487541
}
488542

489543
/// Returns a reference to the scheduler metrics for recording OTEL metrics.
@@ -1182,16 +1236,7 @@ where
11821236
}
11831237

11841238
if env::var("NATIVELINK_COUNT_QUEUED_ACTIONS").unwrap_or_default() == "1" {
1185-
let action_infos = self
1186-
.action_db
1187-
.get_queued_actions()
1188-
.await
1189-
.err_tip(|| "In SimpleSchedulerStateManager::record_actions_count")
1190-
.unwrap_or_default();
1191-
1192-
info!("Found {} queued actions", action_infos.len());
1193-
1194-
self.scheduler_metrics.record_queued_actions(action_infos);
1239+
self.queued_actions_tracker.count_queued_actions().await;
11951240
}
11961241
}
11971242
}

nativelink-util/src/metrics.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,44 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::sync::LazyLock;
15+
use std::sync::{LazyLock, OnceLock};
1616

1717
use opentelemetry::{InstrumentationScope, KeyValue, Value, global, metrics};
1818
use crate::action_messages::ActionStage;
1919

20+
/// Callback type for observable gauges that report queued action counts.
21+
/// The callback receives an `Observer` that should be used to record values with attributes.
22+
pub type QueuedActionsCallback = Box<dyn Fn(&dyn Fn(u64, &[KeyValue])) + Send + Sync>;
23+
24+
/// Storage for the external callback for queued actions count.
25+
static QUEUED_ACTIONS_CALLBACK: OnceLock<QueuedActionsCallback> = OnceLock::new();
26+
27+
/// Registers an external callback for the `execution_queued_actions_count` observable gauge.
28+
///
29+
/// This function can only be called once. Subsequent calls will panic.
30+
///
31+
/// The callback will be invoked during metrics collection and should report
32+
/// the current count of queued actions by calling the provided observer function
33+
/// with the count and any relevant attributes (e.g., platform properties).
34+
///
35+
/// # Panics
36+
///
37+
/// Panics if the callback has already been registered.
38+
///
39+
/// # Example
40+
/// ```ignore
41+
/// register_queued_actions_callback(Box::new(|observe| {
42+
/// // Report counts for different platform configurations
43+
/// observe(10, &[KeyValue::new("platform", "linux")]);
44+
/// observe(5, &[KeyValue::new("platform", "windows")]);
45+
/// }));
46+
/// ```
47+
pub fn register_queued_actions_callback(callback: QueuedActionsCallback) {
48+
if QUEUED_ACTIONS_CALLBACK.set(callback).is_err() {
49+
panic!("Queued actions callback can only be registered once");
50+
}
51+
}
52+
2053
// Metric attribute keys for remote execution operations.
2154
pub const EXECUTION_STAGE: &str = "execution_stage";
2255
pub const EXECUTION_RESULT: &str = "execution_result";
@@ -311,9 +344,16 @@ pub static EXECUTION_METRICS: LazyLock<ExecutionMetrics> = LazyLock::new(|| {
311344
.build(),
312345

313346
execution_queued_actions_count: meter
314-
.u64_gauge("execution_queued_actions_count")
347+
.u64_observable_gauge("execution_queued_actions_count_observable")
315348
.with_description("Current number of queued actions by platform properties")
316349
.with_unit("{action}")
350+
.with_callback(|observer| {
351+
if let Some(callback) = QUEUED_ACTIONS_CALLBACK.get() {
352+
callback(&|value, attrs| {
353+
observer.observe(value, attrs);
354+
});
355+
}
356+
})
317357
.build(),
318358
}
319359
});
@@ -338,7 +378,7 @@ pub struct ExecutionMetrics {
338378
/// Gauge of actions by stage
339379
pub execution_actions_count: metrics::Gauge<u64>,
340380
// Gauge of queued actions by platform properties
341-
pub execution_queued_actions_count: metrics::Gauge<u64>,
381+
pub execution_queued_actions_count: metrics::ObservableGauge<u64>,
342382
}
343383

344384
/// Helper function to create attributes for execution metrics

0 commit comments

Comments
 (0)