1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions nativelink-config/src/schedulers.rs
@@ -119,6 +119,12 @@ pub struct SimpleScheduler {
/// The strategy used to assign workers jobs.
#[serde(default)]
pub allocation_strategy: WorkerAllocationStrategy,

/// Remove an action from the queue after this much time has elapsed without a listener.
/// Amount of time in seconds.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub disconnect_timeout_s: u64,
}

/// A scheduler that simply forwards requests to an upstream scheduler. This
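For orientation, a minimal sketch (not part of this change) of setting the new field when the config is built in Rust; it assumes the config struct implements `Default`, which this diff does not show:

```rust
// Hypothetical usage: override only the new disconnect timeout and leave the
// remaining SimpleScheduler config fields at their defaults (assumes `Default`).
let scheduler_cfg = nativelink_config::schedulers::SimpleScheduler {
    disconnect_timeout_s: 120, // keep unlistened actions queued for two minutes
    ..Default::default()
};
```
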
3 changes: 3 additions & 0 deletions nativelink-scheduler/src/action_scheduler.rs
@@ -47,6 +47,9 @@ pub trait ActionScheduler: Sync + Send + Unpin {
/// Cleans up the cache of recently completed actions.
async fn clean_recently_completed_actions(&self);

/// Inform the scheduler that a client has disconnected.
fn notify_client_disconnected(&self, unique_qualifier: &ActionInfoHashKey);

/// Register the metrics for the action scheduler.
fn register_metrics(self: Arc<Self>, _registry: &mut Registry) {}
}
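As an illustration of how a caller might use the new hook (everything below is an assumption, not taken from this change): a service could hold a small guard alongside a client's response stream and report the disconnect when the stream is dropped.

```rust
use std::sync::Arc;

use nativelink_util::action_messages::ActionInfoHashKey;

use crate::action_scheduler::ActionScheduler;

/// Hypothetical guard owned by a client's response stream; dropping it (for
/// example because the client hung up) reports the disconnect to the scheduler.
struct DisconnectGuard {
    scheduler: Arc<dyn ActionScheduler>,
    unique_qualifier: ActionInfoHashKey,
}

impl Drop for DisconnectGuard {
    fn drop(&mut self) {
        self.scheduler
            .notify_client_disconnected(&self.unique_qualifier);
    }
}
```
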
2 changes: 2 additions & 0 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
@@ -151,6 +151,8 @@ impl CacheLookupScheduler {

#[async_trait]
impl ActionScheduler for CacheLookupScheduler {
fn notify_client_disconnected(&self, _unique_qualifier: &ActionInfoHashKey) {}

async fn get_platform_property_manager(
&self,
instance_name: &str,
2 changes: 2 additions & 0 deletions nativelink-scheduler/src/grpc_scheduler.rs
@@ -152,6 +152,8 @@ impl GrpcScheduler {

#[async_trait]
impl ActionScheduler for GrpcScheduler {
fn notify_client_disconnected(&self, _unique_qualifier: &ActionInfoHashKey) {}

async fn get_platform_property_manager(
&self,
instance_name: &str,
2 changes: 2 additions & 0 deletions nativelink-scheduler/src/property_modifier_scheduler.rs
@@ -47,6 +47,8 @@ impl PropertyModifierScheduler {

#[async_trait]
impl ActionScheduler for PropertyModifierScheduler {
fn notify_client_disconnected(&self, _unique_qualifier: &ActionInfoHashKey) {}

async fn get_platform_property_manager(
&self,
instance_name: &str,
176 changes: 165 additions & 11 deletions nativelink-scheduler/src/simple_scheduler.rs
@@ -18,10 +18,10 @@ use std::collections::BTreeMap
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use std::time::Instant;

use async_trait::async_trait;
use futures::Future;
use futures::future::{BoxFuture, Future};
use hashbrown::{HashMap, HashSet};
use lru::LruCache;
use nativelink_config::schedulers::WorkerAllocationStrategy;
@@ -38,7 +38,7 @@ use parking_lot::{Mutex, MutexGuard};
use tokio::sync::{watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tracing::{error, warn};
use tracing::{error, info, warn};

use crate::action_scheduler::ActionScheduler;
use crate::platform_property_manager::PlatformPropertyManager;
@@ -57,6 +57,10 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// Default timeout for actions without any listeners.
/// If this changes, remember to change the documentation in the config.
const DEFAULT_DISCONNECT_TIMEOUT_S: u64 = 60;

/// An action that is being awaited on and last known state.
struct AwaitedAction {
action_info: Arc<ActionInfo>,
@@ -173,7 +177,7 @@ impl Workers {
}

struct CompletedAction {
completed_time: SystemTime,
completed_instant: Instant,
state: Arc<ActionState>,
}

@@ -198,6 +202,41 @@ impl Borrow<ActionInfoHashKey> for CompletedAction {
}
}

type NowFn = fn() -> Instant;
type SleepFn = fn(Duration) -> BoxFuture<'static, ()>;

/// Functions that may be injected for testing purposes. During standard control
/// flow these are provided by `Callbacks::default()`.
pub struct Callbacks {
/// A function that gets the current time.
now_fn: NowFn,

/// A function that sleeps for a given Duration.
sleep_fn: SleepFn,
}

impl Callbacks {
pub fn new(now_fn: NowFn, sleep_fn: SleepFn) -> Self {
Self { now_fn, sleep_fn }
}

fn now(&self) -> Instant {
(self.now_fn)()
}

fn sleep(&self, duration: Duration) -> impl Future<Output = ()> {
(self.sleep_fn)(duration)
}
}

impl Default for Callbacks {
fn default() -> Self {
Callbacks {
now_fn: Instant::now,
sleep_fn: |duration| Box::pin(tokio::time::sleep(duration)),
}
}
}
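A brief sketch of why these hooks are injectable (illustrative only, mirroring the defaults above): a test can pass its own time functions through `Callbacks::new` instead of relying on `Default`, e.g. swapping the sleep for a future it can drive deterministically.

```rust
// Illustrative only: construct Callbacks by hand, as a test might. This
// reproduces the default behaviour (real clock, tokio sleep); a test would
// substitute its own controllable functions.
let callbacks = Callbacks::new(
    Instant::now,
    |duration| Box::pin(tokio::time::sleep(duration)),
);
```
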
struct SimpleSchedulerImpl {
// BTreeMap uses `cmp` to do its comparisons; this is a problem because we want to sort our
// actions based on priority and insert timestamp but also want to find and join new actions
@@ -227,6 +266,9 @@ struct SimpleSchedulerImpl {
/// Notify task<->worker matching engine that work needs to be done.
tasks_or_workers_change_notify: Arc<Notify>,
metrics: Arc<Metrics>,
/// How long the server will wait for a client to reconnect before removing the action from the queue.
disconnect_timeout_s: u64,
callbacks: Arc<Callbacks>,
}

impl SimpleSchedulerImpl {
@@ -313,11 +355,13 @@ impl SimpleSchedulerImpl {
}

fn clean_recently_completed_actions(&mut self) {
let expiry_time = SystemTime::now()
let expiry_time = self
.callbacks
.now()
.checked_sub(self.retain_completed_for)
.unwrap();
self.recently_completed_actions
.retain(|action| action.completed_time > expiry_time);
.retain(|action| action.completed_instant > expiry_time);
}

fn find_recently_completed_action(
@@ -422,10 +466,21 @@ impl SimpleSchedulerImpl {
Ok(())
}

fn get_queued_action(&self, unique_qualifier: &ActionInfoHashKey) -> Option<&AwaitedAction> {
self.queued_actions_set
.get(unique_qualifier)
.and_then(|action_info| self.queued_actions.get(action_info))
}

fn get_active_action(&self, unique_qualifier: &ActionInfoHashKey) -> Option<&AwaitedAction> {
self.active_actions.get(unique_qualifier)
}

// TODO(blaise.bruer) This is an O(n*m) (aka n^2) algorithm. In theory we can create a map
// of capabilities of each worker and then try and match the actions to the worker using
// the map lookup (ie. map reduce).
fn do_try_match(&mut self) {
println!("do_try_match did run");
// TODO(blaise.bruer) This is a bit difficult because of how rust's borrow checker gets in
// the way. We need to conditionally remove items from the `queued_action`. Rust is working
// to add `drain_filter`, which would in theory solve this problem, but because we need
@@ -619,7 +674,7 @@ impl SimpleSchedulerImpl {

// Keep in case this is asked for soon.
self.recently_completed_actions.insert(CompletedAction {
completed_time: SystemTime::now(),
completed_instant: self.callbacks.now(),
state: running_action.current_state,
});

Expand Down Expand Up @@ -647,7 +702,7 @@ impl SimpleScheduler {
#[inline]
#[must_use]
pub fn new(scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler) -> Self {
Self::new_with_callback(scheduler_cfg, || {
Self::new_with_callback(scheduler_cfg, Callbacks::default(), || {
// The cost of running `do_try_match()` is very high, but constant
// in relation to the number of changes that have happened. This means
// that grabbing this lock to process `do_try_match()` should always
@@ -664,6 +719,7 @@ impl SimpleScheduler {
F: Fn() -> Fut + Send + Sync + 'static,
>(
scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler,
callbacks: Callbacks,
on_matching_engine_run: F,
) -> Self {
let platform_property_manager = Arc::new(PlatformPropertyManager::new(
@@ -688,6 +744,11 @@ impl SimpleScheduler {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
}

let mut disconnect_timeout_s = scheduler_cfg.disconnect_timeout_s;
if disconnect_timeout_s == 0 {
disconnect_timeout_s = DEFAULT_DISCONNECT_TIMEOUT_S;
}

let tasks_or_workers_change_notify = Arc::new(Notify::new());

let metrics = Arc::new(Metrics::default());
@@ -703,6 +764,8 @@ impl SimpleScheduler {
max_job_retries,
tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(),
metrics: metrics.clone(),
disconnect_timeout_s,
callbacks: Arc::new(callbacks),
}));
let weak_inner = Arc::downgrade(&inner);
Self {
@@ -777,6 +840,97 @@ impl SimpleScheduler {

#[async_trait]
impl ActionScheduler for SimpleScheduler {
fn notify_client_disconnected(&self, unique_qualifier: &ActionInfoHashKey) {
// TODO: Make this prettier.
// It's a bit tricky to satisfy the borrow checker here,
// but it should be possible to make this nicer.
let inner = self.get_inner_lock();
let Some(action) = inner
.get_queued_action(&unique_qualifier)
.or_else(|| inner.get_active_action(&unique_qualifier))
else {
warn!(
"Scheduler notified that client disconnected, but failed to find action {}",
unique_qualifier.digest.hash_str()
);
return;
};

if action.notify_channel.receiver_count() != 0 {
return;
}
let sleep_time = Duration::from_secs(inner.disconnect_timeout_s);
let callbacks = inner.callbacks.clone();
// Drop the mutex guard so we don't hold up access.
drop(inner);

let weak_inner = Arc::downgrade(&self.inner);

let unique_qualifier = unique_qualifier.clone();

// Spawn a task that sleeps for disconnect_timeout_s and then checks whether
// the listener count is still 0. If so, it moves to the removal stage;
// otherwise it returns.
tokio::spawn(async move {
callbacks.sleep(sleep_time).await;
let Some(inner_mux) = weak_inner.upgrade() else {
return;
};
let mut inner = inner_mux.lock();
let Some(action) = inner
.queued_actions_set
.get(&unique_qualifier)
.and_then(|action_info| inner.queued_actions.get(action_info))
else {
if inner.active_actions.contains_key(&unique_qualifier) {
warn!("Action was active and could not be killed");
} else {
info!(
"Action {} completed while client disconnected",
unique_qualifier.digest.hash_str()
);
}
return;
};

// If the listener count is still 0, remove the action from queued actions
// or active actions, whichever is applicable, and kill the action on the
// worker if it was active. If the listener count is no longer 0, a client
// has reconnected and there is no more work to be done.
if action.notify_channel.receiver_count() != 0 {
info!(
"Client reconnected before disconnect_timeout elsapsed for Action {}",
unique_qualifier.digest.hash_str()
);
return;
}

warn!(
"Client disconnect timeout elapsed - Removing action with digest hash {}",
action.action_info.unique_qualifier.digest.hash_str()
);

println!("About to remove");
match inner.get_queued_action(&unique_qualifier) {
Some(_) => {
// We can't use the action info from the above call due to the borrow checker.
let action_info = inner
.queued_actions_set
.get(&unique_qualifier)
.unwrap()
.clone();
inner.queued_actions_set.remove(&action_info);
inner.queued_actions.remove(&action_info);
}
None => {
inner.active_actions.remove(&unique_qualifier);
// TODO: Send kill on worker signal - PR: #842.
}
}
});
}

async fn get_platform_property_manager(
&self,
_instance_name: &str,
@@ -1024,9 +1178,9 @@ impl MetricsComponent for SimpleScheduler {
impl MetricsComponent for CompletedAction {
fn gather_metrics(&self, c: &mut CollectorState) {
c.publish(
"completed_timestamp",
&self.completed_time,
"The timestamp this action was completed",
"completed_instant_elapsed",
&self.completed_instant.elapsed(),
"The elapsed time since the action was completed",
);
c.publish(
"current_state",