From a644f6473b8103ac7dc7aab3ce76f3078cd3857c Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Sat, 2 May 2026 10:24:38 +0100
Subject: [PATCH 01/18] fast_slow_store: only bound followers' wait, never the
 leader's populate

---
 nativelink-service/src/cas_server.rs    | 46 +++++++++++++----
 nativelink-store/src/fast_slow_store.rs | 68 +++++++++++++++++++++++--
 nativelink-store/src/redis_store.rs     |  4 ++
 3 files changed, 104 insertions(+), 14 deletions(-)

diff --git a/nativelink-service/src/cas_server.rs b/nativelink-service/src/cas_server.rs
index 9e0424316..68e146686 100644
--- a/nativelink-service/src/cas_server.rs
+++ b/nativelink-service/src/cas_server.rs
@@ -14,13 +14,14 @@

 use core::convert::Into;
 use core::pin::Pin;
+use core::time::Duration;
 use std::collections::{HashMap, VecDeque};

 use bytes::Bytes;
 use futures::stream::{FuturesUnordered, Stream};
 use futures::{StreamExt, TryStreamExt};
 use nativelink_config::cas_server::{CasStoreConfig, WithInstanceName};
-use nativelink_error::{Code, Error, ResultExt, error_if, make_input_err};
+use nativelink_error::{Code, Error, ResultExt, error_if, make_err, make_input_err};
 use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::{
     ContentAddressableStorage, ContentAddressableStorageServer as Server,
 };
@@ -48,6 +49,9 @@ pub struct CasServer {
 type GetTreeStream =
     Pin<Box<dyn Stream<Item = Result<GetTreeResponse, Status>> + Send + 'static>>;

+/// Per-blob deadline applied inside `BatchReadBlobs` / `BatchUpdateBlobs`.
+const BATCH_PER_BLOB_TIMEOUT: Duration = Duration::from_secs(30);
+
 impl CasServer {
     pub fn new(
         configs: &[WithInstanceName<CasStoreConfig>],
@@ -135,10 +139,22 @@ impl CasServer {
                     size_bytes,
                     request_data.len()
                 );
-                let result = store_ref
-                    .update_oneshot(digest_info, request_data)
-                    .await
-                    .err_tip(|| "Error writing to store");
+                // Apply a per-blob deadline so one slow upload does not
+                // make the whole batch hit the client's overall deadline.
+                let result = match tokio::time::timeout(
+                    BATCH_PER_BLOB_TIMEOUT,
+                    store_ref.update_oneshot(digest_info, request_data),
+                )
+                .await
+                {
+                    Ok(r) => r.err_tip(|| "Error writing to store"),
+                    Err(_elapsed) => Err(make_err!(
+                        Code::DeadlineExceeded,
+                        "BatchUpdateBlobs per-blob timeout ({} s) elapsed for digest {}",
+                        BATCH_PER_BLOB_TIMEOUT.as_secs(),
+                        digest_info,
+                    )),
+                };
                 Ok::<_, Error>(batch_update_blobs_response::Response {
                     digest: Some(digest),
                     status: Some(result.map_or_else(Into::into, |()| GrpcStatus::default())),
@@ -178,10 +194,22 @@
             .map(|digest| async move {
                 let digest_copy = DigestInfo::try_from(digest.clone())?;
                 // TODO(palfrey) There is a security risk here of someone taking all the memory on the instance.
-                let result = store_ref
-                    .get_part_unchunked(digest_copy, 0, None)
-                    .await
-                    .err_tip(|| "Error reading from store");
+                // Apply a per-blob deadline so one slow read does not
+                // make the whole batch hit the client's overall deadline.
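+                // For example, in a batch of 100 digests where one
+                // read wedges, only that entry fails with
+                // DEADLINE_EXCEEDED after 30 s; the other 99 responses
+                // still come back with their data and status.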
+                let result = match tokio::time::timeout(
+                    BATCH_PER_BLOB_TIMEOUT,
+                    store_ref.get_part_unchunked(digest_copy, 0, None),
+                )
+                .await
+                {
+                    Ok(r) => r.err_tip(|| "Error reading from store"),
+                    Err(_elapsed) => Err(make_err!(
+                        Code::DeadlineExceeded,
+                        "BatchReadBlobs per-blob timeout ({} s) elapsed for digest {}",
+                        BATCH_PER_BLOB_TIMEOUT.as_secs(),
+                        digest_copy,
+                    )),
+                };
                 let (status, data) = result.map_or_else(
                     |mut e| {
                         if e.code == Code::NotFound {
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index 0b03a9d6a..617e6f36b 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -17,6 +17,7 @@ use core::cmp::{max, min};
 use core::ops::Range;
 use core::pin::Pin;
 use core::sync::atomic::{AtomicU64, Ordering};
+use core::time::Duration;
 use std::collections::HashMap;
 use std::ffi::OsString;
 use std::sync::{Arc, Weak};
@@ -72,6 +73,7 @@ struct LoaderGuard<'a> {
     weak_store: Weak<FastSlowStore>,
     key: StoreKey<'a>,
     loader: Option<Arc<OnceCell<()>>>,
+    is_leader: bool,
 }

 impl LoaderGuard<'_> {
@@ -142,6 +144,7 @@ impl FastSlowStore {
     fn get_loader<'a>(&self, key: StoreKey<'a>) -> LoaderGuard<'a> {
         // Get a single loader instance that's used to populate the fast store
         // for this digest. If another request comes in then it's de-duplicated.
+        let mut is_leader = false;
         let loader = match self
             .populating_digests
             .lock()
@@ -151,6 +154,7 @@
                 occupied_entry.get().clone()
             }
             std::collections::hash_map::Entry::Vacant(vacant_entry) => {
+                is_leader = true;
                 vacant_entry.insert(Arc::new(OnceCell::new())).clone()
             }
@@ -158,6 +162,7 @@
             weak_store: self.weak_self.clone(),
             key,
             loader: Some(loader),
+            is_leader,
         }
     }
@@ -640,11 +645,51 @@ impl StoreDriver for FastSlowStore {
         }

         let mut writer = Some(writer);
-        self.get_loader(key.borrow())
-            .get_or_try_init(|| {
-                self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
-            })
-            .await?;
+
+        let needs_slow_store_fallback: bool = {
+            let loader_guard = self.get_loader(key.borrow());
+            let is_leader = loader_guard.is_leader;
+            if is_leader {
+                loader_guard
+                    .get_or_try_init(|| {
+                        self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
+                    })
+                    .await?;
+                false
+            } else {
+                let load_fut = loader_guard.get_or_try_init(|| async {
+                    self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
+                        .await
+                });
+                match tokio::time::timeout(LEADER_WAIT_TIMEOUT, load_fut).await {
+                    Ok(result) => {
+                        result?;
+                        false
+                    }
+                    Err(_elapsed) => {
+                        self.metrics
+                            .leader_wait_timeouts
+                            .fetch_add(1, Ordering::Acquire);
+                        warn!(
+                            %key,
+                            timeout_secs = LEADER_WAIT_TIMEOUT.as_secs(),
+                            "FastSlowStore::get_part: leader-wait exceeded timeout, bypassing dedup and reading slow store directly",
+                        );
+                        true
+                    }
+                }
+            }
+        };
+
+        if needs_slow_store_fallback && let Some(writer) = writer.take() {
+            return self
+                .slow_store
+                .get_part(key, writer, offset, length)
+                .await
+                .err_tip(
+                    || "In FastSlowStore::get_part slow_store fallback after leader-wait timeout",
+                );
+        }

         // If we didn't stream then re-enter which will stream from the fast
         // store, or retry the download.
We should not get in a loop here @@ -691,6 +736,19 @@ struct FastSlowStoreMetrics { slow_store_hit_count: AtomicU64, #[metric(help = "Downloaded bytes from the slow store")] slow_store_downloaded_bytes: AtomicU64, + #[metric( + help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT" + )] + leader_wait_timeouts: AtomicU64, } +/// Maximum time a follower will wait on the leader-populator before +/// bypassing the dedup map and reading directly from the slow store. +/// +/// Without this bound a single wedged populator would block every +/// concurrent reader of the same digest until each one's own `gRPC` +/// deadline fired (e.g. Bazel's `--remote_timeout`), turning a +/// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors. +const LEADER_WAIT_TIMEOUT: Duration = Duration::from_secs(60); + default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 78053d216..979883c43 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -1755,6 +1755,10 @@ where } }; + if matches!(raw_redis_map, Value::Int(_)) { + return None; + } + let Some(redis_map) = raw_redis_map.as_sequence() else { return Some(Err(Error::new( Code::Internal, From 35cf8f1f2bdf01880019aad78246113f0e4146d3 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Sat, 2 May 2026 10:35:06 +0100 Subject: [PATCH 02/18] fast_slow_store: never pass caller's writer into follower closures --- nativelink-store/src/fast_slow_store.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 617e6f36b..41645daad 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -646,6 +646,25 @@ impl StoreDriver for FastSlowStore { let mut writer = Some(writer); + // Drive the dedup loader. Two distinct paths: + // + // * Leader (created the OnceCell entry): runs `populate` with + // OUR `writer`, streaming directly to the caller while + // filling the fast cache. No timeout: a multi-GB blob + // legitimately takes minutes to stream, and cancelling our + // own populate would propagate the failure to every other + // reader of this digest. + // + // * Follower: bound the wait so a wedged leader does not pin + // us until the upstream `gRPC` deadline fires. The follower + // closure passes `None` for `writer`. This is critical: if + // the OnceCell ever promotes our follower closure to leader + // (because the original leader's future was dropped), and + // our `tokio::time::timeout` then cancels it, *no* caller + // `writer` was ever moved into the populate stream, so no + // `gRPC` sender is dropped without EOF. The follower then + // re-enters `get_part` below and reads from the now-warm + // fast cache, OR falls back to the slow store on timeout. 
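+        //
+        // Illustrative timeline for two concurrent readers of one digest:
+        //   t=0     A creates the OnceCell entry -> leader; streams to
+        //           its own writer while populating the fast store.
+        //   t=0+ε   B finds the entry -> follower; waits with its
+        //           writer untouched.
+        //   t=60s   B hits LEADER_WAIT_TIMEOUT -> B reads the slow
+        //           store directly while A's populate keeps running.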
         let needs_slow_store_fallback: bool = {
             let loader_guard = self.get_loader(key.borrow());
             let is_leader = loader_guard.is_leader;
             if is_leader {
                 loader_guard
                     .get_or_try_init(|| {
                         self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
                     })
                     .await?;
                 false
             } else {
-                let load_fut = loader_guard.get_or_try_init(|| async {
-                    self.populate_and_maybe_stream(key.borrow(), writer.take(), offset, length)
-                        .await
+                let load_fut = loader_guard.get_or_try_init(|| {
+                    self.populate_and_maybe_stream(key.borrow(), None, offset, length)
                 });
                 match tokio::time::timeout(LEADER_WAIT_TIMEOUT, load_fut).await {
                     Ok(result) => {
                         result?;
                         false
                     }

From 1a236a02c33c18068e033e7edcc34be29bbd69c1 Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Sun, 3 May 2026 00:04:02 +0100
Subject: [PATCH 03/18] execution_server: pre-validate CAS blobs and return
 PreconditionFailure

---
 nativelink-service/src/execution_server.rs | 170 ++++++++++++++++++++-
 1 file changed, 164 insertions(+), 6 deletions(-)

diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs
index f3f00878a..8234dfb3a 100644
--- a/nativelink-service/src/execution_server.rs
+++ b/nativelink-service/src/execution_server.rs
@@ -20,6 +20,7 @@ use std::fmt;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};

+use bytes::Bytes;
 use futures::stream::unfold;
 use futures::{Stream, StreamExt};
 use nativelink_config::cas_server::{ExecutionConfig, InstanceName, WithInstanceName};
@@ -35,6 +36,7 @@ use nativelink_proto::google::longrunning::{
     CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest,
     ListOperationsResponse, Operation, WaitOperationRequest,
 };
+use nativelink_proto::google::rpc::Status as GrpcStatusProto;
 use nativelink_store::ac_utils::get_and_decode_digest;
 use nativelink_store::store_manager::StoreManager;
 use nativelink_util::action_messages::{
@@ -45,10 +47,89 @@ use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
 use nativelink_util::operation_state_manager::{
     ActionStateResult, ClientStateManager, OperationFilter,
 };
-use nativelink_util::store_trait::Store;
+use nativelink_util::store_trait::{Store, StoreLike};
 use opentelemetry::context::FutureExt;
-use tonic::{Request, Response, Status};
-use tracing::{Instrument, Level, debug, error, error_span, instrument};
+use prost::Message as _;
+use tonic::{Code, Request, Response, Status};
+use tracing::{Instrument, Level, debug, error, error_span, instrument, warn};
+
+/// Inline definition of `google.rpc.PreconditionFailure`. Bazel's
+/// `RemoteSpawnRunner` inspects this proto in the gRPC `Status.details`
+/// of a `FAILED_PRECONDITION` response and, when violations of type
+/// `MISSING` are present, automatically re-uploads the named blobs and
+/// retries the Execute call. Without this detail Bazel surfaces the
+/// failure as a hard build error — exactly the symptom triggered by an
+/// interrupted previous build leaving partial CAS uploads.
+mod precondition_failure {
+    /// `google.rpc.PreconditionFailure`.
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct PreconditionFailure {
+        #[prost(message, repeated, tag = "1")]
+        pub(super) violations: ::prost::alloc::vec::Vec<Violation>,
+    }
+    /// `google.rpc.PreconditionFailure.Violation`.
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct Violation {
+        #[prost(string, tag = "1")]
+        pub(super) r#type: ::prost::alloc::string::String,
+        #[prost(string, tag = "2")]
+        pub(super) subject: ::prost::alloc::string::String,
+        #[prost(string, tag = "3")]
+        pub(super) description: ::prost::alloc::string::String,
+    }
+    pub(super) const TYPE_URL: &str = "type.googleapis.com/google.rpc.PreconditionFailure";
+    pub(super) const VIOLATION_TYPE_MISSING: &str = "MISSING";
+}
+
+/// Build a tonic [`Status`] of code `FAILED_PRECONDITION` whose details
+/// carry a `google.rpc.PreconditionFailure` listing the missing CAS
+/// blobs. The format matches what Bazel's `RemoteExecutionService`
+/// expects in order to trigger automatic re-upload + retry.
+fn missing_blobs_failed_precondition(
+    missing: &[(DigestInfo, &'static str)],
+    summary: &str,
+) -> Status {
+    let pf = precondition_failure::PreconditionFailure {
+        violations: missing
+            .iter()
+            .map(|(d, ctx)| precondition_failure::Violation {
+                r#type: precondition_failure::VIOLATION_TYPE_MISSING.to_string(),
+                // Per REv2, the subject for a missing-blob violation is
+                // `blobs/<hash>/<size>` so the client knows exactly
+                // which digest to re-upload.
+                subject: format!("blobs/{}/{}", d.packed_hash(), d.size_bytes()),
+                description: (*ctx).to_string(),
+            })
+            .collect(),
+    };
+
+    // Wrap PreconditionFailure into a google.protobuf.Any.
+    let mut pf_buf: Vec<u8> = Vec::with_capacity(pf.encoded_len());
+    pf.encode(&mut pf_buf).unwrap_or(());
+    let any = prost_types::Any {
+        type_url: precondition_failure::TYPE_URL.to_string(),
+        value: pf_buf,
+    };
+
+    // tonic places the `details` argument verbatim into the
+    // `grpc-status-details-bin` trailer, which is itself a serialized
+    // `google.rpc.Status` proto. Encode the whole Status (code +
+    // message + details) so Bazel's RemoteExecutionService can decode
+    // the PreconditionFailure violations and re-upload missing blobs.
+    let status_proto = GrpcStatusProto {
+        code: Code::FailedPrecondition as i32,
+        message: summary.to_string(),
+        details: vec![any],
+    };
+    let mut status_buf: Vec<u8> = Vec::with_capacity(status_proto.encoded_len());
+    status_proto.encode(&mut status_buf).unwrap_or(());
+
+    Status::with_details(
+        Code::FailedPrecondition,
+        summary.to_string(),
+        Bytes::from(status_buf),
+    )
+}

 type InstanceInfoName = String;
@@ -239,10 +320,17 @@ impl ExecutionServer {
         })
     }

+    /// Outer `Err`: an internal error that the outer handler will tip
+    /// and convert via `Status::from(Error)`.
+    /// Inner `Err(Status)`: a pre-built `tonic::Status` that must be
+    /// returned to the client verbatim (e.g. `FAILED_PRECONDITION` with
+    /// a `PreconditionFailure` detail listing missing CAS blobs, which
+    /// Bazel uses to drive automatic re-upload + retry).
     async fn inner_execute(
         &self,
         request: ExecuteRequest,
-    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Error> {
+    ) -> Result<Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status>, Error>
+    {
         let instance_name = request.instance_name;

         let instance_info = self
@@ -263,6 +351,68 @@

         let action =
             get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
+
+        // Eager-validate that the blobs we'll feed to the worker are
+        // present in CAS *before* queuing the action. The most common
+        // way to land here without those blobs is an interrupted
+        // previous build: Bazel's local upload-state believes the blobs
+        // were sent, but the CAS does not have them, so a worker would
+        // later fail with `FAILED_PRECONDITION ... not found in either
+        // fast or slow store`. Surfaced to Bazel without a
+        // `PreconditionFailure` detail it becomes a hard build error;
+        // surfaced *with* one Bazel re-uploads the missing blobs and
+        // retries the Execute call automatically.
+        let action_command_digest = action
+            .command_digest
+            .as_ref()
+            .map(|d| DigestInfo::try_from(d.clone()))
+            .transpose()
+            .err_tip(|| "Failed to parse command_digest from Action")?;
+        let action_input_root_digest = action
+            .input_root_digest
+            .as_ref()
+            .map(|d| DigestInfo::try_from(d.clone()))
+            .transpose()
+            .err_tip(|| "Failed to parse input_root_digest from Action")?;
+        let mut blobs_to_check: Vec<DigestInfo> = Vec::with_capacity(2);
+        if let Some(d) = action_command_digest {
+            blobs_to_check.push(d);
+        }
+        if let Some(d) = action_input_root_digest {
+            blobs_to_check.push(d);
+        }
+        if !blobs_to_check.is_empty() {
+            let store_keys: Vec<_> = blobs_to_check.iter().map(|d| (*d).into()).collect();
+            let sizes = instance_info
+                .cas_store
+                .has_many(&store_keys)
+                .await
+                .err_tip(|| "Validating Action input blobs in CAS")?;
+            let mut missing: Vec<(DigestInfo, &'static str)> = Vec::new();
+            for ((digest, present), label) in blobs_to_check
+                .iter()
+                .zip(sizes.iter())
+                .zip(["Action.command_digest", "Action.input_root_digest"].iter())
+            {
+                if present.is_none() {
+                    missing.push((*digest, *label));
+                }
+            }
+            if !missing.is_empty() {
+                warn!(
+                    ?missing,
+                    %digest,
+                    "Execute pre-check found missing CAS blobs; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
+                );
+                let summary = format!(
+                    "{} CAS blob(s) referenced by action {} are missing; client should re-upload them and retry",
+                    missing.len(),
+                    digest,
+                );
+                return Ok(Err(missing_blobs_failed_precondition(&missing, &summary)));
+            }
+        }
+
         let action_info = instance_info
             .build_action_info(
                 instance_name.clone(),
@@ -283,7 +433,7 @@
             .await
             .err_tip(|| "Failed to schedule task")?;

-        Ok(Box::pin(Self::to_execute_stream(
+        Ok(Ok(Self::to_execute_stream(
             &NativelinkOperationId::new(
                 instance_name,
                 action_listener
@@ -355,7 +505,15 @@ impl Execution for ExecutionServer {
             .await
             .err_tip(|| "Failed on execute() command")?;

-        Ok(Response::new(Box::pin(result)))
+        // Inner result distinguishes "internal error" (already
+        // converted via err_tip + Status::from above) from
+        // "deliberately constructed Status to return verbatim", so we
+        // can deliver `FAILED_PRECONDITION` with `PreconditionFailure`
+        // details to Bazel.
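+        // e.g. Ok(stream) is served as a normal operation stream,
+        // while Err(status) is returned untouched so the encoded
+        // PreconditionFailure detail survives to the client.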
+        match result {
+            Ok(stream) => Ok(Response::new(Box::pin(stream))),
+            Err(status) => Err(status),
+        }
     }

     #[instrument(

From 20b1de91df1136e91c4f61c49435ff6518def41b Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Sun, 3 May 2026 00:50:09 +0100
Subject: [PATCH 04/18] execution_server: detect missing Action proto and
 surface PreconditionFailure

---
 nativelink-service/src/execution_server.rs | 29 ++++++++++++++++++++--
 nativelink-store/src/redis_store.rs        | 24 +++++++++++++-----
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs
index 8234dfb3a..cdd6d52f6 100644
--- a/nativelink-service/src/execution_server.rs
+++ b/nativelink-service/src/execution_server.rs
@@ -349,8 +349,33 @@ impl ExecutionServer {
             .execution_policy
             .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);

-        let action =
-            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
+        // Same recovery model as the post-Action blob check below, but
+        // for the Action proto itself: Bazel's interrupted-build state
+        // can leave the Action digest absent from CAS; without a
+        // `PreconditionFailure` detail Bazel can't know which blob to
+        // re-upload and surfaces the failure as terminal. Detect
+        // NotFound on the Action fetch and translate it into the same
+        // FAILED_PRECONDITION + violations shape so Bazel auto-recovers.
+        let action = match get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into())
+            .await
+        {
+            Ok(a) => a,
+            Err(e) if e.code == Code::NotFound => {
+                warn!(
+                    %digest,
+                    %e,
+                    "Execute: Action proto missing from CAS; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
+                );
+                let summary = format!(
+                    "Action {digest} is missing from CAS; client should re-upload it and retry"
+                );
+                return Ok(Err(missing_blobs_failed_precondition(
+                    &[(digest, "Action")],
+                    &summary,
+                )));
+            }
+            Err(e) => return Err(e).err_tip(|| "Decoding Action proto in Execute")?,
+        };

diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs
index 979883c43..f1b8559cb 100644
--- a/nativelink-store/src/redis_store.rs
+++ b/nativelink-store/src/redis_store.rs
@@ -1723,12 +1723,24 @@
                 }
                 result => (connection_manager, result),
             };
-            let create_result = result.err_tip(|| {
-                format!(
-                    "Error with ft_create in RedisStore::search_by_index_prefix({})",
-                    get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
-                )
-            });
+            // RediSearch returns `Extension: "Index": already exists`
+            // when our `ft_create` races another caller who already
+            // created (or never lost) the index. That is *not* a
+            // real failure: it just means the index is in place,
+            // which is the only postcondition we care about. Treat
+            // it as Ok so the merged error below only carries the
+            // signal-bearing failure (e.g. an actual ft_aggregate
+            // timeout) instead of polluting it with noise.
+            let create_result = match result {
+                Ok(()) => Ok(()),
+                Err(ref e) if format!("{e:?}").contains("already exists") => Ok(()),
+                Err(_) => result.err_tip(|| {
+                    format!(
+                        "Error with ft_create in RedisStore::search_by_index_prefix({})",
+                        get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY),
+                    )
+                }),
+            };

             let run_result = run_ft_aggregate(connection_manager).await.err_tip(|| {
                 format!(

From 55ac2f820588b0d50743f15a9423726dec31d67a Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Tue, 5 May 2026 02:03:20 +0100
Subject: [PATCH 05/18] ft_aggregate: pass explicit TIMEOUT to absorb
 RediSearch slow scans

---
 nativelink-store/src/redis_utils/ft_aggregate.rs | 15 +++++++++++++++
 nativelink-store/tests/redis_store_test.rs       |  8 +++++++-
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/nativelink-store/src/redis_utils/ft_aggregate.rs b/nativelink-store/src/redis_utils/ft_aggregate.rs
index b987754a3..1bde24d74 100644
--- a/nativelink-store/src/redis_utils/ft_aggregate.rs
+++ b/nativelink-store/src/redis_utils/ft_aggregate.rs
@@ -35,6 +35,19 @@ pub(crate) struct FtAggregateOptions {
     pub sort_by: Vec<String>,
 }

+/// Per-query `FT.AGGREGATE` timeout in milliseconds.
+///
+/// `RediSearch`'s module default (≈500 ms) is far too tight for the
+/// scheduler's awaited-action index under any meaningful load: queries
+/// time out, NativeLink surfaces them as parse errors, and the dedup
+/// lookup fails. When dedup fails the scheduler creates a duplicate
+/// operation for an action that is already in flight — observed as
+/// "two same actions running on different PRs" with each running the
+/// full `maxActionExecutingTimeoutS` window before completing. Pass an
+/// explicit value generous enough to absorb 1M+ document scans on a
+/// busy `RediSearch` instance.
+const FT_AGGREGATE_TIMEOUT_MS: u64 = 10_000;
+
 /// Calls `FT.AGGREGATE` in redis. redis-rs does not properly support this command
 /// so we have to manually handle it.
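+/// With the options used by `search_by_index_prefix`, the assembled
+/// command now has the shape the tests assert:
+///
+/// ```text
+/// FT.AGGREGATE <index> <query> TIMEOUT 10000 LOAD 2 data version
+///     WITHCURSOR COUNT 1500 MAXIDLE 30000 SORTBY 2 @sort_key ASC
+/// ```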
pub(crate) async fn ft_aggregate( @@ -56,6 +69,8 @@ where let mut ft_aggregate_cmd = cmd .arg(&index) .arg(&query) + .arg("TIMEOUT") + .arg(FT_AGGREGATE_TIMEOUT_MS) .arg("LOAD") .arg(options.load.len()) .arg(&options.load) diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 64fabcaca..97941b504 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -1035,6 +1035,8 @@ fn test_search_by_index() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") @@ -1125,7 +1127,7 @@ fn test_search_by_index_failure() -> Result<(), Error> { "Client: TEST - Client: unexpected command", "Error with ft_create in RedisStore::search_by_index_prefix(test:_content_prefix_sort_key_3e762c15)", "---", "Client: TEST - Client: unexpected command", "Error with second ft_aggregate in RedisStore::search_by_index_prefix(test:_content_prefix_sort_key_3e762c15)"].iter().map(ToString::to_string).collect())); assert!(logs_contain( - "Error calling ft.aggregate e=TEST - Client: unexpected command index=\"test:_content_prefix_sort_key_3e762c15\" query=\"*\" options=FtAggregateOptions { load: [\"data\", \"version\"], cursor: FtAggregateCursor { count: 1500, max_idle: 30000 }, sort_by: [\"@sort_key\"] } all_args=[\"FT.AGGREGATE\", \"test:_content_prefix_sort_key_3e762c15\", \"*\", \"LOAD\", \"2\", \"data\", \"version\", \"WITHCURSOR\", \"COUNT\", \"1500\", \"MAXIDLE\", \"30000\", \"SORTBY\", \"2\", \"@sort_key\", \"ASC\"]" + "Error calling ft.aggregate e=TEST - Client: unexpected command index=\"test:_content_prefix_sort_key_3e762c15\" query=\"*\" options=FtAggregateOptions { load: [\"data\", \"version\"], cursor: FtAggregateCursor { count: 1500, max_idle: 30000 }, sort_by: [\"@sort_key\"] } all_args=[\"FT.AGGREGATE\", \"test:_content_prefix_sort_key_3e762c15\", \"*\", \"TIMEOUT\", \"10000\", \"LOAD\", \"2\", \"data\", \"version\", \"WITHCURSOR\", \"COUNT\", \"1500\", \"MAXIDLE\", \"30000\", \"SORTBY\", \"2\", \"@sort_key\", \"ASC\"]" )); Ok(()) @@ -1138,6 +1140,8 @@ fn test_search_by_index_with_sort_key() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") @@ -1221,6 +1225,8 @@ fn test_search_by_index_resp3() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") From 749e1b4d04c72df21381009a5cc2ba233232d713 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Tue, 5 May 2026 07:09:00 +0100 Subject: [PATCH 06/18] health_utils: run indicator checks in parallel, not serially --- nativelink-util/src/health_utils.rs | 51 +++++++++++++++++++---------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/nativelink-util/src/health_utils.rs b/nativelink-util/src/health_utils.rs index ebb4d1abf..c0a3a6b64 100644 --- a/nativelink-util/src/health_utils.rs +++ b/nativelink-util/src/health_utils.rs @@ -201,30 +201,45 @@ pub trait HealthStatusReporter { /// Health status reporter implementation for the health registry that provides a stream /// of health status descriptions. 
+///
+/// Indicator checks run **in parallel**: each indicator's
+/// `check_health` does real I/O against its store (write/has/read
+/// roundtrip per [`StoreDriver::check_health`]), so iterating
+/// indicators serially makes the total response time `N * timeout`.
+/// Under load that easily exceeds Kubernetes liveness probe budgets,
+/// causing kubelet to kill an otherwise-healthy pod whose only sin
+/// was that one congested store made the probe handler queue behind
+/// it. Parallel execution caps the total at ~`timeout` regardless of
+/// how many stores are registered.
 impl HealthStatusReporter for HealthRegistry {
     fn health_status_report(
         &self,
         timeout_limit: &Duration,
     ) -> Pin<Box<dyn Stream<Item = HealthStatusDescription> + Send + '_>> {
-        let local_timeout_limit = Arc::new(*timeout_limit);
+        let local_timeout_limit = *timeout_limit;
         Box::pin(
-            futures::stream::iter(
-                self.indicators
-                    .iter()
-                    .zip(core::iter::repeat(local_timeout_limit)),
-            )
-            .then(|((namespace, indicator), internal_timeout)| async move {
-                let status_res =
-                    timeout(*internal_timeout, indicator.check_health(namespace.clone())).await;
-                HealthStatusDescription {
-                    namespace: namespace.clone(),
-                    status: status_res.unwrap_or_else(|_| {
-                        let struct_name = indicator.struct_name();
-                        warn!(struct_name, "Timeout during health check");
-                        HealthStatus::Timeout { struct_name }
-                    }),
-                }
-            }),
+            futures::stream::iter(self.indicators.iter().map(
+                move |(namespace, indicator)| async move {
+                    let status_res = timeout(
+                        local_timeout_limit,
+                        indicator.check_health(namespace.clone()),
+                    )
+                    .await;
+                    HealthStatusDescription {
+                        namespace: namespace.clone(),
+                        status: status_res.unwrap_or_else(|_| {
+                            let struct_name = indicator.struct_name();
+                            warn!(struct_name, "Timeout during health check");
+                            HealthStatus::Timeout { struct_name }
+                        }),
+                    }
+                },
+            ))
+            // Drive every indicator's check concurrently rather than
+            // serially. The order of the resulting descriptions is
+            // not part of the API contract; collect-into-Vec callers
+            // already ignore order.
+            .buffer_unordered(usize::MAX),
         )
     }
 }

From a424b38db57e30865028ebfd2bca41d7e276974a Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Tue, 5 May 2026 07:17:31 +0100
Subject: [PATCH 07/18] ft_aggregate: backtick `NativeLink` in doc comment

---
 nativelink-store/src/redis_utils/ft_aggregate.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nativelink-store/src/redis_utils/ft_aggregate.rs b/nativelink-store/src/redis_utils/ft_aggregate.rs
index 1bde24d74..7974285bd 100644
--- a/nativelink-store/src/redis_utils/ft_aggregate.rs
+++ b/nativelink-store/src/redis_utils/ft_aggregate.rs
@@ -39,7 +39,7 @@ pub(crate) struct FtAggregateOptions {
 ///
 /// `RediSearch`'s module default (≈500 ms) is far too tight for the
 /// scheduler's awaited-action index under any meaningful load: queries
-/// time out, NativeLink surfaces them as parse errors, and the dedup
+/// time out, `NativeLink` surfaces them as parse errors, and the dedup
 /// lookup fails. When dedup fails the scheduler creates a duplicate
 /// operation for an action that is already in flight — observed as
 /// "two same actions running on different PRs" with each running the
 /// full `maxActionExecutingTimeoutS` window before completing.

From 8266b7d8917683b949b5580c9a5aee84a36762ca Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Wed, 6 May 2026 01:17:08 +0100
Subject: [PATCH 08/18] action_messages: surface PreconditionFailure for any
 missing-CAS-blob worker error

---
 nativelink-util/src/action_messages.rs | 96 +++++++++++++++++++++++++-
 1 file changed, 95 insertions(+), 1 deletion(-)

diff --git a/nativelink-util/src/action_messages.rs b/nativelink-util/src/action_messages.rs
index eaabc37e6..e10b34e02 100644
--- a/nativelink-util/src/action_messages.rs
+++ b/nativelink-util/src/action_messages.rs
@@ -843,6 +843,85 @@ impl From<&ActionStage> for execution_stage::Value {
     }
 }

+/// `google.rpc.PreconditionFailure` (defined inline to avoid pulling
+/// `tonic-types` and the additional proto-generated module). Bazel's
+/// `RemoteExecutionService` reads this proto out of
+/// `ExecuteResponse.status.details` for `FAILED_PRECONDITION` results
+/// and, for `MISSING` violations, automatically re-uploads the named
+/// blobs and retries the Execute call. Mirrors the schema in
+/// `google/rpc/error_details.proto`.
+mod precondition_failure {
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct PreconditionFailure {
+        #[prost(message, repeated, tag = "1")]
+        pub(super) violations: ::prost::alloc::vec::Vec<Violation>,
+    }
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct Violation {
+        #[prost(string, tag = "1")]
+        pub(super) r#type: ::prost::alloc::string::String,
+        #[prost(string, tag = "2")]
+        pub(super) subject: ::prost::alloc::string::String,
+        #[prost(string, tag = "3")]
+        pub(super) description: ::prost::alloc::string::String,
+    }
+    pub(super) const TYPE_URL: &str = "type.googleapis.com/google.rpc.PreconditionFailure";
+    pub(super) const VIOLATION_TYPE_MISSING: &str = "MISSING";
+}
+
+/// Marker substring emitted by `FastSlowStore::populate_and_maybe_stream`
+/// when a digest is missing from both tiers. Matching on the string is
+/// the only signal we have because `nativelink_error::Error` doesn't
+/// carry typed metadata and the worker has already wrapped the inner
+/// error with `make_err!(Code::FailedPrecondition, "{}", ...)` by the
+/// time we see it here.
+const MISSING_BLOB_MARKER: &str = "not found in either fast or slow store";
+
+/// Best-effort parse of a `(hash, size)` pair out of a missing-blob
+/// error message. The format is `Object {hash}-{size} not found in
+/// either fast or slow store ...`. Returns `None` if the message
+/// doesn't match — caller falls back to the plain `Error -> Status`
+/// conversion.
+fn extract_missing_blob_digest(err: &Error) -> Option<(String, i64)> {
+    for msg in &err.messages {
+        if !msg.contains(MISSING_BLOB_MARKER) {
+            continue;
+        }
+        let after_object = msg.find("Object ").map(|i| &msg[i + "Object ".len()..])?;
+        let digest_str = after_object.split(" not found").next()?;
+        let (hash, size) = digest_str.rsplit_once('-')?;
+        let size: i64 = size.parse().ok()?;
+        return Some((hash.to_string(), size));
+    }
+    None
+}
+
+/// Build a `google.rpc.Status` of code `FAILED_PRECONDITION` whose
+/// `details` carry a `google.rpc.PreconditionFailure` listing the
+/// missing CAS blob. Bazel uses this exact shape to drive automatic
+/// re-upload + retry of the missing blob.
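+///
+/// e.g. a worker error containing `Object 0123abcd-8192 not found in
+/// either fast or slow store` yields a violation with
+/// `type = "MISSING"` and `subject = "blobs/0123abcd/8192"`.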
+fn missing_blob_failed_precondition_status(err: &Error, hash: &str, size: i64) -> Status {
+    let pf = precondition_failure::PreconditionFailure {
+        violations: vec![precondition_failure::Violation {
+            r#type: precondition_failure::VIOLATION_TYPE_MISSING.to_string(),
+            // REv2-mandated subject format for missing-blob violations.
+            subject: format!("blobs/{hash}/{size}"),
+            description: err.message_string(),
+        }],
+    };
+    let mut buf: Vec<u8> = Vec::with_capacity(pf.encoded_len());
+    pf.encode(&mut buf).unwrap_or(());
+    let any = Any {
+        type_url: precondition_failure::TYPE_URL.to_string(),
+        value: buf,
+    };
+    Status {
+        code: Code::FailedPrecondition as i32,
+        message: err.message_string(),
+        details: vec![any],
+    }
+}
+
 pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
     fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
         let mut logs = HashMap::with_capacity(server_logs.len());
@@ -858,11 +937,26 @@ pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
         logs
     }

+    // If the action failed because a CAS blob is missing — most often a
+    // `Directory` proto in the input tree (the Execute pre-check only
+    // validates the top-level Action, command_digest, and
+    // input_root_digest; nested Directories are fetched lazily by the
+    // worker) — surface the failure as `FAILED_PRECONDITION` with a
+    // `PreconditionFailure` detail naming the digest. Bazel sees the
+    // detail, re-uploads the missing blob, and retries automatically;
+    // without the detail it gives up and the build fails.
     let status = Some(
         action_result
            .error
            .clone()
-            .map_or_else(Status::default, Into::into),
+            .map(|err| {
+                if let Some((hash, size)) = extract_missing_blob_digest(&err) {
+                    missing_blob_failed_precondition_status(&err, &hash, size)
+                } else {
+                    err.into()
+                }
+            })
+            .unwrap_or_default(),
     );
     let message = action_result.message.clone();
     ExecuteResponse {

From 4f8e090d38e8714fac127474228c07f9976e42b3 Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Wed, 6 May 2026 01:33:27 +0100
Subject: [PATCH 09/18] fast_slow_store: fall through to slow on stale
 fast-tier map entries

---
 nativelink-store/src/fast_slow_store.rs | 61 ++++++++++++++++++++-----
 1 file changed, 49 insertions(+), 12 deletions(-)

diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index 41645daad..fac5d2a4b 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -609,19 +609,52 @@ impl StoreDriver for FastSlowStore {
         offset: u64,
         length: Option<u64>,
     ) -> Result<(), Error> {
-        // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
-        // forwarding them up.
+        // The fast store's `has()` is a metadata lookup against its
+        // in-memory eviction map; the actual file may have already been
+        // evicted off disk between `has()` and the subsequent
+        // `get_part()` (or — observed in production — added to the map
+        // by a write whose final rename failed, leaving a phantom
+        // entry). When that happens the filesystem store returns
+        // `NotFound` from `get_part`. Falling through to populate from
+        // the slow store recovers transparently; propagating the error
+        // would surface as `FAILED_PRECONDITION ... not found in
+        // either fast or slow store` to the worker and fail the
+        // action even though the slow store has the blob.
+ // + // We only fall through when no bytes have been written to the + // caller's `writer` yet — once partial bytes have been streamed + // we cannot honour a retry without producing a corrupt result. if self.fast_store.has(key.borrow()).await?.is_some() { - self.metrics - .fast_store_hit_count - .fetch_add(1, Ordering::Acquire); - self.fast_store - .get_part(key, writer.borrow_mut(), offset, length) - .await?; - self.metrics - .fast_store_downloaded_bytes - .fetch_add(writer.get_bytes_written(), Ordering::Acquire); - return Ok(()); + let bytes_before = writer.get_bytes_written(); + match self + .fast_store + .get_part(key.borrow(), writer.borrow_mut(), offset, length) + .await + { + Ok(()) => { + self.metrics + .fast_store_hit_count + .fetch_add(1, Ordering::Acquire); + self.metrics + .fast_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + Err(e) + if e.code == Code::NotFound && writer.get_bytes_written() == bytes_before => + { + self.metrics + .fast_store_stale_map_falls_through + .fetch_add(1, Ordering::Acquire); + warn!( + %key, + ?e, + "FastSlowStore::get_part: fast store had a map entry but the file was missing on disk; falling through to slow store", + ); + // fall through to the populate path below + } + Err(e) => return Err(e), + } } // If the fast store is noop or read only or update only then bypass it. @@ -758,6 +791,10 @@ struct FastSlowStoreMetrics { help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT" )] leader_wait_timeouts: AtomicU64, + #[metric( + help = "Number of times the fast store reported has() == Some but the subsequent get_part returned NotFound (stale eviction-map entry); request fell through to populate from the slow store" + )] + fast_store_stale_map_falls_through: AtomicU64, } /// Maximum time a follower will wait on the leader-populator before From c461440591063c1eaf003c38bb4d7baf4b46c18a Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 6 May 2026 01:52:37 +0100 Subject: [PATCH 10/18] dynamic_fake_redis: tolerate the new explicit FT.AGGREGATE TIMEOUT clause --- nativelink-redis-tester/src/dynamic_fake_redis.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/nativelink-redis-tester/src/dynamic_fake_redis.rs b/nativelink-redis-tester/src/dynamic_fake_redis.rs index fa642f453..14610ae78 100644 --- a/nativelink-redis-tester/src/dynamic_fake_redis.rs +++ b/nativelink-redis-tester/src/dynamic_fake_redis.rs @@ -137,9 +137,21 @@ impl FakeRedisBackend { panic!("Aggregate query should be a string: {args:?}"); }; let query = str::from_utf8(raw_query).unwrap(); + // The real ft_aggregate caller now passes an explicit + // `TIMEOUT ` clause before `LOAD`. Tolerate both + // shapes here so this fake doesn't break older callers + // and the LOAD-args check still validates the bit we + // actually care about. + let load_offset = if matches!(args.get(2), Some(OwnedFrame::BulkString(b)) if b == b"TIMEOUT") + { + // Skip "TIMEOUT" and its millisecond argument. + 4 + } else { + 2 + }; // Lazy implementation making assumptions. 
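+            // The two accepted shapes are therefore:
+            //   [index, query, "LOAD", "2", "data", "version", ...]
+            //   [index, query, "TIMEOUT", <ms>, "LOAD", "2", "data",
+            //    "version", ...]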
assert_eq!( - args[2..6], + args[load_offset..load_offset + 4], vec![ OwnedFrame::BulkString(b"LOAD".to_vec()), OwnedFrame::BulkString(b"2".to_vec()), From 065daaa4f79626e63f44f5a15ada6568a9221159 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 6 May 2026 13:02:20 +0100 Subject: [PATCH 11/18] store_awaited_action_db: retry try_subscribe once on miss to close dedup --- .../src/store_awaited_action_db.rs | 48 +++++++++++++++---- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/nativelink-scheduler/src/store_awaited_action_db.rs b/nativelink-scheduler/src/store_awaited_action_db.rs index d55039e8e..909008f09 100644 --- a/nativelink-scheduler/src/store_awaited_action_db.rs +++ b/nativelink-scheduler/src/store_awaited_action_db.rs @@ -677,16 +677,44 @@ where ActionUniqueQualifier::Cacheable(_) => {} ActionUniqueQualifier::Uncacheable(_) => return Ok(None), } - let stream = self - .store - .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier)) - .await - .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?; - tokio::pin!(stream); - let maybe_awaited_action = stream - .try_next() - .await - .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?; + // Lookup, then on a miss retry once after a brief sleep. The + // backing index is `RediSearch`'s secondary index over the + // awaited-action hash-set; there is a sub-millisecond-scale + // window between a peer's HSET completing on the master and + // the index commit becoming visible to a concurrent reader. + // Two near-simultaneous `add_action` calls for the same + // `unique_qualifier` can both observe an empty result in that + // window and each create a separate operation, leading to + // duplicate scheduler operations for the same action digest + // (observed in production as "two same actions running on + // different PRs"). One short retry closes the window without + // a heavyweight atomic-claim mechanism. + // + // Uses real tokio time rather than the configurable `now_fn` + // sleep: `MockInstantWrapped::sleep` busy-yields until mock + // time advances, which scheduler tests don't do for this + // path, and the retry itself is a millisecond-scale physical + // wait that should never participate in test-controlled time. 
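+        //
+        // Illustrative window this closes:
+        //   t=0       peer's HSET for the qualifier lands on the master.
+        //   t=0+ε     our first search misses (index commit not yet
+        //             visible) -> would have spawned a duplicate.
+        //   t=0+20ms  the retry sees the indexed entry and joins the
+        //             existing operation instead.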
+        const SUBSCRIBE_RACE_RETRY_DELAY: Duration = Duration::from_millis(20);
+        let mut maybe_awaited_action: Option<AwaitedAction> = None;
+        for attempt in 0..2_u32 {
+            if attempt > 0 {
+                tokio::time::sleep(SUBSCRIBE_RACE_RETRY_DELAY).await;
+            }
+            let stream = self
+                .store
+                .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
+                .await
+                .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
+            tokio::pin!(stream);
+            maybe_awaited_action = stream
+                .try_next()
+                .await
+                .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
+            if maybe_awaited_action.is_some() {
+                break;
+            }
+        }
         match maybe_awaited_action {
             Some(awaited_action) => {
                 // TODO(palfrey) We don't support joining completed jobs because we

From ee1164537d106b36624662027f78361a5b0852b2 Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Wed, 6 May 2026 18:56:01 +0100
Subject: [PATCH 12/18] redis_store: lightweight check_health using PING
 instead of full I/O

---
 nativelink-store/src/filesystem_store.rs | 37 +++++++++++++---
 nativelink-store/src/redis_store.rs      | 55 +++++++++++++++++++++++-
 2 files changed, 83 insertions(+), 9 deletions(-)

diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index 9cc8b2507..0d1c9a54d 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -383,13 +383,36 @@ impl LenEntry for FileEntryImpl {
         let to_path =
             to_full_path_from_key(&encoded_file_path.shared_context.temp_path, &new_key);

         if let Err(err) = fs::rename(&from_path, &to_path).await {
-            warn!(
-                key = ?encoded_file_path.key,
-                ?from_path,
-                ?to_path,
-                ?err,
-                "Failed to rename file",
-            );
+            // ENOENT here means the file we expected at `from_path`
+            // was already gone — typically because another thread's
+            // eviction beat us to the unref, or because the entry
+            // ended up in our map without its file ever landing on
+            // disk (the "phantom-map" case the runtime recovers from
+            // via `FastSlowStore::get_part`'s slow-store fallback).
+            // It is benign at this site — there is no file to move
+            // — and historically dominates the log volume of this
+            // store under heavy write+evict concurrency, fast enough
+            // to drown the runtime under sustained pressure. Demote
+            // to `debug` and drop the per-emission path fields so it
+            // stops costing serialization in the hot path.
+            //
+            // Other rename failures (EACCES, EXDEV, EBUSY, …) are
+            // genuinely unexpected and stay at `warn` with full
+            // context.
+            if err.code == Code::NotFound {
+                debug!(
+                    key = ?encoded_file_path.key,
+                    "Failed to rename file (already gone, treating as benign)",
+                );
+            } else {
+                warn!(
+                    key = ?encoded_file_path.key,
+                    ?from_path,
+                    ?to_path,
+                    ?err,
+                    "Failed to rename file",
+                );
+            }
         } else {
             debug!(
                 key = ?encoded_file_path.key,
diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs
index f1b8559cb..7daff53c5 100644
--- a/nativelink-store/src/redis_store.rs
+++ b/nativelink-store/src/redis_store.rs
@@ -1100,8 +1100,59 @@
         "RedisStore"
     }

-    async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus {
-        StoreDriver::check_health(Pin::new(self), namespace).await
+    /// Lightweight health check: just `PING` the master, bounded by a
+    /// short physical timeout. The default `StoreDriver::check_health`
+    /// performs a full `update_oneshot` + `has` + `get_part_unchunked`
+    /// roundtrip, which queues behind real production traffic on the
+    /// same connection-permit semaphore and Redis master.
When the + /// store is even moderately loaded that easily exceeds the + /// `HealthServer` per-indicator budget (default 5 s), each + /// RedisStore-backed indicator (AC, small-blob CAS, scheduler) + /// reports `HealthStatus::Timeout`, and `/status` returns 503 — + /// surfaced as a readiness-probe failure that sheds traffic from + /// an otherwise-functional pod. A `PING` proves the connection + /// is reachable and the master is accepting commands; that is + /// the only invariant a kubelet probe needs. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + /// Per-check physical ceiling. Tight enough to stay well + /// under `HealthServer`'s default per-indicator budget; + /// loose enough to absorb a normally-slow PING during a + /// `BGSAVE` fork or sentinel rebalance. + const PING_TIMEOUT: Duration = Duration::from_secs(2); + + let mut client = match self.get_client().await { + Ok(c) => c, + Err(e) => { + return HealthStatus::new_failed( + self, + format!("RedisStore::check_health: failed to acquire connection: {e}").into(), + ); + } + }; + + // Hold the `ClientWithPermit` for the duration of the call so + // its `Drop` releases the semaphore permit on exit. We just + // need a `&mut` to the connection manager underneath. + let ping = async { + redis::cmd("PING") + .query_async::<()>(&mut client.connection_manager) + .await + }; + match tokio::time::timeout(PING_TIMEOUT, ping).await { + Ok(Ok(())) => HealthStatus::new_ok(self, "RedisStore::check_health: PING ok".into()), + Ok(Err(e)) => HealthStatus::new_failed( + self, + format!("RedisStore::check_health: PING errored: {e}").into(), + ), + Err(_) => HealthStatus::new_failed( + self, + format!( + "RedisStore::check_health: PING exceeded {} s timeout", + PING_TIMEOUT.as_secs() + ) + .into(), + ), + } } } From e5a7c9428f479eb77740ced4ca6c7f7bd83ac599 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Wed, 6 May 2026 19:29:53 +0100 Subject: [PATCH 13/18] filesystem_store: make unref ENOENT idempotent and demote map/disk divergence to warn --- nativelink-store/src/filesystem_store.rs | 32 ++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 0d1c9a54d..451323988 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -396,6 +396,20 @@ impl LenEntry for FileEntryImpl { // to `debug` and drop the per-emission path fields so it // stops costing serialization in the hot path. // + // We also flip `path_type` to `Temp` and adopt the + // would-be temp key here. The semantic effect of `unref` + // is "this entry no longer owns the content path"; if + // the source file is already gone, that postcondition + // already holds. Updating the metadata to match has two + // payoffs: (1) a subsequent `unref` on the same entry + // hits the `path_type == Temp` early-return instead of + // racing again on the same vanished path; (2) the + // EncodedFilePath::drop logic short-circuits on Content + // and would otherwise leak the entry's claim on the + // (already-moved) content path. Marking Temp here keeps + // the entry consistent with reality even though the + // physical move was a no-op. + // // Other rename failures (EACCES, EXDEV, EBUSY, …) are // genuinely unexpected and stay at `warn` with full // context. 
@@ -404,6 +418,8 @@ impl LenEntry for FileEntryImpl { key = ?encoded_file_path.key, "Failed to rename file (already gone, treating as benign)", ); + encoded_file_path.path_type = PathType::Temp; + encoded_file_path.key = new_key; } else { warn!( key = ?encoded_file_path.key, @@ -1094,10 +1110,22 @@ impl StoreDriver for FilesystemStore { let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move { // If the file is not found, we need to remove it from the eviction map. if err.code == Code::NotFound { - error!( + // The eviction map and on-disk state have diverged + // for this digest — the map said the file was at the + // content path but `open()` reported ENOENT. The + // runtime recovers automatically: `FastSlowStore::get_part` + // catches our `NotFound` and falls through to populate + // from the slow store (see the + // `fast_store_stale_map_falls_through` metric). So + // this is observed-but-handled, not catastrophic, + // and the original "process probably need restarted" + // message is stale — the divergence self-heals via + // the map removal below plus the slow-store + // re-populate. Demote to `warn` and reword. + warn!( ?err, key = ?owned_key, - "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted." + "Filesystem store map/disk divergence: removing entry; reader will fall through to slow store", ); self.evicting_map.remove(&owned_key).await; } From 56b0f020a408863a4403205112c3cd758d1f2ad1 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Thu, 7 May 2026 12:28:30 +0100 Subject: [PATCH 14/18] fast_slow_store: bypass leader/follower dedup for huge blobs The populating-digests dedup is a net loss for multi-GB blobs: the leader's pull from the slow store reliably exceeds LEADER_WAIT_TIMEOUT, every concurrent follower falls through to its own slow-store read anyway, and populating the fast tier with the huge blob evicts a large number of smaller, more-useful entries. Add a configurable size threshold (FastSlowSpec::bypass_dedup_threshold_bytes, default 256 MiB when unset) at and above which get_part streams straight from the slow store and skips the populate. Below the threshold the existing leader/follower path is unchanged. Tests: blobs at and above the threshold drive one slow-store call per concurrent reader and leave the fast tier empty; blobs below the threshold collapse to a single slow-store call and populate fast. --- nativelink-config/src/stores.rs | 20 ++ nativelink-store/src/fast_slow_store.rs | 68 +++++ .../tests/fast_slow_store_test.rs | 277 +++++++++++++++++- nativelink-worker/tests/local_worker_test.rs | 2 + .../tests/running_actions_manager_test.rs | 1 + 5 files changed, 367 insertions(+), 1 deletion(-) diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index e752ca051..fef6d714c 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -739,6 +739,26 @@ pub struct FastSlowSpec { /// and you wish to have an upstream read only store. #[serde(default)] pub slow_direction: StoreDirection, + + /// Reads of blobs at or above this size bypass the populating-digests + /// dedup map and stream directly from the slow store, without + /// populating the fast tier. + /// + /// Rationale: the leader/follower dedup is a win for blobs whose + /// transfer time is short relative to `LEADER_WAIT_TIMEOUT` — one + /// slow-store fetch fills the fast cache, subsequent readers serve + /// from fast. 
+    /// For multi-GB blobs (typically container layers) the
+    /// leader's transfer takes minutes; every concurrent follower hits
+    /// the timeout, falls through to the slow store anyway, and the
+    /// fast cache is then evicted aggressively to make room for the
+    /// huge blob — pushing out smaller, more-frequently-read entries.
+    /// Bypassing dedup for huge blobs avoids both pathologies.
+    ///
+    /// Set to 0 (default) to use the built-in default of 256 MiB. Set
+    /// to a very large value (e.g. `u64::MAX`) to disable the bypass
+    /// and always use dedup regardless of size.
+    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
+    pub bypass_dedup_threshold_bytes: u64,
 }

 #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index fac5d2a4b..aafc961aa 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -57,6 +57,10 @@ pub struct FastSlowStore {
     #[metric(group = "slow_store")]
     slow_store: Store,
     slow_direction: StoreDirection,
+    /// Reads of blobs whose digest size is >= this value bypass the
+    /// populating-digests dedup map and stream directly from the slow
+    /// store. See [`FastSlowSpec::bypass_dedup_threshold_bytes`].
+    bypass_dedup_threshold_bytes: u64,
     weak_self: Weak<FastSlowStore>,
     #[metric]
     metrics: FastSlowStoreMetrics,
@@ -118,17 +122,40 @@ impl Drop for LoaderGuard<'_> {

 impl FastSlowStore {
     pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
+        let bypass_dedup_threshold_bytes = if spec.bypass_dedup_threshold_bytes == 0 {
+            DEFAULT_BYPASS_DEDUP_THRESHOLD_BYTES
+        } else {
+            spec.bypass_dedup_threshold_bytes
+        };
         Arc::new_cyclic(|weak_self| Self {
             fast_store,
             fast_direction: spec.fast_direction,
             slow_store,
             slow_direction: spec.slow_direction,
+            bypass_dedup_threshold_bytes,
             weak_self: weak_self.clone(),
             metrics: FastSlowStoreMetrics::default(),
             populating_digests: Mutex::new(HashMap::new()),
         })
     }

+    /// Returns the digest size in bytes if `key` is a digest key, else `None`.
+    /// Non-digest keys (e.g. action-cache `StoreKey::Str`) carry no inherent
+    /// size and are never subject to the huge-blob bypass.
+    fn digest_size_bytes(key: &StoreKey<'_>) -> Option<u64> {
+        match key {
+            StoreKey::Digest(d) => Some(d.size_bytes()),
+            StoreKey::Str(_) => None,
+        }
+    }
+
+    /// Whether a read of `key` should bypass the populating-digests
+    /// dedup and read directly from the slow store. True only when the
+    /// blob's digest size meets or exceeds the configured threshold.
+    fn should_bypass_dedup(&self, key: &StoreKey<'_>) -> bool {
+        Self::digest_size_bytes(key).is_some_and(|size| size >= self.bypass_dedup_threshold_bytes)
+    }
+
     pub const fn fast_store(&self) -> &Store {
         &self.fast_store
     }
@@ -677,6 +704,35 @@
             return Ok(());
         }

+        // Huge-blob bypass. The leader/follower dedup is a net loss for
+        // multi-GB blobs: the leader's pull from the slow store takes
+        // long enough that every concurrent follower hits
+        // `LEADER_WAIT_TIMEOUT` and falls through to its own slow-store
+        // read anyway, *and* populating the fast tier with the huge
+        // blob evicts a large number of smaller, more-useful entries.
+        // Stream directly from the slow store and skip the populate.
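+        //
+        // Back-of-envelope: a 3 GiB layer pulled at ~50 MiB/s takes
+        // ~60 s, i.e. the entire LEADER_WAIT_TIMEOUT, so every
+        // follower would time out and re-read the slow store anyway.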
+ if self.should_bypass_dedup(&key) { + self.metrics + .huge_blob_dedup_bypasses + .fetch_add(1, Ordering::Acquire); + self.metrics + .slow_store_hit_count + .fetch_add(1, Ordering::Acquire); + debug!( + %key, + threshold_bytes = self.bypass_dedup_threshold_bytes, + "FastSlowStore::get_part: blob meets huge-blob threshold; bypassing dedup", + ); + self.slow_store + .get_part(key, writer.borrow_mut(), offset, length) + .await + .err_tip(|| "In FastSlowStore::get_part huge-blob bypass")?; + self.metrics + .slow_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + let mut writer = Some(writer); // Drive the dedup loader. Two distinct paths: @@ -795,6 +851,10 @@ struct FastSlowStoreMetrics { help = "Number of times the fast store reported has() == Some but the subsequent get_part returned NotFound (stale eviction-map entry); request fell through to populate from the slow store" )] fast_store_stale_map_falls_through: AtomicU64, + #[metric( + help = "Number of get_part calls that bypassed the populating-digests dedup because the blob met the huge-blob threshold and were served directly from the slow store" + )] + huge_blob_dedup_bypasses: AtomicU64, } /// Maximum time a follower will wait on the leader-populator before @@ -806,4 +866,12 @@ struct FastSlowStoreMetrics { /// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors. const LEADER_WAIT_TIMEOUT: Duration = Duration::from_secs(60); +/// Default value for [`FastSlowStore::bypass_dedup_threshold_bytes`] when +/// the spec leaves it unset (zero). 256 MiB is large enough that typical +/// Bazel build outputs (sources, jars, intermediate artifacts) still go +/// through dedup and benefit from fast-tier caching, but small enough to +/// catch container-image layers and other multi-hundred-MB blobs whose +/// dedup-leader transfer reliably exceeds `LEADER_WAIT_TIMEOUT`. +const DEFAULT_BYPASS_DEDUP_THRESHOLD_BYTES: u64 = 256 * 1024 * 1024; + default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 53dd12387..428ab2518 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use core::pin::Pin; -use core::sync::atomic::{AtomicBool, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use async_trait::async_trait; @@ -47,6 +47,7 @@ fn make_stores_direction( slow: StoreSpec::Memory(MemorySpec::default()), fast_direction, slow_direction, + bypass_dedup_threshold_bytes: 0, }, fast_store.clone(), slow_store.clone(), @@ -350,6 +351,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast_store, slow_store, @@ -393,6 +395,7 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast_store.clone(), slow_store, @@ -418,6 +421,7 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { slow: StoreSpec::Noop(NoopSpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }; let fast_slow_store = Arc::new(FastSlowStore::new( &fast_slow_store_config, @@ -654,6 +658,7 @@ fn make_stores_with_lazy_slow() -> (Store, Store, Store) { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast_store.clone(), slow_store.clone(), @@ -705,3 +710,273 @@ async fn lazy_not_found_syncs_to_fast_store() -> Result<(), Error> { ); Ok(()) } + +// --------------------------------------------------------------------------- +// Huge-blob dedup bypass. +// +// For multi-GB blobs the leader/follower dedup is a net loss: the leader's +// transfer reliably exceeds `LEADER_WAIT_TIMEOUT`, every concurrent follower +// falls through to its own slow-store read anyway, and populating the fast +// tier with the huge blob evicts a large number of smaller, more-useful +// entries. The bypass short-circuits that pathology by streaming straight +// from the slow store and skipping the populate. +// +// The tests here use a tiny threshold (so the test stays bounded in memory) +// and an instrumented slow store that counts get_part invocations. Under +// dedup we expect a single slow-store call regardless of fan-in; under +// bypass we expect one per caller. +// --------------------------------------------------------------------------- + +/// `StoreDriver` that wraps a `MemoryStore` and counts how many times +/// `get_part` is entered. Used to assert dedup behaviour: under dedup +/// the leader is the only `get_part` invocation; under bypass each +/// concurrent caller drives its own. 
+#[derive(MetricsComponent)]
+struct CountingSlowStore {
+    inner: Arc<MemoryStore>,
+    get_part_calls: Arc<AtomicUsize>,
+}
+
+#[async_trait]
+impl StoreDriver for CountingSlowStore {
+    async fn has_with_results(
+        self: Pin<&Self>,
+        keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        Pin::new(self.inner.as_ref())
+            .has_with_results(keys, results)
+            .await
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        reader: nativelink_util::buf_channel::DropCloserReadHalf,
+        size_info: nativelink_util::store_trait::UploadSizeInfo,
+    ) -> Result<(), Error> {
+        Pin::new(self.inner.as_ref())
+            .update(key, reader, size_info)
+            .await
+    }
+
+    async fn get_part(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        writer: &mut nativelink_util::buf_channel::DropCloserWriteHalf,
+        offset: u64,
+        length: Option<u64>,
+    ) -> Result<(), Error> {
+        self.get_part_calls.fetch_add(1, Ordering::AcqRel);
+        Pin::new(self.inner.as_ref())
+            .get_part(key, writer, offset, length)
+            .await
+    }
+
+    fn inner_store(&self, _digest: Option<StoreKey<'_>>) -> &dyn StoreDriver {
+        self
+    }
+    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
+        self
+    }
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
+        self
+    }
+    fn register_remove_callback(
+        self: Arc<Self>,
+        _callback: Arc<dyn RemoveItemCallback>,
+    ) -> Result<(), Error> {
+        Ok(())
+    }
+}
+
+default_health_status_indicator!(CountingSlowStore);
+
+#[nativelink_test]
+async fn huge_blob_bypasses_dedup_and_skips_populate() -> Result<(), Error> {
+    // 1 KiB threshold so a 2 KiB blob trips the bypass.
+    const THRESHOLD: u64 = 1024;
+    const BLOB_SIZE: usize = 2 * 1024;
+    const READERS: usize = 8;
+
+    let original_data = make_random_data(BLOB_SIZE);
+    let digest = DigestInfo::try_new(VALID_HASH, original_data.len()).unwrap();
+
+    // Seed the slow tier directly so a fast-tier miss is the realistic
+    // starting state. Populating through the wrapper would also fill
+    // the fast store via the dual-write path of `update`.
+    let inner_slow = MemoryStore::new(&MemorySpec::default());
+    inner_slow
+        .update_oneshot(digest, original_data.clone().into())
+        .await?;
+    let calls = Arc::new(AtomicUsize::new(0));
+    let counting = Arc::new(CountingSlowStore {
+        inner: inner_slow,
+        get_part_calls: calls.clone(),
+    });
+    let fast_store = Store::new(MemoryStore::new(&MemorySpec::default()));
+    let slow_store = Store::new(counting);
+    let fast_slow_store = Store::new(FastSlowStore::new(
+        &FastSlowSpec {
+            fast: StoreSpec::Memory(MemorySpec::default()),
+            slow: StoreSpec::Memory(MemorySpec::default()),
+            fast_direction: StoreDirection::default(),
+            slow_direction: StoreDirection::default(),
+            bypass_dedup_threshold_bytes: THRESHOLD,
+        },
+        fast_store.clone(),
+        slow_store,
+    ));
+
+    // Fan out READERS concurrent get_part calls.
+    let mut joins = Vec::with_capacity(READERS);
+    for _ in 0..READERS {
+        let store = fast_slow_store.clone();
+        let expected = original_data.clone();
+        joins.push(tokio::spawn(async move {
+            let got = store.get_part_unchunked(digest, 0, None).await?;
+            assert_eq!(got.as_ref(), expected.as_slice(), "data mismatch");
+            Ok::<_, Error>(())
+        }));
+    }
+    for j in joins {
+        j.await
+            .map_err(|e| make_err!(Code::Internal, "join failed: {e}"))??;
+    }
+
+    // Bypass should drive one slow-store call per reader, not one for
+    // all of them.
+    assert_eq!(
+        calls.load(Ordering::Acquire),
+        READERS,
+        "expected {READERS} slow-store get_part calls under bypass, observed dedup"
+    );
+
+    // And the fast tier must not have been populated.
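+    // (`fast_store` is the bare memory store, queried directly rather
+    // than through the `FastSlowStore`, so a `None` from `has` cannot
+    // be explained by read-through — it proves the populate never ran.)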
+ assert!( + fast_store.has(digest).await?.is_none(), + "huge-blob bypass populated the fast tier; that defeats the point of the bypass" + ); + + Ok(()) +} + +#[nativelink_test] +async fn small_blob_still_dedups_and_populates() -> Result<(), Error> { + // 1 MiB threshold; a 64-byte blob sits comfortably below it so the + // existing dedup path runs. + const THRESHOLD: u64 = 1024 * 1024; + const BLOB_SIZE: usize = 64; + const READERS: usize = 8; + + let original_data = make_random_data(BLOB_SIZE); + let digest = DigestInfo::try_new(VALID_HASH, original_data.len()).unwrap(); + + let inner_slow = MemoryStore::new(&MemorySpec::default()); + inner_slow + .update_oneshot(digest, original_data.clone().into()) + .await?; + let calls = Arc::new(AtomicUsize::new(0)); + let counting = Arc::new(CountingSlowStore { + inner: inner_slow, + get_part_calls: calls.clone(), + }); + let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let slow_store = Store::new(counting); + let fast_slow_store = Store::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Memory(MemorySpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), + slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: THRESHOLD, + }, + fast_store.clone(), + slow_store, + )); + + let mut joins = Vec::with_capacity(READERS); + for _ in 0..READERS { + let store = fast_slow_store.clone(); + let expected = original_data.clone(); + joins.push(tokio::spawn(async move { + let got = store.get_part_unchunked(digest, 0, None).await?; + assert_eq!(got.as_ref(), expected.as_slice(), "data mismatch"); + Ok::<_, Error>(()) + })); + } + for j in joins { + j.await + .map_err(|e| make_err!(Code::Internal, "join failed: {e}"))??; + } + + // Dedup should collapse all readers down to a single slow-store + // call. + assert_eq!( + calls.load(Ordering::Acquire), + 1, + "small-blob path lost dedup; observed >1 slow-store call" + ); + + // And the fast tier should be populated. + assert!( + fast_store.has(digest).await?.is_some(), + "small-blob path failed to populate the fast tier" + ); + + Ok(()) +} + +#[nativelink_test] +async fn bypass_threshold_is_inclusive_at_exact_size() -> Result<(), Error> { + // The bypass condition is `size >= threshold`. A blob whose digest + // size exactly equals the threshold must take the bypass path. 
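+    // (Pinning the boundary matters: a refactor that swapped `>=` for
+    // `>` in `should_bypass_dedup` would silently exempt
+    // exactly-threshold blobs, and no other test would notice.)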
+    const SIZE: usize = 4096;
+    const READERS: usize = 4;
+
+    let original_data = make_random_data(SIZE);
+    let digest = DigestInfo::try_new(VALID_HASH, original_data.len()).unwrap();
+
+    let inner_slow = MemoryStore::new(&MemorySpec::default());
+    inner_slow
+        .update_oneshot(digest, original_data.clone().into())
+        .await?;
+    let calls = Arc::new(AtomicUsize::new(0));
+    let counting = Arc::new(CountingSlowStore {
+        inner: inner_slow,
+        get_part_calls: calls.clone(),
+    });
+    let fast_store = Store::new(MemoryStore::new(&MemorySpec::default()));
+    let slow_store = Store::new(counting);
+    let fast_slow_store = Store::new(FastSlowStore::new(
+        &FastSlowSpec {
+            fast: StoreSpec::Memory(MemorySpec::default()),
+            slow: StoreSpec::Memory(MemorySpec::default()),
+            fast_direction: StoreDirection::default(),
+            slow_direction: StoreDirection::default(),
+            bypass_dedup_threshold_bytes: SIZE as u64,
+        },
+        fast_store.clone(),
+        slow_store,
+    ));
+
+    let mut joins = Vec::with_capacity(READERS);
+    for _ in 0..READERS {
+        let store = fast_slow_store.clone();
+        joins.push(tokio::spawn(async move {
+            let _ignored = store.get_part_unchunked(digest, 0, None).await?;
+            Ok::<_, Error>(())
+        }));
+    }
+    for j in joins {
+        j.await
+            .map_err(|e| make_err!(Code::Internal, "join failed: {e}"))??;
+    }
+
+    assert_eq!(
+        calls.load(Ordering::Acquire),
+        READERS,
+        "size == threshold should bypass dedup but observed dedup",
+    );
+    Ok(())
+}
diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs
index 23199e988..1a2b83661 100644
--- a/nativelink-worker/tests/local_worker_test.rs
+++ b/nativelink-worker/tests/local_worker_test.rs
@@ -434,6 +434,7 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Error> {
             slow: StoreSpec::Memory(MemorySpec::default()),
             fast_direction: StoreDirection::default(),
             slow_direction: StoreDirection::default(),
+            bypass_dedup_threshold_bytes: 0,
         },
         Store::new(
             <FilesystemStore>::new(&FilesystemSpec {
@@ -475,6 +476,7 @@ async fn new_local_worker_removes_work_directory_before_start_test() -> Result<(
             slow: StoreSpec::Memory(MemorySpec::default()),
             fast_direction: StoreDirection::default(),
             slow_direction: StoreDirection::default(),
+            bypass_dedup_threshold_bytes: 0,
         },
         Store::new(
             <FilesystemStore>::new(&FilesystemSpec {
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs
index ab6a91f9c..413eedb8c 100644
--- a/nativelink-worker/tests/running_actions_manager_test.rs
+++ b/nativelink-worker/tests/running_actions_manager_test.rs
@@ -137,6 +137,7 @@ mod tests {
             slow: StoreSpec::Memory(slow_config),
             fast_direction: StoreDirection::default(),
             slow_direction: StoreDirection::default(),
+            bypass_dedup_threshold_bytes: 0,
         },
         Store::new(fast_store.clone()),
         Store::new(slow_store.clone()),

From cc6dd6f09a04daa5afa506642f7de4c86f9bc1bb Mon Sep 17 00:00:00 2001
From: Aman Kumar
Date: Thu, 7 May 2026 12:28:47 +0100
Subject: [PATCH 15/18] gcs_store, filesystem_store: lightweight check_health
 probes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The default StoreDriver::check_health performs a full update_oneshot +
has + get_part_unchunked roundtrip. Under any meaningful slow-store
load (e.g. concurrent multi-GB blob reads saturating the network) that
roundtrip queues behind production traffic and easily exceeds the
per-indicator budget, causing /status to return 503 even when the pod
is otherwise functional.
Kubelet treats those as probe failures and eventually restarts the pod, dropping every connected client. Override check_health on both registered indicators with a probe that shares no path with production traffic: * GcsStore: a single object_exists() call against a fixed never-existing path. Metadata roundtrip only — independent of body-transfer bandwidth and the upload buffer pool. * FilesystemStore: a stat() of the configured content_path. A single syscall, microseconds on a healthy mount; bounded with a timeout so a hung NFS / EBS mount cannot wedge the indicator. Both paths are bounded by a 2 s ceiling so they remain well inside the HealthServer per-indicator budget. --- nativelink-store/src/filesystem_store.rs | 58 ++++++++++++++++++++++- nativelink-store/src/gcs_store.rs | 60 ++++++++++++++++++++++-- 2 files changed, 113 insertions(+), 5 deletions(-) diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 451323988..cd0672b3b 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -41,6 +41,7 @@ use nativelink_util::store_trait::{ }; use tokio::io::{AsyncReadExt, AsyncWriteExt, Take}; use tokio::sync::Semaphore; +use tokio::time::timeout; use tokio_stream::wrappers::ReadDirStream; use tracing::{debug, error, info, trace, warn}; @@ -1186,7 +1187,60 @@ impl HealthStatusIndicator for FilesystemStore { "FilesystemStore" } - async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { - StoreDriver::check_health(Pin::new(self), namespace).await + /// Lightweight health check: stat the configured `content_path` + /// directory. Proves the underlying mount is present and we have + /// permission to read it — without writing a probe blob, hashing + /// it, and reading it back through the eviction map. + /// + /// The default `StoreDriver::check_health` performs a full + /// `update_oneshot` + `has` + `get_part_unchunked` roundtrip. Each + /// of those operations contends on the same write-semaphore and + /// eviction-map locks as in-flight production traffic; under load + /// (large blob streams, dense eviction churn) the probe queues + /// behind real work and overruns the per-indicator budget, + /// surfacing as an HTTP 503 even when the disk itself is fine. + /// A `metadata` call is a single `stat()` syscall — bounded, + /// shared with no production code path, and a strict superset of + /// what a kubelet probe needs to know. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + /// Per-check physical ceiling. A `stat()` of a healthy local + /// directory completes in microseconds; this bound exists only + /// so a hung NFS / EBS / persistent-volume mount cannot wedge + /// the indicator. 
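+        ///
+        /// Note: `tokio::fs::metadata` runs the `stat()` on tokio's
+        /// blocking thread pool, so on timeout this function returns
+        /// promptly while the stranded syscall keeps occupying one
+        /// blocking-pool thread until the kernel releases it — the
+        /// bound protects the probe's caller, not the thread.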
+ const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2); + + let content_path = self.shared_context.content_path.clone(); + let stat = tokio::fs::metadata(content_path.clone()); + match timeout(HEALTH_PROBE_TIMEOUT, stat).await { + Ok(Ok(meta)) if meta.is_dir() => { + HealthStatus::new_ok(self, "FilesystemStore::check_health: ok".into()) + } + Ok(Ok(_)) => HealthStatus::new_failed( + self, + format!( + "FilesystemStore::check_health: content_path {content_path} is not a directory" + ) + .into(), + ), + Ok(Err(e)) => { + warn!( + ?e, + %content_path, + "FilesystemStore::check_health: stat errored", + ); + HealthStatus::new_failed( + self, + format!("FilesystemStore::check_health: stat errored: {e}").into(), + ) + } + Err(_) => HealthStatus::new_failed( + self, + format!( + "FilesystemStore::check_health: stat of {content_path} exceeded {} s timeout", + HEALTH_PROBE_TIMEOUT.as_secs() + ) + .into(), + ), + } } } diff --git a/nativelink-store/src/gcs_store.rs b/nativelink-store/src/gcs_store.rs index 9266c17cb..c83ffaa1d 100644 --- a/nativelink-store/src/gcs_store.rs +++ b/nativelink-store/src/gcs_store.rs @@ -14,6 +14,7 @@ use core::fmt::Debug; use core::pin::Pin; +use core::time::Duration; use std::borrow::Cow; use std::sync::Arc; @@ -32,7 +33,8 @@ use nativelink_util::store_trait::{ RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo, }; use rand::Rng; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; +use tracing::warn; use crate::cas_utils::is_zero_digest; use crate::gcs_client::client::{GcsClient, GcsOperations}; @@ -486,7 +488,59 @@ where "GcsStore" } - async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { - StoreDriver::check_health(Pin::new(self), namespace).await + /// Lightweight health check: a single `object_exists` call against + /// a deterministic, never-existing object path. Proves the bucket + /// is reachable, the credentials are valid, and the GCS API is + /// answering — without sharing bandwidth, connection pool, or + /// upload buffers with in-flight production traffic. + /// + /// The default `StoreDriver::check_health` performs a full + /// `update_oneshot` + `has` + `get_part_unchunked` roundtrip; under + /// any meaningful slow-store load (e.g. concurrent multi-GB blob + /// pulls saturating the network), that roundtrip queues behind + /// production traffic and easily exceeds the per-indicator budget. + /// Each timeout shows up at the kubelet as an HTTP 503 on the + /// liveness/readiness path even though the pod is otherwise + /// functional, eventually triggering a probe-failure restart that + /// drops every connected client. + /// + /// `object_exists` issues a metadata HEAD-equivalent: a few hundred + /// bytes on the wire and a single round-trip, independent of the + /// store's body-transfer bandwidth. A `false` result is the success + /// case (proves the API path works); only an outright `Err` + /// indicates an unhealthy slow store. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + /// Per-check physical ceiling. Tight enough to stay well under + /// the `HealthServer` per-indicator budget, loose enough to + /// absorb a slow round-trip during transient network blips. + const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2); + + // Path is deliberately fixed and obviously-not-real. The probe + // does not depend on the bucket containing any object; it only + // depends on GCS answering "no, that doesn't exist." 
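+        // (If an object ever did appear at this path the probe would
+        // still pass: `object_exists` returning `true` is matched by
+        // the same `Ok(Ok(_))` arm below.)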
+ let probe_path = ObjectPath::new( + self.bucket.clone(), + "__nativelink_health_probe__/does-not-exist", + ); + + let probe = self.client.object_exists(&probe_path); + match timeout(HEALTH_PROBE_TIMEOUT, probe).await { + Ok(Ok(_)) => HealthStatus::new_ok(self, "GcsStore::check_health: ok".into()), + Ok(Err(e)) => { + warn!(?e, "GcsStore::check_health: object_exists errored"); + HealthStatus::new_failed( + self, + format!("GcsStore::check_health: object_exists errored: {e}").into(), + ) + } + Err(_) => HealthStatus::new_failed( + self, + format!( + "GcsStore::check_health: probe exceeded {} s timeout", + HEALTH_PROBE_TIMEOUT.as_secs() + ) + .into(), + ), + } } } From 7a89d6b2ca2c284ca851e71b8abc5e52374f1ac1 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Thu, 7 May 2026 14:34:31 +0100 Subject: [PATCH 16/18] redis_store: raise check_health PING ceiling from 2s to 4s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous 2-second PING ceiling was tight enough that a routine Redis BGSAVE fork would push every RedisStore indicator over the line simultaneously: on an 11 GB production master under load we observe fork-induced pauses around 3 seconds, and with three RedisStore indicators (AC, CAS, scheduler) all PING-ing through the same connection pool, all three return Failed in lockstep — surfacing as a 503 on /status and a kubelet probe-failure event even though the Redis service is otherwise healthy. Verified by capturing /status response bodies during a flap window: [{"namespace": ".../SCHEDULER_STORE/RedisStore", "status": {"Failed": {"message": "RedisStore::check_health: PING exceeded 2 s timeout"}}}, {"namespace": ".../CAS_REDIS_STORE/RedisStore", "status": {"Failed": {"message": "RedisStore::check_health: PING exceeded 2 s timeout"}}}, {"namespace": ".../AC_REDIS_STORE/RedisStore", "status": {"Failed": {"message": "RedisStore::check_health: PING exceeded 2 s timeout"}}}] The HealthServer's per-indicator wrapper budget is 5 s (DEFAULT_HEALTH_CHECK_TIMEOUT_SECONDS), so 4 s leaves a small safety margin while comfortably absorbing the BGSAVE worst case we have observed in practice. --- nativelink-store/src/redis_store.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 7daff53c5..f9aea2742 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -1114,11 +1114,24 @@ where /// is reachable and the master is accepting commands; that is /// the only invariant a kubelet probe needs. async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { - /// Per-check physical ceiling. Tight enough to stay well - /// under `HealthServer`'s default per-indicator budget; - /// loose enough to absorb a normally-slow PING during a - /// `BGSAVE` fork or sentinel rebalance. - const PING_TIMEOUT: Duration = Duration::from_secs(2); + /// Per-check physical ceiling. The default `HealthServer` + /// per-indicator budget is 5 s + /// (`DEFAULT_HEALTH_CHECK_TIMEOUT_SECONDS`); this stays a + /// margin under that so the wrapper has time to receive our + /// answer. 
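+        /// (Arithmetic: a 5 s wrapper budget minus a 4 s ceiling leaves
+        /// ~1 s for client checkout, scheduling, and delivering the
+        /// verdict to the wrapper.)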
+ /// + /// 4 s is the smallest ceiling that absorbs a normally-slow + /// PING during the realistic worst case for our deployments: + /// a `BGSAVE` `fork()` on a multi-GB Redis instance under + /// load (the parent process is briefly paused while the + /// kernel sets up copy-on-write page tables; on our + /// production sentinel cluster we have observed pauses up + /// to ~3 s on an 11 GB master). A tighter ceiling caused + /// every `RedisStore` indicator to flap simultaneously + /// during routine RDB checkpoints, surfacing as 503s on + /// `/status` and probe-failure events even though Redis was + /// otherwise healthy. + const PING_TIMEOUT: Duration = Duration::from_secs(4); let mut client = match self.get_client().await { Ok(c) => c, From ccdfee2752fc4d6dac386325efdb09a2dc7c93eb Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Sat, 9 May 2026 01:49:43 +0100 Subject: [PATCH 17/18] connection_manager: replace usize counter with Semaphore RAII permit Production worker pods were OOMKilled (exit 137) after the ConnectionManager's `available_connections: usize` counter underflowed to ~u64::MAX while `waiting_connections` climbed unbounded. The manual decrement-on-issue / increment-on-Dropped accounting balances on paper, but a leak path was occasionally missing a `Dropped` delivery during tonic transport errors and task aborts. Switch to `Arc` with `OwnedSemaphorePermit` on the Connection. RAII makes leakage structurally impossible: every Drop path (panic, abort, dropped oneshot receiver, transport error) releases the permit exactly once. Adds 3 integration tests covering the request/acquire/release cycle, an aborted-caller-future cleanup scenario, and the MAX_CONCURRENT ceiling. Co-Authored-By: Claude Opus 4.7 (1M context) --- nativelink-util/src/connection_manager.rs | 127 +++++++-- .../tests/connection_manager_test.rs | 260 ++++++++++++++++++ 2 files changed, 358 insertions(+), 29 deletions(-) create mode 100644 nativelink-util/tests/connection_manager_test.rs diff --git a/nativelink-util/src/connection_manager.rs b/nativelink-util/src/connection_manager.rs index eaa5d0d99..f92772a63 100644 --- a/nativelink-util/src/connection_manager.rs +++ b/nativelink-util/src/connection_manager.rs @@ -22,7 +22,7 @@ use futures::Future; use futures::stream::{FuturesUnordered, StreamExt, unfold}; use nativelink_config::stores::Retry; use nativelink_error::{Code, Error, make_err}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; use tonic::transport::{Channel, Endpoint, channel}; use tracing::{debug, error, info, warn}; @@ -95,8 +95,30 @@ struct ConnectionManagerWorker { endpoints: Vec<(ConnectionIndex, Endpoint)>, /// The channel used to communicate between a Connection and the worker. connection_tx: mpsc::UnboundedSender, - /// The number of connections that are currently allowed to be made. - available_connections: usize, + /// Semaphore that gates the maximum number of in-flight `Connection` + /// objects. + /// + /// **Why a semaphore and not a `usize` counter.** Earlier versions + /// tracked permits with `available_connections: usize`, decrementing + /// in `provide_channel` and re-incrementing on `ConnectionRequest::Dropped`. 
+    /// In production we observed the counter underflowing
+    /// (`u64::MAX − N`) while `waiting_connections` climbed unbounded — a
+    /// permit-leak whose surface symptom was the worker process growing
+    /// memory until the kernel `OOMKilled` it (exit 137), with the
+    /// in-flight action stranded as `Executing` in Redis. The exact
+    /// leaking code path was hard to pin down because the protocol *looks*
+    /// balanced on paper (one decrement per `provide_channel`, one
+    /// increment per `Dropped` event from `Connection::drop`); somewhere
+    /// in the worker / tonic interaction, a `Dropped` was occasionally
+    /// not being delivered or processed.
+    ///
+    /// `tokio::sync::Semaphore`'s `OwnedSemaphorePermit` makes leakage
+    /// **structurally impossible**: the permit is acquired in
+    /// `provide_channel`, handed to the `Connection` as a private field,
+    /// and released exactly once when the `Connection` is dropped (Rust's
+    /// RAII guarantee). No code path — panic, task cancellation, dropped
+    /// receiver, anything — can release the permit twice or leak it.
+    semaphore: Arc<Semaphore>,
     /// Channels that are currently being connected.
     connecting_channels: FuturesUnordered<Pin<Box<dyn Future<Output = EstablishedChannel> + Send>>>,
     /// Connected channels that are available for use.
     available_channels: VecDeque<EstablishedChannel>,
@@ -136,14 +158,22 @@ impl ConnectionManager {
             .collect();
         if max_concurrent_requests == 0 {
-            max_concurrent_requests = usize::MAX;
+            // `Semaphore::MAX_PERMITS` (`usize::MAX >> 3` in tokio) is the
+            // largest permit count `Semaphore::new` accepts without
+            // panicking; for our purposes it is effectively unbounded.
+            max_concurrent_requests = Semaphore::MAX_PERMITS;
+        } else {
+            // Defensive cap: callers shouldn't be passing values larger
+            // than `MAX_PERMITS`, but if they do, clamp rather than
+            // panic.
+            max_concurrent_requests = max_concurrent_requests.min(Semaphore::MAX_PERMITS);
         }
         if connections_per_endpoint == 0 {
             connections_per_endpoint = 1;
         }
         let worker = ConnectionManagerWorker {
             endpoints,
-            available_connections: max_concurrent_requests,
+            semaphore: Arc::new(Semaphore::new(max_concurrent_requests)),
             connection_tx,
             connecting_channels: FuturesUnordered::new(),
             available_channels: VecDeque::new(),
@@ -309,49 +339,75 @@ impl ConnectionManagerWorker {
     // This must never be made async otherwise the select may cancel it.
     fn handle_worker(&mut self, reason: String, tx: oneshot::Sender<Connection>) {
-        if let Some(channel) = (self.available_connections > 0)
-            .then_some(())
-            .and_then(|()| self.available_channels.pop_front())
+        // Permit is acquired here so the count is held by the
+        // `Connection` that gets handed back to the requester. RAII on
+        // the permit means we don't have to track this in our own
+        // counter and there is no path that can leak it.
+        let maybe_permit = self.semaphore.clone().try_acquire_owned().ok();
+        if let Some(permit) = maybe_permit
+            && let Some(channel) = self.available_channels.pop_front()
         {
             debug!(reason, "ConnectionManager: request running");
-            self.provide_channel(channel, tx);
+            self.provide_channel(channel, tx, permit);
         } else {
             debug!(
-                available_connections = self.available_connections,
+                available_permits = self.semaphore.available_permits(),
                 available_channels = self.available_channels.len(),
                 waiting_connections = self.waiting_connections.len(),
                 reason,
                 "ConnectionManager: no connection available, request queued",
            );
            self.waiting_connections.push_back((reason, tx));
+            // `maybe_permit` (if any) is dropped here, releasing the
+            // permit back to the semaphore — we only consume it when we
+            // successfully matched it with a channel.
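+            // (With let-chains, a permit bound by the first condition is
+            // dropped as soon as the second condition fails, before this
+            // else body runs.)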
}
    }

-    fn provide_channel(&mut self, channel: EstablishedChannel, tx: oneshot::Sender<Connection>) {
-        // We decrement here because we create Connection, this will signal when
-        // it is Dropped and therefore increment this again.
-        self.available_connections -= 1;
+    fn provide_channel(
+        &mut self,
+        channel: EstablishedChannel,
+        tx: oneshot::Sender<Connection>,
+        permit: OwnedSemaphorePermit,
+    ) {
+        // The permit lives on the `Connection`. When the `Connection`
+        // is dropped (normal completion, transport error, oneshot
+        // receiver gone — any path), the permit is released to the
+        // semaphore exactly once. No code in this struct touches the
+        // permit count directly.
         drop(tx.send(Connection {
             tx: self.connection_tx.clone(),
             pending_channel: Some(channel.channel.clone()),
             channel,
+            _permit: permit,
         }));
     }

     fn maybe_available_connection(&mut self) {
-        while self.available_connections > 0
-            && !self.waiting_connections.is_empty()
-            && !self.available_channels.is_empty()
-        {
-            if let Some(channel) = self.available_channels.pop_front() {
-                if let Some((reason, tx)) = self.waiting_connections.pop_front() {
-                    debug!(reason, "ConnectionManager: channel available, running");
-                    self.provide_channel(channel, tx);
-                } else {
-                    // This should never happen, but better than an unwrap.
-                    self.available_channels.push_front(channel);
-                }
-            }
+        // Pump as many waiting requests as we have (permit, channel)
+        // pairs available. Each loop iteration must acquire a fresh
+        // permit so it can be moved into the resulting `Connection`.
+        while !self.waiting_connections.is_empty() && !self.available_channels.is_empty() {
+            let Some(permit) = self.semaphore.clone().try_acquire_owned().ok() else {
+                // No more permits — leave the request queued; future
+                // permit releases will re-trigger this loop via the
+                // `Dropped` path.
+                break;
+            };
+            let Some(channel) = self.available_channels.pop_front() else {
+                drop(permit);
+                break;
+            };
+            let Some((reason, tx)) = self.waiting_connections.pop_front() else {
+                // This should never happen given the loop guard, but
+                // putting the channel back rather than dropping it
+                // matches the previous defensive behaviour.
+                self.available_channels.push_front(channel);
+                drop(permit);
+                break;
+            };
+            debug!(reason, "ConnectionManager: channel available, running");
+            self.provide_channel(channel, tx, permit);
         }
     }
@@ -359,10 +415,15 @@ impl ConnectionManagerWorker {
     fn handle_connection(&mut self, request: ConnectionRequest) {
         match request {
             ConnectionRequest::Dropped(maybe_channel) => {
+                // The Connection's `_permit` was released when the
+                // Connection dropped (RAII), *before* this message was
+                // processed — so by the time we get here a permit is
+                // already available. We just need to put any
+                // outstanding pending-channel back into the pool and
+                // try to wake a waiter.
                 if let Some(channel) = maybe_channel {
                     self.available_channels.push_back(channel);
                 }
-                self.available_connections += 1;
                 self.maybe_available_connection();
             }
             ConnectionRequest::Connected(channel) => {
@@ -394,7 +455,8 @@ impl ConnectionManagerWorker {
 /// re-connecting the underlying channel on error. It depends on users
 /// reporting all errors.
 /// NOTE: This should never be cloneable because its lifetime is linked to the
-/// `ConnectionManagerWorker::available_connections`.
+/// semaphore permit it carries — `_permit` is released exactly once,
+/// when the `Connection` drops.
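+///
+/// A minimal sketch of the RAII property relied on here (a standalone
+/// illustration using only tokio's public semaphore API, not this
+/// crate's types):
+///
+/// ```
+/// use std::sync::Arc;
+/// use tokio::sync::Semaphore;
+///
+/// let sem = Arc::new(Semaphore::new(1));
+/// let permit = sem.clone().try_acquire_owned().expect("one permit free");
+/// assert_eq!(sem.available_permits(), 0);
+/// // Every exit path — including unwinding — runs this drop.
+/// drop(permit);
+/// assert_eq!(sem.available_permits(), 1);
+/// ```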
 #[derive(Debug)]
 pub struct Connection {
     /// Communication with `ConnectionManagerWorker` to inform about transport
@@ -406,6 +468,13 @@ pub struct Connection {
     pending_channel: Option<Channel>,
     /// The identifier to send to `tx`.
     channel: EstablishedChannel,
+    /// Semaphore permit gating the maximum number of concurrent
+    /// `Connection` objects. Held purely for its `Drop` side effect
+    /// (releases the permit back to the worker's semaphore). Never
+    /// read; the leading underscore is the conventional marker.
+    /// Replaces the prior manual `available_connections: usize`
+    /// counter that was observed underflowing in production.
+    _permit: OwnedSemaphorePermit,
 }

 impl Drop for Connection {
diff --git a/nativelink-util/tests/connection_manager_test.rs b/nativelink-util/tests/connection_manager_test.rs
new file mode 100644
index 000000000..f8ae581e7
--- /dev/null
+++ b/nativelink-util/tests/connection_manager_test.rs
@@ -0,0 +1,260 @@
+// Copyright 2026 The NativeLink Authors. All rights reserved.
+//
+// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// See LICENSE file for details
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Tests for `ConnectionManager`'s permit accounting.
+//!
+//! The bug these tests exist to prevent: in production we observed
+//! `available_connections: 18446744073709551589` (`u64::MAX − 26`) while
+//! `waiting_connections` climbed unbounded, ultimately killing the worker
+//! process via `OOMKilled` (exit 137). Switching from a manual `usize`
+//! counter to `Arc<Semaphore>` with `OwnedSemaphorePermit` makes the leak
+//! structurally impossible — these tests pin that property by exercising
+//! the full request-acquire-release cycle through the public API many
+//! times over a tight permit budget. With a leak, the cycle eventually
+//! blocks forever; without one, every iteration completes inside the
+//! per-call timeout.
+
+use core::pin::Pin;
+use core::time::Duration;
+use std::sync::Arc;
+
+use nativelink_config::stores::Retry;
+use nativelink_error::Error;
+use nativelink_macro::nativelink_test;
+use nativelink_proto::google::bytestream::byte_stream_server::{ByteStream, ByteStreamServer};
+use nativelink_proto::google::bytestream::{
+    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
+    WriteResponse,
+};
+use nativelink_util::background_spawn;
+use nativelink_util::connection_manager::ConnectionManager;
+use pretty_assertions::assert_eq;
+use tokio::time::timeout;
+use tokio_stream::Stream;
+use tonic::transport::server::TcpIncoming;
+use tonic::transport::{Endpoint, Server};
+use tonic::{Request, Response, Status, Streaming};
+
+/// Trivial `ByteStream` service that errors on every method. We don't
+/// actually call it — its sole purpose is to give the `tonic::Server`
+/// an inner `Service` so `serve_with_incoming` accepts the listener.
+/// `tonic::Channel`'s `poll_ready` only requires that the underlying
+/// HTTP/2 connection can be established and stay open, which a vanilla
+/// tonic Server with any service registered satisfies.
+#[derive(Clone)]
+struct FakeByteStream;
+
+#[tonic::async_trait]
+impl ByteStream for FakeByteStream {
+    type ReadStream =
+        Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + 'static>>;
+
+    async fn read(
+        &self,
+        _request: Request<ReadRequest>,
+    ) -> Result<Response<Self::ReadStream>, Status> {
+        Err(Status::unimplemented("fake"))
+    }
+
+    async fn write(
+        &self,
+        _request: Request<Streaming<WriteRequest>>,
+    ) -> Result<Response<WriteResponse>, Status> {
+        Err(Status::unimplemented("fake"))
+    }
+
+    async fn query_write_status(
+        &self,
+        _request: Request<QueryWriteStatusRequest>,
+    ) -> Result<Response<QueryWriteStatusResponse>, Status> {
+        Err(Status::unimplemented("fake"))
+    }
+}
+
+/// Stand up a tonic Server on `127.0.0.1:0` with the trivial
+/// `FakeByteStream` registered. Returns an `Endpoint` pointing at it,
+/// which `ConnectionManager` will drive through its full
+/// `provide_channel` / `Connection` lifecycle paths.
+async fn fake_grpc_server_endpoint() -> Endpoint {
+    let listener = TcpIncoming::bind("127.0.0.1:0".parse().unwrap()).unwrap();
+    let port = listener.local_addr().unwrap().port();
+    background_spawn!("connection_manager_test_server", async move {
+        Server::builder()
+            .add_service(ByteStreamServer::new(FakeByteStream))
+            .serve_with_incoming(listener)
+            .await
+            .unwrap();
+    });
+    Endpoint::from_shared(format!("http://127.0.0.1:{port}")).unwrap()
+}
+
+/// Identity jitter so retry timing stays predictable in tests.
+fn no_jitter() -> Arc<dyn Fn(Duration) -> Duration + Send + Sync> {
+    Arc::new(|d| d)
+}
+
+/// Loop the request-acquire-drop cycle far more times than the permit
+/// budget. Pre-fix, `available_connections` would underflow within the
+/// first few iterations under any error path; post-fix, the
+/// `OwnedSemaphorePermit` released by `Connection::drop` keeps the budget
+/// balanced indefinitely. A 5-second per-call timeout is the failure
+/// signal: the test fails if any single acquire blocks beyond it.
+///
+/// `connections_per_endpoint` matches `MAX_CONCURRENT` so the **permit**
+/// is the gate being tested, not the channel pool.
+#[nativelink_test]
+async fn permits_released_on_drop_no_leak() -> Result<(), Error> {
+    const MAX_CONCURRENT: usize = 2;
+    const ITERATIONS: usize = 100;
+
+    let endpoint = fake_grpc_server_endpoint().await;
+    let cm = ConnectionManager::new(
+        vec![endpoint],
+        /* connections_per_endpoint = */ MAX_CONCURRENT,
+        MAX_CONCURRENT,
+        Retry::default(),
+        no_jitter(),
+    );
+
+    for i in 0..ITERATIONS {
+        let c1 = timeout(Duration::from_secs(5), cm.connection(format!("iter-{i}-a")))
+            .await
+            .unwrap_or_else(|_| panic!("iter {i}: first acquire blocked >5s — permit leak"))?;
+        let c2 = timeout(Duration::from_secs(5), cm.connection(format!("iter-{i}-b")))
+            .await
+            .unwrap_or_else(|_| panic!("iter {i}: second acquire blocked >5s — permit leak"))?;
+        drop(c1);
+        drop(c2);
+    }
+
+    Ok(())
+}
+
+/// When the requester drops the future returned by `connection(...)`
+/// *before* the worker has handed the `Connection` over, the underlying
+/// `oneshot::Sender::send` returns `Err(Connection)` inside
+/// `provide_channel`. Pre-fix that path was particularly brittle: it
+/// decremented the counter and depended on the synchronously-generated
+/// `Connection::drop` → `ConnectionRequest::Dropped` round-trip
+/// reaching the worker's queue *and* getting processed. Post-fix the
+/// `OwnedSemaphorePermit` releases the moment the local `Connection`
+/// goes out of scope on the worker's stack — no inter-task message
+/// required. We verify that permit availability returns to baseline
+/// even when caller futures are aborted in flight.
+#[nativelink_test] +async fn aborted_caller_future_does_not_leak_permits() -> Result<(), Error> { + const MAX_CONCURRENT: usize = 2; + + let endpoint = fake_grpc_server_endpoint().await; + let cm = Arc::new(ConnectionManager::new( + vec![endpoint], + /* connections_per_endpoint = */ MAX_CONCURRENT, + MAX_CONCURRENT, + Retry::default(), + no_jitter(), + )); + + // Round 1: spawn `MAX_CONCURRENT * 5` acquire futures and abort + // them while still holding their `Connection`. Each task that + // managed to take a permit must release it when its task is + // aborted (Connection::drop runs on field drop). The queued tasks + // (those that hadn't taken a permit yet) drop their oneshot + // receiver, and when the worker later tries to deliver to them the + // tx.send fails and the un-deliverable Connection is dropped on + // the worker's stack — releasing its permit too. + let mut handles = Vec::new(); + for i in 0..(MAX_CONCURRENT * 5) { + let cm = Arc::clone(&cm); + handles.push(tokio::spawn(async move { + // Bind to `_conn` (not `_`) so the Connection lives until + // task abort; bare `let _ = ...` would drop it immediately + // and defeat the test. + let _conn = cm.connection(format!("aborted-{i}")).await; + futures::future::pending::<()>().await + })); + } + // Give the worker time to populate the channel pool, drain the + // request queue, and let the first MAX_CONCURRENT tasks acquire + // permits. + tokio::time::sleep(Duration::from_millis(100)).await; + for h in handles { + h.abort(); + } + // Yield long enough for the abort-driven drops to propagate the + // permit release back to the semaphore, *and* for the worker to + // process the cascade of `Dropped` messages that drain stale + // entries from `waiting_connections`. + tokio::time::sleep(Duration::from_millis(500)).await; + + // Round 2: full-budget acquire. Pre-fix the round-1 aborts could + // leave the counter underflowed or saturated, and round-2 acquires + // would block forever; post-fix every permit is back. + let c1 = timeout(Duration::from_secs(5), cm.connection("post-abort-a".into())) + .await + .expect("post-abort acquire 1 blocked >5s — permit leak")?; + let c2 = timeout(Duration::from_secs(5), cm.connection("post-abort-b".into())) + .await + .expect("post-abort acquire 2 blocked >5s — permit leak")?; + drop(c1); + drop(c2); + Ok(()) +} + +/// `MAX_CONCURRENT` simultaneous holders is the steady-state ceiling. +/// One more request must queue (rather than be served by an underflowed +/// counter, which is exactly what production was doing). We confirm +/// queuing by making the extra request race a 200 ms timeout against +/// the drop of one held connection: only after the drop does the queued +/// request resolve. +/// +/// `connections_per_endpoint = MAX_CONCURRENT + 1` so the channel pool +/// is *not* the bottleneck — we want to prove the **permit** is what +/// blocks the third request, not channel availability. +#[nativelink_test] +async fn extra_request_above_max_blocks_until_a_release() -> Result<(), Error> { + const MAX_CONCURRENT: usize = 2; + + let endpoint = fake_grpc_server_endpoint().await; + let cm = Arc::new(ConnectionManager::new( + vec![endpoint], + MAX_CONCURRENT + 1, + MAX_CONCURRENT, + Retry::default(), + no_jitter(), + )); + + let c1 = cm.connection("hold-1".into()).await?; + let c2 = cm.connection("hold-2".into()).await?; + + // Third request must be queued — racing it against a short timeout + // proves it doesn't resolve while permits are exhausted. 
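+    // (The 200 ms sleep below is a heuristic, not a guarantee: long
+    // enough for the spawned request to reach the worker and be queued,
+    // short enough to keep the test fast. A premature "finished" here
+    // would mean a permit was handed out while both were still held.)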
+ let cm_for_third = Arc::clone(&cm); + let third = tokio::spawn(async move { cm_for_third.connection("queued-3".into()).await }); + tokio::time::sleep(Duration::from_millis(200)).await; + assert_eq!( + third.is_finished(), + false, + "third connection resolved while permits were exhausted", + ); + + // Drop one held permit; the queued request should now resolve. + drop(c1); + let c3 = timeout(Duration::from_secs(5), third) + .await + .expect("queued request did not resolve within 5s of permit release") + .unwrap()?; + + drop(c2); + drop(c3); + Ok(()) +} From c6c69bd4aeeb49e861e9777ccc3222de815405c0 Mon Sep 17 00:00:00 2001 From: Aman Kumar Date: Sat, 9 May 2026 01:50:37 +0100 Subject: [PATCH 18/18] redis_store: hold subscribed_keys write lock across receiver drop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `RedisSubscription::Drop` previously dropped the `watch::Receiver` *before* taking the `subscribed_keys` write lock, then decided whether to remove the publisher entry based on `receiver_count() == 0`. Two concurrent drops on subscriptions sharing a publisher (e.g. multiple `WaitExecution` clients on the same operation_id) could both decrement their counts before either took the lock, then race for it: the loser saw the entry already removed and emitted a spurious "Key … was not found in subscribed keys" error. Worse, if a fresh `subscribe(same_key)` interleaved between the two drops, the second drop could remove the freshly-inserted publisher and silently strand its subscribers. Acquire the write lock *first*, evaluate "count == 1 with my receiver still alive", remove the entry under the lock if so, then drop the receiver. The lock now serialises both the count read and the map mutation, closing both race windows. Demote the absence log from `error!` to `warn!`: with the fix, that path now indicates a genuine unexpected mutation outside the lock, not the race noise. Adds 4 regression tests covering single-drop silence, drop-one-of-two preserving the publisher, 200-iteration concurrent-drop race, and resubscribe-after-drop creating a fresh publisher. Co-Authored-By: Claude Opus 4.7 (1M context) --- nativelink-store/src/redis_store.rs | 57 +++++-- nativelink-store/tests/redis_store_test.rs | 176 ++++++++++++++++++++- 2 files changed, 220 insertions(+), 13 deletions(-) diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index f9aea2742..0d6f3a3a9 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -1335,22 +1335,61 @@ impl Drop for RedisSubscription { return; // Already dropped, nothing to do. }; let key = receiver.borrow().clone(); - // IMPORTANT: This must be dropped before receiver_count() is called. - drop(receiver); let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else { - return; // Already dropped, nothing to do. + return; // Parent dropped — nothing to do. }; + // Acquire the write lock BEFORE dropping our receiver. Earlier + // versions of this Drop dropped the receiver first and then + // acquired the lock to decide whether to remove the publisher + // entry; under concurrent drops of two subscriptions sharing the + // same publisher (e.g. 
two `WaitExecution` clients on the same + // operation_id), both threads decremented their receiver count + // before either took the lock, both saw `receiver_count() == 0` + // when they finally got in, and the loser of the lock race + // logged the spurious + // "Key … was not found in subscribed keys when checking if + // it should be removed." + // error against an entry the winner had just removed. Worse, + // the desync window could let a fresh `subscribe(same_key)` + // re-insert a publisher between the two drops, after which the + // second drop would mutate state belonging to *that* fresh + // publisher. + // + // Holding the write lock across the receiver drop closes the + // window: no other `subscribe`/`Drop` can interleave with our + // count change. We use the "is count == 1 with my receiver + // still alive?" predicate instead of the original "is count == 0 + // after I drop?" — both express "am I the last subscriber", + // but the former is decidable while we still hold the receiver + // (and therefore can be evaluated under the lock without + // requiring the receiver-drop to happen first). let mut subscribed_keys = subscribed_keys.write(); - let Some(value) = subscribed_keys.get(&key) else { - error!( - "Key {key} was not found in subscribed keys when checking if it should be removed." + let Some(publisher) = subscribed_keys.get(&key) else { + // Genuinely should not happen with the write lock held — + // every removal goes through this same lock. Demoted from + // ERROR to WARN: most observed occurrences in production + // were the lockless race above, not real corruption, and + // we've now eliminated that source. Keep the log so any + // *new* path that removes entries without holding the lock + // surfaces. + warn!( + %key, + "RedisSubscription::drop: key absent from subscribed_keys under write lock — \ + indicates an unexpected removal path", ); return; }; - // If we have no receivers, cleanup the entry from our map. - if value.receiver_count() == 0 { - subscribed_keys.remove(key); + // Count includes our own (still-alive) receiver. If we are the + // sole subscriber, remove the publisher entry. + if publisher.receiver_count() == 1 { + subscribed_keys.remove(&key); } + // Drop our receiver only after the bookkeeping decision. The + // count change becomes visible only here, but by this point + // either (a) we already removed the publisher from the map + // (count is irrelevant), or (b) other live receivers remain + // and the count truthfully drops by one. 
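+        // (`watch::Sender::receiver_count` reflects live receivers; the
+        // decrement happens inside this `drop`, and the write lock we
+        // still hold keeps any concurrent `subscribe` or `Drop` from
+        // observing the in-between state.)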
+ drop(receiver); } } diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 97941b504..de351541f 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -23,8 +23,8 @@ use nativelink_config::stores::{RedisMode, RedisSpec}; use nativelink_error::{Code, Error, ResultExt, make_err}; use nativelink_macro::nativelink_test; use nativelink_redis_tester::{ - ReadOnlyRedis, add_lua_script, fake_redis_sentinel_master_stream, fake_redis_sentinel_stream, - fake_redis_stream, make_fake_redis_with_responses, + ReadOnlyRedis, SubscriptionManagerNotify, add_lua_script, fake_redis_sentinel_master_stream, + fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_responses, }; use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; use nativelink_store::redis_store::{ @@ -36,8 +36,9 @@ use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::HealthStatus; use nativelink_util::store_trait::{ FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore, - SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, StoreKey, - StoreLike, TrueValue, UploadSizeInfo, + SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, + SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, StoreLike, TrueValue, + UploadSizeInfo, }; use pretty_assertions::assert_eq; use redis::{PushInfo, RedisError, Value}; @@ -1390,3 +1391,170 @@ async fn send_messages_to_subscription_channel() -> Result<(), Error> { Ok(()) } + +// --------------------------------------------------------------------------- +// Regression tests for `RedisSubscription::Drop` map-desync. +// +// Pre-fix, `Drop` dropped its `watch::Receiver` *before* taking the +// `subscribed_keys` write lock. With two `RedisSubscription`s sharing +// a publisher (e.g. two `WaitExecution` clients on the same operation), +// both threads could race past the receiver-drop and contend for the +// lock holding stale `receiver_count() == 0` views — the loser would +// remove an entry that the winner had already removed (or, worse, +// remove an entry that a fresh `subscribe(same_key)` had already +// re-inserted). The visible symptom in production was a spurious +// log: +// "Key {key} was not found in subscribed keys when checking if it +// should be removed" +// firing whenever multiple watchers on a hot operation_id were torn +// down concurrently. The post-fix predicate ("count == 1 with my +// receiver still alive, evaluated under the write lock") makes the +// race structurally impossible. +// +// The post-fix `Drop` now reaches its warning path *only* if the +// `subscribed_keys` map gets mutated by a code path that doesn't take +// the lock — which is what we want it to flag, not the noise the +// pre-fix code generated. So these tests assert that the post-fix +// warning is silent under normal subscribe/drop traffic, including +// concurrent drops. + +/// Test key provider that just wraps a string. Reused across the +/// subscription regression tests below. +#[derive(Clone)] +struct TestSubKey(String); + +impl SchedulerStoreKeyProvider for TestSubKey { + type Versioned = FalseValue; + fn get_key(&self) -> StoreKey<'static> { + StoreKey::Str(std::borrow::Cow::Owned(self.0.clone())) + } +} + +/// Sanity: a single subscriber that drops cleanly produces no warning +/// and no error. 
+#[nativelink_test]
+async fn redis_subscription_single_drop_is_silent() -> Result<(), Error> {
+    let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let manager = RedisSubscriptionManager::new(rx);
+
+    let sub = manager.subscribe(TestSubKey("solo-key".to_string()))?;
+    drop(sub);
+    sleep(Duration::from_millis(10)).await;
+
+    assert!(
+        !logs_contain("key absent from subscribed_keys under write lock"),
+        "single-subscriber drop unexpectedly logged the absence warning",
+    );
+    assert!(!logs_contain("ERROR"));
+
+    drop(manager);
+    Ok(())
+}
+
+/// Two subscribers share a publisher. Dropping the first must leave
+/// the publisher entry in place so the second's `changed()` keeps
+/// firing on `notify_for_test`. Pre-fix this test was *also* clean
+/// because the bug only fires on concurrent drops; this is the
+/// non-racing baseline.
+#[nativelink_test]
+async fn redis_subscription_drop_one_of_two_keeps_publisher() -> Result<(), Error> {
+    let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let manager = RedisSubscriptionManager::new(rx);
+
+    let key = "shared-key";
+    let sub_a = manager.subscribe(TestSubKey(key.to_string()))?;
+    let mut sub_b = manager.subscribe(TestSubKey(key.to_string()))?;
+
+    // Drop the first; the second's subscription must still resolve
+    // when we notify on the same key.
+    drop(sub_a);
+
+    manager.notify_for_test(key.to_string());
+    timeout(Duration::from_secs(2), sub_b.changed())
+        .await
+        .expect("sub_b.changed() did not fire — publisher entry was dropped prematurely")?;
+
+    assert!(
+        !logs_contain("key absent from subscribed_keys under write lock"),
+        "absence warning fired during single drop with another receiver alive",
+    );
+    drop(sub_b);
+    drop(manager);
+    Ok(())
+}
+
+/// **Race regression.** Two subscribers share a publisher; both are
+/// dropped concurrently from separate tasks. Pre-fix this would
+/// occasionally fire the absence warning (the receiver-drop happened
+/// before the lock, so both threads' `receiver_count()` could read 0
+/// inside the lock, and the loser found the entry already removed).
+/// We loop the scenario many times so the race window is still hit
+/// reliably even on machines whose timing makes the interleaving rare.
+#[nativelink_test]
+async fn redis_subscription_concurrent_drops_no_absence_warn() -> Result<(), Error> {
+    let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let manager = RedisSubscriptionManager::new(rx);
+
+    const ITERATIONS: usize = 200;
+    for i in 0..ITERATIONS {
+        let key = format!("race-key-{i}");
+        let sub_a = manager.subscribe(TestSubKey(key.clone()))?;
+        let sub_b = manager.subscribe(TestSubKey(key.clone()))?;
+
+        // `spawn_blocking` puts each Drop on its own thread; with the
+        // pre-fix code's "drop receiver, then take lock" sequence,
+        // both threads can race into the lock with already-decremented
+        // counts. With the post-fix "take lock, then evaluate, then
+        // drop receiver" sequence, the lock serialises the decision
+        // and the warning never fires.
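+        // (A fresh key per iteration keeps the publishers independent,
+        // so a stale entry leaked in iteration i cannot mask or poison
+        // the outcome of iteration i + 1.)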
+ let h_a = tokio::task::spawn_blocking(move || drop(sub_a)); + let h_b = tokio::task::spawn_blocking(move || drop(sub_b)); + h_a.await.unwrap(); + h_b.await.unwrap(); + } + sleep(Duration::from_millis(50)).await; + + assert!( + !logs_contain("key absent from subscribed_keys under write lock"), + "concurrent drops produced the absence warning at least once across {ITERATIONS} \ + iterations — the Drop ordering regressed", + ); + assert!(!logs_contain("ERROR")); + + drop(manager); + Ok(()) +} + +/// Subscribe → drop both → re-subscribe to the same key. Post-fix the +/// re-subscribe must create a fresh publisher (the old entry was +/// removed by the last drop). We confirm the fresh publisher is +/// connected by notifying and waiting for `changed()`. +#[nativelink_test] +async fn redis_subscription_resubscribe_after_drop_creates_fresh_publisher() -> Result<(), Error> { + let (_tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let manager = RedisSubscriptionManager::new(rx); + + let key = "cycle-key"; + let sub_a = manager.subscribe(TestSubKey(key.to_string()))?; + let sub_b = manager.subscribe(TestSubKey(key.to_string()))?; + drop(sub_a); + drop(sub_b); + + // Re-subscribe to the same key. If the previous drops left the + // map in an inconsistent state (stale publisher kept, or a + // partially-deconstructed entry), this either reuses a dead + // publisher (changed() never fires) or panics inside the + // patricia map. + let mut sub_c = manager.subscribe(TestSubKey(key.to_string()))?; + manager.notify_for_test(key.to_string()); + timeout(Duration::from_secs(2), sub_c.changed()) + .await + .expect("re-subscribe after drops produced a dead publisher")?; + + assert!(!logs_contain( + "key absent from subscribed_keys under write lock" + )); + drop(sub_c); + drop(manager); + Ok(()) +}