Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a644f64
fast_slow_store: only bound followers' wait, never the leader's populate
amankrx May 2, 2026
35cf8f1
fast_slow_store: never pass caller's writer into follower closures
amankrx May 2, 2026
1a236a0
execution_server: pre-validate CAS blobs and return PreconditionFailure
amankrx May 2, 2026
20b1de9
execution_server: detect missing Action proto and surface PreconditionFailure
amankrx May 2, 2026
55ac2f8
ft_aggregate: pass explicit TIMEOUT to absorb RediSearch slow scans
amankrx May 5, 2026
749e1b4
health_utils: run indicator checks in parallel, not serially
amankrx May 5, 2026
a424b38
health_utils: run indicator checks in parallel, not serially
amankrx May 5, 2026
8266b7d
action_messages: surface PreconditionFailure for any missing-CAS-blob…
amankrx May 6, 2026
4f8e090
fast_slow_store: fall through to slow on stale fast-tier map entries
amankrx May 6, 2026
c461440
dynamic_fake_redis: tolerate the new explicit FT.AGGREGATE TIMEOUT clause
amankrx May 6, 2026
065daaa
store_awaited_action_db: retry try_subscribe once on miss to close dedup
amankrx May 6, 2026
ee11645
redis_store: lightweight check_health using PING instead of full I/O
amankrx May 6, 2026
e5a7c94
filesystem_store: make unref ENOENT idempotent and demote map/disk di…
amankrx May 6, 2026
1a6cc2a
Merge branch 'main' into temp-cas-bulk-changes
amankrx May 6, 2026
56b0f02
fast_slow_store: bypass leader/follower dedup for huge blobs
amankrx May 7, 2026
cc6dd6f
gcs_store, filesystem_store: lightweight check_health probes
amankrx May 7, 2026
7a89d6b
redis_store: raise check_health PING ceiling from 2s to 4s
amankrx May 7, 2026
ccdfee2
connection_manager: replace usize counter with Semaphore RAII permit
amankrx May 9, 2026
c6c69bd
redis_store: hold subscribed_keys write lock across receiver drop
amankrx May 9, 2026
b3d5473
Merge branch 'upstream/main' into temp-cas-bulk-changes
amankrx May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,26 @@ pub struct FastSlowSpec {
/// and you wish to have an upstream read only store.
#[serde(default)]
pub slow_direction: StoreDirection,

/// Reads of blobs at or above this size bypass the populating-digests
/// dedup map and stream directly from the slow store, without
/// populating the fast tier.
///
/// Rationale: the leader/follower dedup is a win for blobs whose
/// transfer time is short relative to `LEADER_WAIT_TIMEOUT` — one
/// slow-store fetch fills the fast cache, subsequent readers serve
/// from fast. For multi-GB blobs (typically container layers) the
/// leader's transfer takes minutes; every concurrent follower hits
/// the timeout, falls through to the slow store anyway, and the
/// fast cache is then evicted aggressively to make room for the
/// huge blob — pushing out smaller, more-frequently-read entries.
/// Bypassing dedup for huge blobs avoids both pathologies.
///
/// Set to 0 (default) to use the built-in default of 256 MiB. Set
/// to a very large value (e.g. `u64::MAX`) to disable the bypass
/// and always use dedup regardless of size.
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub bypass_dedup_threshold_bytes: u64,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
Expand Down
14 changes: 13 additions & 1 deletion nativelink-redis-tester/src/dynamic_fake_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,21 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
panic!("Aggregate query should be a string: {args:?}");
};
let query = str::from_utf8(raw_query).unwrap();
// The real ft_aggregate caller now passes an explicit
// `TIMEOUT <ms>` clause before `LOAD`. Tolerate both
// shapes here so this fake doesn't break older callers
// and the LOAD-args check still validates the bit we
// actually care about.
let load_offset = if matches!(args.get(2), Some(OwnedFrame::BulkString(b)) if b == b"TIMEOUT")
{
// Skip "TIMEOUT" and its millisecond argument.
4
} else {
2
};
// Lazy implementation making assumptions.
assert_eq!(
args[2..6],
args[load_offset..load_offset + 4],
vec![
OwnedFrame::BulkString(b"LOAD".to_vec()),
OwnedFrame::BulkString(b"2".to_vec()),
Expand Down
48 changes: 38 additions & 10 deletions nativelink-scheduler/src/store_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,16 +695,44 @@ where
ActionUniqueQualifier::Cacheable(_) => {}
ActionUniqueQualifier::Uncacheable(_) => return Ok(None),
}
let stream = self
.store
.search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
tokio::pin!(stream);
let maybe_awaited_action = stream
.try_next()
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
// Lookup, then on a miss retry once after a brief sleep. The
// backing index is `RediSearch`'s secondary index over the
// awaited-action hash-set; there is a sub-millisecond-scale
// window between a peer's HSET completing on the master and
// the index commit becoming visible to a concurrent reader.
// Two near-simultaneous `add_action` calls for the same
// `unique_qualifier` can both observe an empty result in that
// window and each create a separate operation, leading to
// duplicate scheduler operations for the same action digest
// (observed in production as "two same actions running on
// different PRs"). One short retry closes the window without
// a heavyweight atomic-claim mechanism.
//
// Uses real tokio time rather than the configurable `now_fn`
// sleep: `MockInstantWrapped::sleep` busy-yields until mock
// time advances, which scheduler tests don't do for this
// path, and the retry itself is a millisecond-scale physical
// wait that should never participate in test-controlled time.
const SUBSCRIBE_RACE_RETRY_DELAY: Duration = Duration::from_millis(20);
let mut maybe_awaited_action: Option<AwaitedAction> = None;
for attempt in 0..2_u32 {
if attempt > 0 {
tokio::time::sleep(SUBSCRIBE_RACE_RETRY_DELAY).await;
}
let stream = self
.store
.search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
tokio::pin!(stream);
maybe_awaited_action = stream
.try_next()
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
if maybe_awaited_action.is_some() {
break;
}
}
match maybe_awaited_action {
Some(awaited_action) => {
// TODO(palfrey) We don't support joining completed jobs because we
Expand Down
199 changes: 191 additions & 8 deletions nativelink-service/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::Bytes;
use futures::stream::unfold;
use futures::{Stream, StreamExt};
use nativelink_config::cas_server::{ExecutionConfig, InstanceName, WithInstanceName};
Expand All @@ -35,6 +36,7 @@ use nativelink_proto::google::longrunning::{
CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest,
ListOperationsResponse, Operation, WaitOperationRequest,
};
use nativelink_proto::google::rpc::Status as GrpcStatusProto;
use nativelink_store::ac_utils::get_and_decode_digest;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::action_messages::{
Expand All @@ -45,10 +47,89 @@ use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
use nativelink_util::operation_state_manager::{
ActionStateResult, ClientStateManager, OperationFilter,
};
use nativelink_util::store_trait::Store;
use nativelink_util::store_trait::{Store, StoreLike};
use opentelemetry::context::FutureExt;
use tonic::{Request, Response, Status};
use tracing::{Instrument, Level, debug, error, error_span, instrument};
use prost::Message as _;
use tonic::{Code, Request, Response, Status};
use tracing::{Instrument, Level, debug, error, error_span, instrument, warn};

/// Inline definition of `google.rpc.PreconditionFailure`. Bazel's
/// `RemoteSpawnRunner` inspects this proto in the gRPC `Status.details`
/// of a `FAILED_PRECONDITION` response and, when violations of type
/// `MISSING` are present, automatically re-uploads the named blobs and
/// retries the Execute call. Without this detail Bazel surfaces the
/// failure as a hard build error — exactly the symptom triggered by an
/// interrupted previous build leaving partial CAS uploads.
mod precondition_failure {
    /// `google.rpc.PreconditionFailure`.
    ///
    /// NOTE(review): the prost field tags below must stay in sync with
    /// the canonical `google/rpc/error_details.proto` — they define the
    /// wire encoding that Bazel's client decodes.
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub(super) struct PreconditionFailure {
        /// One entry per failed precondition; here, one per missing
        /// CAS blob.
        #[prost(message, repeated, tag = "1")]
        pub(super) violations: ::prost::alloc::vec::Vec<Violation>,
    }
    /// `google.rpc.PreconditionFailure.Violation`.
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub(super) struct Violation {
        /// Violation category; the only value emitted here is
        /// [`VIOLATION_TYPE_MISSING`].
        #[prost(string, tag = "1")]
        pub(super) r#type: ::prost::alloc::string::String,
        /// Identifies the failing resource — callers in this file
        /// format it as `blobs/<hash>/<size>`.
        #[prost(string, tag = "2")]
        pub(super) subject: ::prost::alloc::string::String,
        /// Human-readable context for logging/debugging.
        #[prost(string, tag = "3")]
        pub(super) description: ::prost::alloc::string::String,
    }
    /// `type_url` placed on the `google.protobuf.Any` that wraps this
    /// message inside the gRPC `Status.details`.
    pub(super) const TYPE_URL: &str = "type.googleapis.com/google.rpc.PreconditionFailure";
    /// Violation type string that triggers Bazel's re-upload-and-retry
    /// handling.
    pub(super) const VIOLATION_TYPE_MISSING: &str = "MISSING";
}

/// Build a tonic [`Status`] of code `FAILED_PRECONDITION` whose details
/// carry a `google.rpc.PreconditionFailure` listing the missing CAS
/// blobs. The format matches what Bazel's `RemoteExecutionService`
/// expects in order to trigger automatic re-upload + retry.
///
/// `missing` pairs each absent digest with a short static label (e.g.
/// `"Action.command_digest"`) used as the violation description;
/// `summary` becomes both the gRPC status message and the embedded
/// `google.rpc.Status.message`.
fn missing_blobs_failed_precondition(
    missing: &[(DigestInfo, &'static str)],
    summary: &str,
) -> Status {
    let pf = precondition_failure::PreconditionFailure {
        violations: missing
            .iter()
            .map(|(d, ctx)| precondition_failure::Violation {
                r#type: precondition_failure::VIOLATION_TYPE_MISSING.to_string(),
                // Per REv2, the subject for a missing-blob violation is
                // `blobs/<hash>/<size>` so the client knows exactly
                // which digest to re-upload.
                subject: format!("blobs/{}/{}", d.packed_hash(), d.size_bytes()),
                description: (*ctx).to_string(),
            })
            .collect(),
    };

    // Wrap PreconditionFailure into a google.protobuf.Any.
    // `Message::encode_to_vec` is infallible (encoding into a `Vec`
    // cannot hit a capacity error), so unlike the manual
    // `encode(..).unwrap_or(())` pattern nothing is silently dropped.
    let any = prost_types::Any {
        type_url: precondition_failure::TYPE_URL.to_string(),
        value: pf.encode_to_vec(),
    };

    // tonic places the `details` argument verbatim into the
    // `grpc-status-details-bin` trailer, which is itself a serialized
    // `google.rpc.Status` proto. Encode the whole Status (code +
    // message + details) so Bazel's RemoteExecutionService can decode
    // the PreconditionFailure violations and re-upload missing blobs.
    let status_proto = GrpcStatusProto {
        code: Code::FailedPrecondition as i32,
        message: summary.to_string(),
        details: vec![any],
    };

    Status::with_details(
        Code::FailedPrecondition,
        summary.to_string(),
        Bytes::from(status_proto.encode_to_vec()),
    )
}

type InstanceInfoName = String;

Expand Down Expand Up @@ -239,10 +320,17 @@ impl ExecutionServer {
})
}

/// Outer `Err`: an internal error that the outer handler will tip
/// and convert via `Status::from(Error)`.
/// Inner `Err(Status)`: a pre-built `tonic::Status` that must be
/// returned to the client verbatim (e.g. `FAILED_PRECONDITION` with
/// a `PreconditionFailure` detail listing missing CAS blobs, which
/// Bazel uses to drive automatic re-upload + retry).
async fn inner_execute(
&self,
request: ExecuteRequest,
) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Error> {
) -> Result<Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status>, Error>
{
let instance_name = request.instance_name;

let instance_info = self
Expand All @@ -261,8 +349,95 @@ impl ExecutionServer {
.execution_policy
.map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);

let action =
get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
// Same recovery model as the post-Action blob check below, but
// for the Action proto itself: Bazel's interrupted-build state
// can leave the Action digest absent from CAS; without a
// `PreconditionFailure` detail Bazel can't know which blob to
// re-upload and surfaces the failure as terminal. Detect
// NotFound on the Action fetch and translate it into the same
// FAILED_PRECONDITION + violations shape so Bazel auto-recovers.
let action = match get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into())
.await
{
Ok(a) => a,
Err(e) if e.code == Code::NotFound => {
warn!(
%digest,
%e,
"Execute: Action proto missing from CAS; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
);
let summary = format!(
"Action {digest} is missing from CAS; client should re-upload it and retry"
);
return Ok(Err(missing_blobs_failed_precondition(
&[(digest, "Action")],
&summary,
)));
}
Err(e) => return Err(e).err_tip(|| "Decoding Action proto in Execute")?,
};

// Eager-validate that the blobs we'll feed to the worker are
// present in CAS *before* queuing the action. The most common
// way to land here without those blobs is an interrupted
// previous build: Bazel's local upload-state believes the blobs
// were sent, but the CAS does not have them, so a worker would
// later fail with `FAILED_PRECONDITION ... not found in either
// fast or slow store`. Surfaced to Bazel without a
// `PreconditionFailure` detail it becomes a hard build error;
// surfaced *with* one Bazel re-uploads the missing blobs and
// retries the Execute call automatically.
let action_command_digest = action
.command_digest
.as_ref()
.map(|d| DigestInfo::try_from(d.clone()))
.transpose()
.err_tip(|| "Failed to parse command_digest from Action")?;
let action_input_root_digest = action
.input_root_digest
.as_ref()
.map(|d| DigestInfo::try_from(d.clone()))
.transpose()
.err_tip(|| "Failed to parse input_root_digest from Action")?;
let mut blobs_to_check: Vec<DigestInfo> = Vec::with_capacity(2);
if let Some(d) = action_command_digest {
blobs_to_check.push(d);
}
if let Some(d) = action_input_root_digest {
blobs_to_check.push(d);
}
if !blobs_to_check.is_empty() {
let store_keys: Vec<_> = blobs_to_check.iter().map(|d| (*d).into()).collect();
let sizes = instance_info
.cas_store
.has_many(&store_keys)
.await
.err_tip(|| "Validating Action input blobs in CAS")?;
let mut missing: Vec<(DigestInfo, &'static str)> = Vec::new();
for ((digest, present), label) in blobs_to_check
.iter()
.zip(sizes.iter())
.zip(["Action.command_digest", "Action.input_root_digest"].iter())
{
if present.is_none() {
missing.push((*digest, label));
}
}
if !missing.is_empty() {
warn!(
?missing,
%digest,
"Execute pre-check found missing CAS blobs; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
);
let summary = format!(
"{} CAS blob(s) referenced by action {} are missing; client should re-upload them and retry",
missing.len(),
digest,
);
return Ok(Err(missing_blobs_failed_precondition(&missing, &summary)));
}
}

let action_info = instance_info
.build_action_info(
instance_name.clone(),
Expand All @@ -283,7 +458,7 @@ impl ExecutionServer {
.await
.err_tip(|| "Failed to schedule task")?;

Ok(Box::pin(Self::to_execute_stream(
Ok(Ok(Self::to_execute_stream(
&NativelinkOperationId::new(
instance_name,
action_listener
Expand Down Expand Up @@ -355,7 +530,15 @@ impl Execution for ExecutionServer {
.await
.err_tip(|| "Failed on execute() command")?;

Ok(Response::new(Box::pin(result)))
// Inner result distinguishes "internal error" (already
// converted via err_tip + Status::from above) from
// "deliberately constructed Status to return verbatim", so we
// can deliver `FAILED_PRECONDITION` with `PreconditionFailure`
// details to Bazel.
match result {
Ok(stream) => Ok(Response::new(Box::pin(stream))),
Err(status) => Err(status),
}
}

#[instrument(
Expand Down
Loading
Loading