diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs
index e752ca051..fef6d714c 100644
--- a/nativelink-config/src/stores.rs
+++ b/nativelink-config/src/stores.rs
@@ -739,6 +739,26 @@ pub struct FastSlowSpec {
     /// and you wish to have an upstream read only store.
     #[serde(default)]
    pub slow_direction: StoreDirection,
+
+    /// Reads of blobs at or above this size bypass the populating-digests
+    /// dedup map and stream directly from the slow store, without
+    /// populating the fast tier.
+    ///
+    /// Rationale: the leader/follower dedup is a win for blobs whose
+    /// transfer time is short relative to `LEADER_WAIT_TIMEOUT` — one
+    /// slow-store fetch fills the fast cache, subsequent readers serve
+    /// from fast. For multi-GB blobs (typically container layers) the
+    /// leader's transfer takes minutes; every concurrent follower hits
+    /// the timeout, falls through to the slow store anyway, and the
+    /// fast cache is then evicted aggressively to make room for the
+    /// huge blob — pushing out smaller, more-frequently-read entries.
+    /// Bypassing dedup for huge blobs avoids both pathologies.
+    ///
+    /// Set to 0 (default) to use the built-in default of 256 MiB. Set
+    /// to a very large value (e.g. `u64::MAX`) to disable the bypass
+    /// and always use dedup regardless of size.
+    #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
+    pub bypass_dedup_threshold_bytes: u64,
 }
 
 #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
diff --git a/nativelink-redis-tester/src/dynamic_fake_redis.rs b/nativelink-redis-tester/src/dynamic_fake_redis.rs
index 8298c53aa..53586cf26 100644
--- a/nativelink-redis-tester/src/dynamic_fake_redis.rs
+++ b/nativelink-redis-tester/src/dynamic_fake_redis.rs
@@ -137,9 +137,21 @@ impl FakeRedisBackend {
                         panic!("Aggregate query should be a string: {args:?}");
                     };
                     let query = str::from_utf8(raw_query).unwrap();
+                    // The real ft_aggregate caller now passes an explicit
+                    // `TIMEOUT <ms>` clause before `LOAD`. Tolerate both
+                    // shapes here so this fake doesn't break older callers
+                    // and the LOAD-args check still validates the bit we
+                    // actually care about.
+                    let load_offset = if matches!(args.get(2), Some(OwnedFrame::BulkString(b)) if b == b"TIMEOUT")
+                    {
+                        // Skip "TIMEOUT" and its millisecond argument.
+                        4
+                    } else {
+                        2
+                    };
                     // Lazy implementation making assumptions.
                     assert_eq!(
-                        args[2..6],
+                        args[load_offset..load_offset + 4],
                         vec![
                             OwnedFrame::BulkString(b"LOAD".to_vec()),
                             OwnedFrame::BulkString(b"2".to_vec()),
diff --git a/nativelink-scheduler/src/store_awaited_action_db.rs b/nativelink-scheduler/src/store_awaited_action_db.rs
index f4cbedd03..5bfc5dcc1 100644
--- a/nativelink-scheduler/src/store_awaited_action_db.rs
+++ b/nativelink-scheduler/src/store_awaited_action_db.rs
@@ -695,16 +695,44 @@ where
             ActionUniqueQualifier::Cacheable(_) => {}
             ActionUniqueQualifier::Uncacheable(_) => return Ok(None),
         }
-        let stream = self
-            .store
-            .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
-            .await
-            .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
-        tokio::pin!(stream);
-        let maybe_awaited_action = stream
-            .try_next()
-            .await
-            .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
+        // Lookup, then on a miss retry once after a brief sleep.
+        // The backing index is `RediSearch`'s secondary index over the
+        // awaited-action hash-set; there is a sub-millisecond-scale
+        // window between a peer's HSET completing on the master and
+        // the index commit becoming visible to a concurrent reader.
+        // Two near-simultaneous `add_action` calls for the same
+        // `unique_qualifier` can both observe an empty result in that
+        // window and each create a separate operation, leading to
+        // duplicate scheduler operations for the same action digest
+        // (observed in production as "two same actions running on
+        // different PRs"). One short retry closes the window without
+        // a heavyweight atomic-claim mechanism.
+        //
+        // Uses real tokio time rather than the configurable `now_fn`
+        // sleep: `MockInstantWrapped::sleep` busy-yields until mock
+        // time advances, which scheduler tests don't do for this
+        // path, and the retry itself is a millisecond-scale physical
+        // wait that should never participate in test-controlled time.
+        const SUBSCRIBE_RACE_RETRY_DELAY: Duration = Duration::from_millis(20);
+        let mut maybe_awaited_action: Option<AwaitedAction> = None;
+        for attempt in 0..2_u32 {
+            if attempt > 0 {
+                tokio::time::sleep(SUBSCRIBE_RACE_RETRY_DELAY).await;
+            }
+            let stream = self
+                .store
+                .search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
+                .await
+                .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
+            tokio::pin!(stream);
+            maybe_awaited_action = stream
+                .try_next()
+                .await
+                .err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
+            if maybe_awaited_action.is_some() {
+                break;
+            }
+        }
         match maybe_awaited_action {
             Some(awaited_action) => {
                 // TODO(palfrey) We don't support joining completed jobs because we
diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs
index f3f00878a..cdd6d52f6 100644
--- a/nativelink-service/src/execution_server.rs
+++ b/nativelink-service/src/execution_server.rs
@@ -20,6 +20,7 @@ use std::fmt;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use bytes::Bytes;
 use futures::stream::unfold;
 use futures::{Stream, StreamExt};
 use nativelink_config::cas_server::{ExecutionConfig, InstanceName, WithInstanceName};
@@ -35,6 +36,7 @@ use nativelink_proto::google::longrunning::{
     CancelOperationRequest, DeleteOperationRequest, GetOperationRequest, ListOperationsRequest,
     ListOperationsResponse, Operation, WaitOperationRequest,
 };
+use nativelink_proto::google::rpc::Status as GrpcStatusProto;
 use nativelink_store::ac_utils::get_and_decode_digest;
 use nativelink_store::store_manager::StoreManager;
 use nativelink_util::action_messages::{
@@ -45,10 +47,89 @@ use nativelink_util::digest_hasher::{DigestHasherFunc, make_ctx_for_hash_func};
 use nativelink_util::operation_state_manager::{
     ActionStateResult, ClientStateManager, OperationFilter,
 };
-use nativelink_util::store_trait::Store;
+use nativelink_util::store_trait::{Store, StoreLike};
 use opentelemetry::context::FutureExt;
-use tonic::{Request, Response, Status};
-use tracing::{Instrument, Level, debug, error, error_span, instrument};
+use prost::Message as _;
+use tonic::{Code, Request, Response, Status};
+use tracing::{Instrument, Level, debug, error, error_span, instrument, warn};
+
+/// Inline definition of `google.rpc.PreconditionFailure`.
+/// Bazel's `RemoteSpawnRunner` inspects this proto in the gRPC
+/// `Status.details` of a `FAILED_PRECONDITION` response and, when
+/// violations of type `MISSING` are present, automatically re-uploads
+/// the named blobs and retries the Execute call. Without this detail
+/// Bazel surfaces the failure as a hard build error — exactly the
+/// symptom triggered by an interrupted previous build leaving partial
+/// CAS uploads.
+mod precondition_failure {
+    /// `google.rpc.PreconditionFailure`.
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct PreconditionFailure {
+        #[prost(message, repeated, tag = "1")]
+        pub(super) violations: ::prost::alloc::vec::Vec<Violation>,
+    }
+    /// `google.rpc.PreconditionFailure.Violation`.
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct Violation {
+        #[prost(string, tag = "1")]
+        pub(super) r#type: ::prost::alloc::string::String,
+        #[prost(string, tag = "2")]
+        pub(super) subject: ::prost::alloc::string::String,
+        #[prost(string, tag = "3")]
+        pub(super) description: ::prost::alloc::string::String,
+    }
+    pub(super) const TYPE_URL: &str = "type.googleapis.com/google.rpc.PreconditionFailure";
+    pub(super) const VIOLATION_TYPE_MISSING: &str = "MISSING";
+}
+
+/// Build a tonic [`Status`] of code `FAILED_PRECONDITION` whose details
+/// carry a `google.rpc.PreconditionFailure` listing the missing CAS
+/// blobs. The format matches what Bazel's `RemoteExecutionService`
+/// expects in order to trigger automatic re-upload + retry.
+fn missing_blobs_failed_precondition(
+    missing: &[(DigestInfo, &'static str)],
+    summary: &str,
+) -> Status {
+    let pf = precondition_failure::PreconditionFailure {
+        violations: missing
+            .iter()
+            .map(|(d, ctx)| precondition_failure::Violation {
+                r#type: precondition_failure::VIOLATION_TYPE_MISSING.to_string(),
+                // Per REv2, the subject for a missing-blob violation is
+                // `blobs/<hash>/<size_bytes>` so the client knows exactly
+                // which digest to re-upload.
+                subject: format!("blobs/{}/{}", d.packed_hash(), d.size_bytes()),
+                description: (*ctx).to_string(),
+            })
+            .collect(),
+    };
+
+    // Wrap PreconditionFailure into a google.protobuf.Any.
+    let mut pf_buf: Vec<u8> = Vec::with_capacity(pf.encoded_len());
+    pf.encode(&mut pf_buf).unwrap_or(());
+    let any = prost_types::Any {
+        type_url: precondition_failure::TYPE_URL.to_string(),
+        value: pf_buf,
+    };
+
+    // tonic places the `details` argument verbatim into the
+    // `grpc-status-details-bin` trailer, which is itself a serialized
+    // `google.rpc.Status` proto. Encode the whole Status (code +
+    // message + details) so Bazel's RemoteExecutionService can decode
+    // the PreconditionFailure violations and re-upload missing blobs.
+    let status_proto = GrpcStatusProto {
+        code: Code::FailedPrecondition as i32,
+        message: summary.to_string(),
+        details: vec![any],
+    };
+    let mut status_buf: Vec<u8> = Vec::with_capacity(status_proto.encoded_len());
+    status_proto.encode(&mut status_buf).unwrap_or(());
+
+    Status::with_details(
+        Code::FailedPrecondition,
+        summary.to_string(),
+        Bytes::from(status_buf),
+    )
+}
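Reviewer note: the easiest way to sanity-check the trailer built above is to decode it back the way a Bazel-style client would. Below is a minimal, self-contained sketch; the mirror structs and the `missing_subjects` helper are illustrative names, not code from this patch.

```rust
use prost::Message;

// Local mirrors of `google.rpc.Status` and `google.rpc.PreconditionFailure`,
// matching the tag layout used in the patch above.
#[derive(Clone, PartialEq, ::prost::Message)]
struct RpcStatus {
    #[prost(int32, tag = "1")]
    code: i32,
    #[prost(string, tag = "2")]
    message: String,
    #[prost(message, repeated, tag = "3")]
    details: Vec<prost_types::Any>,
}

#[derive(Clone, PartialEq, ::prost::Message)]
struct PreconditionFailure {
    #[prost(message, repeated, tag = "1")]
    violations: Vec<Violation>,
}

#[derive(Clone, PartialEq, ::prost::Message)]
struct Violation {
    #[prost(string, tag = "1")]
    r#type: String,
    #[prost(string, tag = "2")]
    subject: String,
    #[prost(string, tag = "3")]
    description: String,
}

/// Pulls the `blobs/<hash>/<size>` subjects of all MISSING violations
/// out of a `tonic::Status`; empty when no usable detail is attached.
fn missing_subjects(status: &tonic::Status) -> Vec<String> {
    // `Status::details()` returns the raw bytes passed to
    // `Status::with_details`, i.e. the serialized `google.rpc.Status`.
    let Ok(rpc_status) = RpcStatus::decode(status.details()) else {
        return Vec::new();
    };
    rpc_status
        .details
        .iter()
        .filter(|any| any.type_url.ends_with("google.rpc.PreconditionFailure"))
        .filter_map(|any| PreconditionFailure::decode(any.value.as_slice()).ok())
        .flat_map(|pf| pf.violations)
        .filter(|v| v.r#type == "MISSING")
        .map(|v| v.subject)
        .collect()
}
```

Running the Status from `missing_blobs_failed_precondition` through this decoder should yield one `blobs/<hash>/<size>` subject per missing blob; an empty result here means Bazel would treat the failure as terminal.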
 
 type InstanceInfoName = String;
@@ -239,10 +320,17 @@ impl ExecutionServer {
         })
     }
 
+    /// Outer `Err`: an internal error that the outer handler will tip
+    /// and convert via `Status::from(Error)`.
+    /// Inner `Err(Status)`: a pre-built `tonic::Status` that must be
+    /// returned to the client verbatim (e.g. `FAILED_PRECONDITION` with
+    /// a `PreconditionFailure` detail listing missing CAS blobs, which
+    /// Bazel uses to drive automatic re-upload + retry).
     async fn inner_execute(
         &self,
         request: ExecuteRequest,
-    ) -> Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Error>
+    ) -> Result<Result<impl Stream<Item = Result<Operation, Status>> + Send + use<>, Status>, Error>
+    {
         let instance_name = request.instance_name;
 
         let instance_info = self
@@ -261,8 +349,95 @@
             .execution_policy
             .map_or(DEFAULT_EXECUTION_PRIORITY, |p| p.priority);
 
-        let action =
-            get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into()).await?;
+        // Same recovery model as the post-Action blob check below, but
+        // for the Action proto itself: Bazel's interrupted-build state
+        // can leave the Action digest absent from CAS; without a
+        // `PreconditionFailure` detail Bazel can't know which blob to
+        // re-upload and surfaces the failure as terminal. Detect
+        // NotFound on the Action fetch and translate it into the same
+        // FAILED_PRECONDITION + violations shape so Bazel auto-recovers.
+        let action = match get_and_decode_digest::<Action>(&instance_info.cas_store, digest.into())
+            .await
+        {
+            Ok(a) => a,
+            Err(e) if e.code == Code::NotFound => {
+                warn!(
+                    %digest,
+                    %e,
+                    "Execute: Action proto missing from CAS; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
+                );
+                let summary = format!(
+                    "Action {digest} is missing from CAS; client should re-upload it and retry"
+                );
+                return Ok(Err(missing_blobs_failed_precondition(
+                    &[(digest, "Action")],
+                    &summary,
+                )));
+            }
+            Err(e) => return Err(e).err_tip(|| "Decoding Action proto in Execute")?,
+        };
+
+        // Eager-validate that the blobs we'll feed to the worker are
+        // present in CAS *before* queuing the action. The most common
+        // way to land here without those blobs is an interrupted
+        // previous build: Bazel's local upload-state believes the blobs
+        // were sent, but the CAS does not have them, so a worker would
+        // later fail with `FAILED_PRECONDITION ... not found in either
+        // fast or slow store`. Surfaced to Bazel without a
+        // `PreconditionFailure` detail it becomes a hard build error;
+        // surfaced *with* one Bazel re-uploads the missing blobs and
+        // retries the Execute call automatically.
+        let action_command_digest = action
+            .command_digest
+            .as_ref()
+            .map(|d| DigestInfo::try_from(d.clone()))
+            .transpose()
+            .err_tip(|| "Failed to parse command_digest from Action")?;
+        let action_input_root_digest = action
+            .input_root_digest
+            .as_ref()
+            .map(|d| DigestInfo::try_from(d.clone()))
+            .transpose()
+            .err_tip(|| "Failed to parse input_root_digest from Action")?;
+        let mut blobs_to_check: Vec<DigestInfo> = Vec::with_capacity(2);
+        if let Some(d) = action_command_digest {
+            blobs_to_check.push(d);
+        }
+        if let Some(d) = action_input_root_digest {
+            blobs_to_check.push(d);
+        }
+        if !blobs_to_check.is_empty() {
+            let store_keys: Vec<_> = blobs_to_check.iter().map(|d| (*d).into()).collect();
+            let sizes = instance_info
+                .cas_store
+                .has_many(&store_keys)
+                .await
+                .err_tip(|| "Validating Action input blobs in CAS")?;
+            let mut missing: Vec<(DigestInfo, &'static str)> = Vec::new();
+            for ((digest, present), label) in blobs_to_check
+                .iter()
+                .zip(sizes.iter())
+                .zip(["Action.command_digest", "Action.input_root_digest"].iter())
+            {
+                if present.is_none() {
+                    missing.push((*digest, *label));
+                }
+            }
+            if !missing.is_empty() {
+                warn!(
+                    ?missing,
+                    %digest,
+                    "Execute pre-check found missing CAS blobs; returning FAILED_PRECONDITION with PreconditionFailure detail so Bazel can re-upload"
+                );
+                let summary = format!(
+                    "{} CAS blob(s) referenced by action {} are missing; client should re-upload them and retry",
+                    missing.len(),
+                    digest,
+                );
+                return Ok(Err(missing_blobs_failed_precondition(&missing, &summary)));
+            }
+        }
+
         let action_info = instance_info
             .build_action_info(
                 instance_name.clone(),
@@ -283,7 +458,7 @@
             .await
             .err_tip(|| "Failed to schedule task")?;
 
-        Ok(Box::pin(Self::to_execute_stream(
+        Ok(Ok(Self::to_execute_stream(
             &NativelinkOperationId::new(
                 instance_name,
                 action_listener
@@ -355,7 +530,15 @@ impl Execution for ExecutionServer {
             .await
             .err_tip(|| "Failed on execute() command")?;
 
-        Ok(Response::new(Box::pin(result)))
+        // Inner result distinguishes "internal error" (already
+        // converted via err_tip + Status::from above) from
+        // "deliberately constructed Status to return verbatim", so we
+        // can deliver `FAILED_PRECONDITION` with `PreconditionFailure`
+        // details to Bazel.
+        match result {
+            Ok(stream) => Ok(Response::new(Box::pin(stream))),
+            Err(status) => Err(status),
+        }
     }
 
     #[instrument(
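Reviewer note: before the `FastSlowStore` changes below, it may help to see that the new bypass reduces to a single inclusive comparison. A runnable mirror of the predicate under an assumed 1 KiB threshold (`should_bypass` here is a demo stand-in, not the private `should_bypass_dedup` method itself):

```rust
// Demo mirror of the bypass predicate; `None` models a non-digest
// (StoreKey::Str-style) key, which carries no inherent size.
fn should_bypass(digest_size: Option<u64>, threshold: u64) -> bool {
    digest_size.is_some_and(|size| size >= threshold)
}

fn main() {
    let threshold = 1024; // assumed 1 KiB threshold for the demo
    assert!(!should_bypass(Some(1023), threshold)); // below: dedup path
    assert!(should_bypass(Some(1024), threshold)); // boundary is inclusive
    assert!(should_bypass(Some(u64::MAX), threshold)); // huge blob: bypass
    assert!(!should_bypass(None, threshold)); // non-digest keys never bypass
}
```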
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index 41645daad..aafc961aa 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -57,6 +57,10 @@ pub struct FastSlowStore {
     #[metric(group = "slow_store")]
     slow_store: Store,
     slow_direction: StoreDirection,
+    /// Reads of blobs whose digest size is >= this value bypass the
+    /// populating-digests dedup map and stream directly from the slow
+    /// store. See [`FastSlowSpec::bypass_dedup_threshold_bytes`].
+    bypass_dedup_threshold_bytes: u64,
     weak_self: Weak<Self>,
     #[metric]
     metrics: FastSlowStoreMetrics,
@@ -118,17 +122,40 @@ impl Drop for LoaderGuard<'_> {
 
 impl FastSlowStore {
     pub fn new(spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc<Self> {
+        let bypass_dedup_threshold_bytes = if spec.bypass_dedup_threshold_bytes == 0 {
+            DEFAULT_BYPASS_DEDUP_THRESHOLD_BYTES
+        } else {
+            spec.bypass_dedup_threshold_bytes
+        };
         Arc::new_cyclic(|weak_self| Self {
             fast_store,
             fast_direction: spec.fast_direction,
             slow_store,
             slow_direction: spec.slow_direction,
+            bypass_dedup_threshold_bytes,
             weak_self: weak_self.clone(),
             metrics: FastSlowStoreMetrics::default(),
             populating_digests: Mutex::new(HashMap::new()),
         })
     }
 
+    /// Returns the digest size in bytes if `key` is a digest key, else `None`.
+    /// Non-digest keys (e.g. action-cache `StoreKey::Str`) carry no inherent
+    /// size and are never subject to the huge-blob bypass.
+    fn digest_size_bytes(key: &StoreKey<'_>) -> Option<u64> {
+        match key {
+            StoreKey::Digest(d) => Some(d.size_bytes()),
+            StoreKey::Str(_) => None,
+        }
+    }
+
+    /// Whether a read of `key` should bypass the populating-digests
+    /// dedup and read directly from the slow store. True only when the
+    /// blob's digest size meets or exceeds the configured threshold.
+    fn should_bypass_dedup(&self, key: &StoreKey<'_>) -> bool {
+        Self::digest_size_bytes(key).is_some_and(|size| size >= self.bypass_dedup_threshold_bytes)
+    }
+
     pub const fn fast_store(&self) -> &Store {
         &self.fast_store
     }
@@ -609,19 +636,52 @@ impl StoreDriver for FastSlowStore {
         offset: u64,
         length: Option<u64>,
     ) -> Result<(), Error> {
-        // TODO(palfrey) Investigate if we should maybe ignore errors here instead of
-        // forwarding them up.
+        // The fast store's `has()` is a metadata lookup against its
+        // in-memory eviction map; the actual file may have already been
+        // evicted off disk between `has()` and the subsequent
+        // `get_part()` (or — observed in production — added to the map
+        // by a write whose final rename failed, leaving a phantom
+        // entry). When that happens the filesystem store returns
+        // `NotFound` from `get_part`. Falling through to populate from
+        // the slow store recovers transparently; propagating the error
+        // would surface as `FAILED_PRECONDITION ... not found in
+        // either fast or slow store` to the worker and fail the
+        // action even though the slow store has the blob.
+        //
+        // We only fall through when no bytes have been written to the
+        // caller's `writer` yet — once partial bytes have been streamed
+        // we cannot honour a retry without producing a corrupt result.
if self.fast_store.has(key.borrow()).await?.is_some() { - self.metrics - .fast_store_hit_count - .fetch_add(1, Ordering::Acquire); - self.fast_store - .get_part(key, writer.borrow_mut(), offset, length) - .await?; - self.metrics - .fast_store_downloaded_bytes - .fetch_add(writer.get_bytes_written(), Ordering::Acquire); - return Ok(()); + let bytes_before = writer.get_bytes_written(); + match self + .fast_store + .get_part(key.borrow(), writer.borrow_mut(), offset, length) + .await + { + Ok(()) => { + self.metrics + .fast_store_hit_count + .fetch_add(1, Ordering::Acquire); + self.metrics + .fast_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + Err(e) + if e.code == Code::NotFound && writer.get_bytes_written() == bytes_before => + { + self.metrics + .fast_store_stale_map_falls_through + .fetch_add(1, Ordering::Acquire); + warn!( + %key, + ?e, + "FastSlowStore::get_part: fast store had a map entry but the file was missing on disk; falling through to slow store", + ); + // fall through to the populate path below + } + Err(e) => return Err(e), + } } // If the fast store is noop or read only or update only then bypass it. @@ -644,6 +704,35 @@ impl StoreDriver for FastSlowStore { return Ok(()); } + // Huge-blob bypass. The leader/follower dedup is a net loss for + // multi-GB blobs: the leader's pull from the slow store takes + // long enough that every concurrent follower hits + // `LEADER_WAIT_TIMEOUT` and falls through to its own slow-store + // read anyway, *and* populating the fast tier with the huge + // blob evicts a large number of smaller, more-useful entries. + // Stream directly from the slow store and skip the populate. + if self.should_bypass_dedup(&key) { + self.metrics + .huge_blob_dedup_bypasses + .fetch_add(1, Ordering::Acquire); + self.metrics + .slow_store_hit_count + .fetch_add(1, Ordering::Acquire); + debug!( + %key, + threshold_bytes = self.bypass_dedup_threshold_bytes, + "FastSlowStore::get_part: blob meets huge-blob threshold; bypassing dedup", + ); + self.slow_store + .get_part(key, writer.borrow_mut(), offset, length) + .await + .err_tip(|| "In FastSlowStore::get_part huge-blob bypass")?; + self.metrics + .slow_store_downloaded_bytes + .fetch_add(writer.get_bytes_written(), Ordering::Acquire); + return Ok(()); + } + let mut writer = Some(writer); // Drive the dedup loader. Two distinct paths: @@ -758,6 +847,14 @@ struct FastSlowStoreMetrics { help = "Number of times a follower bypassed the populating-digests dedup because the leader exceeded LEADER_WAIT_TIMEOUT" )] leader_wait_timeouts: AtomicU64, + #[metric( + help = "Number of times the fast store reported has() == Some but the subsequent get_part returned NotFound (stale eviction-map entry); request fell through to populate from the slow store" + )] + fast_store_stale_map_falls_through: AtomicU64, + #[metric( + help = "Number of get_part calls that bypassed the populating-digests dedup because the blob met the huge-blob threshold and were served directly from the slow store" + )] + huge_blob_dedup_bypasses: AtomicU64, } /// Maximum time a follower will wait on the leader-populator before @@ -769,4 +866,12 @@ struct FastSlowStoreMetrics { /// single slow read into a fan-out of `DEADLINE_EXCEEDED` errors. const LEADER_WAIT_TIMEOUT: Duration = Duration::from_secs(60); +/// Default value for [`FastSlowStore::bypass_dedup_threshold_bytes`] when +/// the spec leaves it unset (zero). 
256 MiB is large enough that typical +/// Bazel build outputs (sources, jars, intermediate artifacts) still go +/// through dedup and benefit from fast-tier caching, but small enough to +/// catch container-image layers and other multi-hundred-MB blobs whose +/// dedup-leader transfer reliably exceeds `LEADER_WAIT_TIMEOUT`. +const DEFAULT_BYPASS_DEDUP_THRESHOLD_BYTES: u64 = 256 * 1024 * 1024; + default_health_status_indicator!(FastSlowStore); diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 0d1c9a54d..cd0672b3b 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -41,6 +41,7 @@ use nativelink_util::store_trait::{ }; use tokio::io::{AsyncReadExt, AsyncWriteExt, Take}; use tokio::sync::Semaphore; +use tokio::time::timeout; use tokio_stream::wrappers::ReadDirStream; use tracing::{debug, error, info, trace, warn}; @@ -396,6 +397,20 @@ impl LenEntry for FileEntryImpl { // to `debug` and drop the per-emission path fields so it // stops costing serialization in the hot path. // + // We also flip `path_type` to `Temp` and adopt the + // would-be temp key here. The semantic effect of `unref` + // is "this entry no longer owns the content path"; if + // the source file is already gone, that postcondition + // already holds. Updating the metadata to match has two + // payoffs: (1) a subsequent `unref` on the same entry + // hits the `path_type == Temp` early-return instead of + // racing again on the same vanished path; (2) the + // EncodedFilePath::drop logic short-circuits on Content + // and would otherwise leak the entry's claim on the + // (already-moved) content path. Marking Temp here keeps + // the entry consistent with reality even though the + // physical move was a no-op. + // // Other rename failures (EACCES, EXDEV, EBUSY, …) are // genuinely unexpected and stay at `warn` with full // context. @@ -404,6 +419,8 @@ impl LenEntry for FileEntryImpl { key = ?encoded_file_path.key, "Failed to rename file (already gone, treating as benign)", ); + encoded_file_path.path_type = PathType::Temp; + encoded_file_path.key = new_key; } else { warn!( key = ?encoded_file_path.key, @@ -1094,10 +1111,22 @@ impl StoreDriver for FilesystemStore { let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move { // If the file is not found, we need to remove it from the eviction map. if err.code == Code::NotFound { - error!( + // The eviction map and on-disk state have diverged + // for this digest — the map said the file was at the + // content path but `open()` reported ENOENT. The + // runtime recovers automatically: `FastSlowStore::get_part` + // catches our `NotFound` and falls through to populate + // from the slow store (see the + // `fast_store_stale_map_falls_through` metric). So + // this is observed-but-handled, not catastrophic, + // and the original "process probably need restarted" + // message is stale — the divergence self-heals via + // the map removal below plus the slow-store + // re-populate. Demote to `warn` and reword. + warn!( ?err, key = ?owned_key, - "Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted." 
+ "Filesystem store map/disk divergence: removing entry; reader will fall through to slow store", ); self.evicting_map.remove(&owned_key).await; } @@ -1158,7 +1187,60 @@ impl HealthStatusIndicator for FilesystemStore { "FilesystemStore" } - async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { - StoreDriver::check_health(Pin::new(self), namespace).await + /// Lightweight health check: stat the configured `content_path` + /// directory. Proves the underlying mount is present and we have + /// permission to read it — without writing a probe blob, hashing + /// it, and reading it back through the eviction map. + /// + /// The default `StoreDriver::check_health` performs a full + /// `update_oneshot` + `has` + `get_part_unchunked` roundtrip. Each + /// of those operations contends on the same write-semaphore and + /// eviction-map locks as in-flight production traffic; under load + /// (large blob streams, dense eviction churn) the probe queues + /// behind real work and overruns the per-indicator budget, + /// surfacing as an HTTP 503 even when the disk itself is fine. + /// A `metadata` call is a single `stat()` syscall — bounded, + /// shared with no production code path, and a strict superset of + /// what a kubelet probe needs to know. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + /// Per-check physical ceiling. A `stat()` of a healthy local + /// directory completes in microseconds; this bound exists only + /// so a hung NFS / EBS / persistent-volume mount cannot wedge + /// the indicator. + const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2); + + let content_path = self.shared_context.content_path.clone(); + let stat = tokio::fs::metadata(content_path.clone()); + match timeout(HEALTH_PROBE_TIMEOUT, stat).await { + Ok(Ok(meta)) if meta.is_dir() => { + HealthStatus::new_ok(self, "FilesystemStore::check_health: ok".into()) + } + Ok(Ok(_)) => HealthStatus::new_failed( + self, + format!( + "FilesystemStore::check_health: content_path {content_path} is not a directory" + ) + .into(), + ), + Ok(Err(e)) => { + warn!( + ?e, + %content_path, + "FilesystemStore::check_health: stat errored", + ); + HealthStatus::new_failed( + self, + format!("FilesystemStore::check_health: stat errored: {e}").into(), + ) + } + Err(_) => HealthStatus::new_failed( + self, + format!( + "FilesystemStore::check_health: stat of {content_path} exceeded {} s timeout", + HEALTH_PROBE_TIMEOUT.as_secs() + ) + .into(), + ), + } } } diff --git a/nativelink-store/src/gcs_store.rs b/nativelink-store/src/gcs_store.rs index 9266c17cb..c83ffaa1d 100644 --- a/nativelink-store/src/gcs_store.rs +++ b/nativelink-store/src/gcs_store.rs @@ -14,6 +14,7 @@ use core::fmt::Debug; use core::pin::Pin; +use core::time::Duration; use std::borrow::Cow; use std::sync::Arc; @@ -32,7 +33,8 @@ use nativelink_util::store_trait::{ RemoveItemCallback, StoreDriver, StoreKey, StoreOptimizations, UploadSizeInfo, }; use rand::Rng; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; +use tracing::warn; use crate::cas_utils::is_zero_digest; use crate::gcs_client::client::{GcsClient, GcsOperations}; @@ -486,7 +488,59 @@ where "GcsStore" } - async fn check_health(&self, namespace: Cow<'static, str>) -> HealthStatus { - StoreDriver::check_health(Pin::new(self), namespace).await + /// Lightweight health check: a single `object_exists` call against + /// a deterministic, never-existing object path. 
Proves the bucket + /// is reachable, the credentials are valid, and the GCS API is + /// answering — without sharing bandwidth, connection pool, or + /// upload buffers with in-flight production traffic. + /// + /// The default `StoreDriver::check_health` performs a full + /// `update_oneshot` + `has` + `get_part_unchunked` roundtrip; under + /// any meaningful slow-store load (e.g. concurrent multi-GB blob + /// pulls saturating the network), that roundtrip queues behind + /// production traffic and easily exceeds the per-indicator budget. + /// Each timeout shows up at the kubelet as an HTTP 503 on the + /// liveness/readiness path even though the pod is otherwise + /// functional, eventually triggering a probe-failure restart that + /// drops every connected client. + /// + /// `object_exists` issues a metadata HEAD-equivalent: a few hundred + /// bytes on the wire and a single round-trip, independent of the + /// store's body-transfer bandwidth. A `false` result is the success + /// case (proves the API path works); only an outright `Err` + /// indicates an unhealthy slow store. + async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { + /// Per-check physical ceiling. Tight enough to stay well under + /// the `HealthServer` per-indicator budget, loose enough to + /// absorb a slow round-trip during transient network blips. + const HEALTH_PROBE_TIMEOUT: Duration = Duration::from_secs(2); + + // Path is deliberately fixed and obviously-not-real. The probe + // does not depend on the bucket containing any object; it only + // depends on GCS answering "no, that doesn't exist." + let probe_path = ObjectPath::new( + self.bucket.clone(), + "__nativelink_health_probe__/does-not-exist", + ); + + let probe = self.client.object_exists(&probe_path); + match timeout(HEALTH_PROBE_TIMEOUT, probe).await { + Ok(Ok(_)) => HealthStatus::new_ok(self, "GcsStore::check_health: ok".into()), + Ok(Err(e)) => { + warn!(?e, "GcsStore::check_health: object_exists errored"); + HealthStatus::new_failed( + self, + format!("GcsStore::check_health: object_exists errored: {e}").into(), + ) + } + Err(_) => HealthStatus::new_failed( + self, + format!( + "GcsStore::check_health: probe exceeded {} s timeout", + HEALTH_PROBE_TIMEOUT.as_secs() + ) + .into(), + ), + } } } diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 80cd11fb1..631c720b7 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -1114,11 +1114,24 @@ where /// is reachable and the master is accepting commands; that is /// the only invariant a kubelet probe needs. async fn check_health(&self, _namespace: Cow<'static, str>) -> HealthStatus { - /// Per-check physical ceiling. Tight enough to stay well - /// under `HealthServer`'s default per-indicator budget; - /// loose enough to absorb a normally-slow PING during a - /// `BGSAVE` fork or sentinel rebalance. - const PING_TIMEOUT: Duration = Duration::from_secs(2); + /// Per-check physical ceiling. The default `HealthServer` + /// per-indicator budget is 5 s + /// (`DEFAULT_HEALTH_CHECK_TIMEOUT_SECONDS`); this stays a + /// margin under that so the wrapper has time to receive our + /// answer. 
+ /// + /// 4 s is the smallest ceiling that absorbs a normally-slow + /// PING during the realistic worst case for our deployments: + /// a `BGSAVE` `fork()` on a multi-GB Redis instance under + /// load (the parent process is briefly paused while the + /// kernel sets up copy-on-write page tables; on our + /// production sentinel cluster we have observed pauses up + /// to ~3 s on an 11 GB master). A tighter ceiling caused + /// every `RedisStore` indicator to flap simultaneously + /// during routine RDB checkpoints, surfacing as 503s on + /// `/status` and probe-failure events even though Redis was + /// otherwise healthy. + const PING_TIMEOUT: Duration = Duration::from_secs(4); let mut client = match self.get_client().await { Ok(c) => c, @@ -1327,22 +1340,61 @@ impl Drop for RedisSubscription { return; // Already dropped, nothing to do. }; let key = receiver.borrow().clone(); - // IMPORTANT: This must be dropped before receiver_count() is called. - drop(receiver); let Some(subscribed_keys) = self.weak_subscribed_keys.upgrade() else { - return; // Already dropped, nothing to do. + return; // Parent dropped — nothing to do. }; + // Acquire the write lock BEFORE dropping our receiver. Earlier + // versions of this Drop dropped the receiver first and then + // acquired the lock to decide whether to remove the publisher + // entry; under concurrent drops of two subscriptions sharing the + // same publisher (e.g. two `WaitExecution` clients on the same + // operation_id), both threads decremented their receiver count + // before either took the lock, both saw `receiver_count() == 0` + // when they finally got in, and the loser of the lock race + // logged the spurious + // "Key … was not found in subscribed keys when checking if + // it should be removed." + // error against an entry the winner had just removed. Worse, + // the desync window could let a fresh `subscribe(same_key)` + // re-insert a publisher between the two drops, after which the + // second drop would mutate state belonging to *that* fresh + // publisher. + // + // Holding the write lock across the receiver drop closes the + // window: no other `subscribe`/`Drop` can interleave with our + // count change. We use the "is count == 1 with my receiver + // still alive?" predicate instead of the original "is count == 0 + // after I drop?" — both express "am I the last subscriber", + // but the former is decidable while we still hold the receiver + // (and therefore can be evaluated under the lock without + // requiring the receiver-drop to happen first). let mut subscribed_keys = subscribed_keys.write(); - let Some(value) = subscribed_keys.get(&key) else { - error!( - "Key {key} was not found in subscribed keys when checking if it should be removed." + let Some(publisher) = subscribed_keys.get(&key) else { + // Genuinely should not happen with the write lock held — + // every removal goes through this same lock. Demoted from + // ERROR to WARN: most observed occurrences in production + // were the lockless race above, not real corruption, and + // we've now eliminated that source. Keep the log so any + // *new* path that removes entries without holding the lock + // surfaces. + warn!( + %key, + "RedisSubscription::drop: key absent from subscribed_keys under write lock — \ + indicates an unexpected removal path", ); return; }; - // If we have no receivers, cleanup the entry from our map. - if value.receiver_count() == 0 { - subscribed_keys.remove(key); + // Count includes our own (still-alive) receiver. 
If we are the + // sole subscriber, remove the publisher entry. + if publisher.receiver_count() == 1 { + subscribed_keys.remove(&key); } + // Drop our receiver only after the bookkeeping decision. The + // count change becomes visible only here, but by this point + // either (a) we already removed the publisher from the map + // (count is irrelevant), or (b) other live receivers remain + // and the count truthfully drops by one. + drop(receiver); } } @@ -1812,12 +1864,24 @@ where } result => (connection_manager, result), }; - let create_result = result.err_tip(|| { - format!( - "Error with ft_create in RedisStore::search_by_index_prefix({})", - get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), - ) - }); + // RediSearch returns `Extension: "Index": already exists` + // when our `ft_create` races another caller who already + // created (or never lost) the index. That is *not* a + // real failure: it just means the index is in place, + // which is the only postcondition we care about. Treat + // it as Ok so the merged error below only carries the + // signal-bearing failure (e.g. an actual ft_aggregate + // timeout) instead of polluting it with noise. + let create_result = match result { + Ok(()) => Ok(()), + Err(ref e) if format!("{e:?}").contains("already exists") => Ok(()), + Err(_) => result.err_tip(|| { + format!( + "Error with ft_create in RedisStore::search_by_index_prefix({})", + get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY), + ) + }), + }; let run_result = run_ft_aggregate(connection_manager).await.err_tip(|| { format!( diff --git a/nativelink-store/src/redis_utils/ft_aggregate.rs b/nativelink-store/src/redis_utils/ft_aggregate.rs index b987754a3..7974285bd 100644 --- a/nativelink-store/src/redis_utils/ft_aggregate.rs +++ b/nativelink-store/src/redis_utils/ft_aggregate.rs @@ -35,6 +35,19 @@ pub(crate) struct FtAggregateOptions { pub sort_by: Vec, } +/// Per-query `FT.AGGREGATE` timeout in milliseconds. +/// +/// `RediSearch`'s module default (≈500 ms) is far too tight for the +/// scheduler's awaited-action index under any meaningful load: queries +/// time out, `NativeLink` surfaces them as parse errors, and the dedup +/// lookup fails. When dedup fails the scheduler creates a duplicate +/// operation for an action that is already in flight — observed as +/// "two same actions running on different PRs" with each running the +/// full `maxActionExecutingTimeoutS` window before completing. Pass an +/// explicit value generous enough to absorb 1M+ document scans on a +/// busy `RediSearch` instance. +const FT_AGGREGATE_TIMEOUT_MS: u64 = 10_000; + /// Calls `FT.AGGREGATE` in redis. redis-rs does not properly support this command /// so we have to manually handle it. pub(crate) async fn ft_aggregate( @@ -56,6 +69,8 @@ where let mut ft_aggregate_cmd = cmd .arg(&index) .arg(&query) + .arg("TIMEOUT") + .arg(FT_AGGREGATE_TIMEOUT_MS) .arg("LOAD") .arg(options.load.len()) .arg(&options.load) diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 7ab8b8d7d..6ebc6345e 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use core::pin::Pin; -use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use core::time::Duration; use std::sync::{Arc, Mutex}; @@ -53,6 +53,7 @@ fn make_stores_direction( slow: StoreSpec::Memory(MemorySpec::default()), fast_direction, slow_direction, + bypass_dedup_threshold_bytes: 0, }, fast_store.clone(), slow_store.clone(), @@ -356,6 +357,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast_store, slow_store, @@ -399,6 +401,7 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast_store.clone(), slow_store, @@ -424,6 +427,7 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { slow: StoreSpec::Noop(NoopSpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }; let fast_slow_store = Arc::new(FastSlowStore::new( &fast_slow_store_config, @@ -660,6 +664,7 @@ fn make_stores_with_lazy_slow() -> (Store, Store, Store) { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast_store.clone(), slow_store.clone(), @@ -712,6 +717,276 @@ async fn lazy_not_found_syncs_to_fast_store() -> Result<(), Error> { Ok(()) } +// --------------------------------------------------------------------------- +// Huge-blob dedup bypass. +// +// For multi-GB blobs the leader/follower dedup is a net loss: the leader's +// transfer reliably exceeds `LEADER_WAIT_TIMEOUT`, every concurrent follower +// falls through to its own slow-store read anyway, and populating the fast +// tier with the huge blob evicts a large number of smaller, more-useful +// entries. The bypass short-circuits that pathology by streaming straight +// from the slow store and skipping the populate. +// +// The tests here use a tiny threshold (so the test stays bounded in memory) +// and an instrumented slow store that counts get_part invocations. Under +// dedup we expect a single slow-store call regardless of fan-in; under +// bypass we expect one per caller. +// --------------------------------------------------------------------------- + +/// `StoreDriver` that wraps a `MemoryStore` and counts how many times +/// `get_part` is entered. Used to assert dedup behaviour: under dedup +/// the leader is the only `get_part` invocation; under bypass each +/// concurrent caller drives its own. 
+#[derive(MetricsComponent)]
+struct CountingSlowStore {
+    inner: Arc<MemoryStore>,
+    get_part_calls: Arc<AtomicUsize>,
+}
+
+#[async_trait]
+impl StoreDriver for CountingSlowStore {
+    async fn has_with_results(
+        self: Pin<&Self>,
+        keys: &[StoreKey<'_>],
+        results: &mut [Option<u64>],
+    ) -> Result<(), Error> {
+        Pin::new(self.inner.as_ref())
+            .has_with_results(keys, results)
+            .await
+    }
+
+    async fn update(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        reader: nativelink_util::buf_channel::DropCloserReadHalf,
+        size_info: nativelink_util::store_trait::UploadSizeInfo,
+    ) -> Result<(), Error> {
+        Pin::new(self.inner.as_ref())
+            .update(key, reader, size_info)
+            .await
+    }
+
+    async fn get_part(
+        self: Pin<&Self>,
+        key: StoreKey<'_>,
+        writer: &mut nativelink_util::buf_channel::DropCloserWriteHalf,
+        offset: u64,
+        length: Option<u64>,
+    ) -> Result<(), Error> {
+        self.get_part_calls.fetch_add(1, Ordering::AcqRel);
+        Pin::new(self.inner.as_ref())
+            .get_part(key, writer, offset, length)
+            .await
+    }
+
+    fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
+        self
+    }
+    fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
+        self
+    }
+    fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
+        self
+    }
+    fn register_remove_callback(
+        self: Arc<Self>,
+        _callback: Arc<dyn RemoveItemCallback>,
+    ) -> Result<(), Error> {
+        Ok(())
+    }
+}
+
+default_health_status_indicator!(CountingSlowStore);
+
+#[nativelink_test]
+async fn huge_blob_bypasses_dedup_and_skips_populate() -> Result<(), Error> {
+    // 1 KiB threshold so a 2 KiB blob trips the bypass.
+    const THRESHOLD: u64 = 1024;
+    const BLOB_SIZE: usize = 2 * 1024;
+    const READERS: usize = 8;
+
+    let original_data = make_random_data(BLOB_SIZE);
+    let digest = DigestInfo::try_new(VALID_HASH, original_data.len()).unwrap();
+
+    // Seed the slow tier directly so a fast-tier miss is the realistic
+    // starting state. Populating through the wrapper would also fill
+    // the fast store via the dual-write path of `update`.
+    let inner_slow = MemoryStore::new(&MemorySpec::default());
+    inner_slow
+        .update_oneshot(digest, original_data.clone().into())
+        .await?;
+    let calls = Arc::new(AtomicUsize::new(0));
+    let counting = Arc::new(CountingSlowStore {
+        inner: inner_slow,
+        get_part_calls: calls.clone(),
+    });
+    let fast_store = Store::new(MemoryStore::new(&MemorySpec::default()));
+    let slow_store = Store::new(counting);
+    let fast_slow_store = Store::new(FastSlowStore::new(
+        &FastSlowSpec {
+            fast: StoreSpec::Memory(MemorySpec::default()),
+            slow: StoreSpec::Memory(MemorySpec::default()),
+            fast_direction: StoreDirection::default(),
+            slow_direction: StoreDirection::default(),
+            bypass_dedup_threshold_bytes: THRESHOLD,
+        },
+        fast_store.clone(),
+        slow_store,
+    ));
+
+    // Fan out READERS concurrent get_part calls.
+    let mut joins = Vec::with_capacity(READERS);
+    for _ in 0..READERS {
+        let store = fast_slow_store.clone();
+        let expected = original_data.clone();
+        joins.push(tokio::spawn(async move {
+            let got = store.get_part_unchunked(digest, 0, None).await?;
+            assert_eq!(got.as_ref(), expected.as_slice(), "data mismatch");
+            Ok::<_, Error>(())
+        }));
+    }
+    for j in joins {
+        j.await
+            .map_err(|e| make_err!(Code::Internal, "join failed: {e}"))??;
+    }
+
+    // Bypass should drive one slow-store call per reader, not one for
+    // all of them.
+    assert_eq!(
+        calls.load(Ordering::Acquire),
+        READERS,
+        "expected {READERS} slow-store get_part calls under bypass, observed dedup"
+    );
+
+    // And the fast tier must not have been populated.
+ assert!( + fast_store.has(digest).await?.is_none(), + "huge-blob bypass populated the fast tier; that defeats the point of the bypass" + ); + + Ok(()) +} + +#[nativelink_test] +async fn small_blob_still_dedups_and_populates() -> Result<(), Error> { + // 1 MiB threshold; a 64-byte blob sits comfortably below it so the + // existing dedup path runs. + const THRESHOLD: u64 = 1024 * 1024; + const BLOB_SIZE: usize = 64; + const READERS: usize = 8; + + let original_data = make_random_data(BLOB_SIZE); + let digest = DigestInfo::try_new(VALID_HASH, original_data.len()).unwrap(); + + let inner_slow = MemoryStore::new(&MemorySpec::default()); + inner_slow + .update_oneshot(digest, original_data.clone().into()) + .await?; + let calls = Arc::new(AtomicUsize::new(0)); + let counting = Arc::new(CountingSlowStore { + inner: inner_slow, + get_part_calls: calls.clone(), + }); + let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let slow_store = Store::new(counting); + let fast_slow_store = Store::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Memory(MemorySpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), + slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: THRESHOLD, + }, + fast_store.clone(), + slow_store, + )); + + let mut joins = Vec::with_capacity(READERS); + for _ in 0..READERS { + let store = fast_slow_store.clone(); + let expected = original_data.clone(); + joins.push(tokio::spawn(async move { + let got = store.get_part_unchunked(digest, 0, None).await?; + assert_eq!(got.as_ref(), expected.as_slice(), "data mismatch"); + Ok::<_, Error>(()) + })); + } + for j in joins { + j.await + .map_err(|e| make_err!(Code::Internal, "join failed: {e}"))??; + } + + // Dedup should collapse all readers down to a single slow-store + // call. + assert_eq!( + calls.load(Ordering::Acquire), + 1, + "small-blob path lost dedup; observed >1 slow-store call" + ); + + // And the fast tier should be populated. + assert!( + fast_store.has(digest).await?.is_some(), + "small-blob path failed to populate the fast tier" + ); + + Ok(()) +} + +#[nativelink_test] +async fn bypass_threshold_is_inclusive_at_exact_size() -> Result<(), Error> { + // The bypass condition is `size >= threshold`. A blob whose digest + // size exactly equals the threshold must take the bypass path. 
+ const SIZE: usize = 4096; + const READERS: usize = 4; + + let original_data = make_random_data(SIZE); + let digest = DigestInfo::try_new(VALID_HASH, original_data.len()).unwrap(); + + let inner_slow = MemoryStore::new(&MemorySpec::default()); + inner_slow + .update_oneshot(digest, original_data.clone().into()) + .await?; + let calls = Arc::new(AtomicUsize::new(0)); + let counting = Arc::new(CountingSlowStore { + inner: inner_slow, + get_part_calls: calls.clone(), + }); + let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let slow_store = Store::new(counting); + let fast_slow_store = Store::new(FastSlowStore::new( + &FastSlowSpec { + fast: StoreSpec::Memory(MemorySpec::default()), + slow: StoreSpec::Memory(MemorySpec::default()), + fast_direction: StoreDirection::default(), + slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: SIZE as u64, + }, + fast_store.clone(), + slow_store, + )); + + let mut joins = Vec::with_capacity(READERS); + for _ in 0..READERS { + let store = fast_slow_store.clone(); + joins.push(tokio::spawn(async move { + let _ignored = store.get_part_unchunked(digest, 0, None).await?; + Ok::<_, Error>(()) + })); + } + for j in joins { + j.await + .map_err(|e| make_err!(Code::Internal, "join failed: {e}"))??; + } + + assert_eq!( + calls.load(Ordering::Acquire), + READERS, + "size == threshold should bypass dedup but observed dedup", + ); + Ok(()) +} + #[derive(MetricsComponent)] struct InstrumentedSlowStore { digest: DigestInfo, @@ -805,6 +1080,7 @@ fn make_fast_slow_with_instrumented_slow( slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, fast, Store::new(slow.clone()), diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 17e66a005..32a8a4789 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -23,8 +23,9 @@ use nativelink_config::stores::{RedisMode, RedisSpec}; use nativelink_error::{Code, Error, ResultExt, make_err}; use nativelink_macro::nativelink_test; use nativelink_redis_tester::{ - ReadOnlyRedis, add_lua_script, add_to_response, fake_redis_sentinel_master_stream, - fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_responses, + ReadOnlyRedis, SubscriptionManagerNotify, add_lua_script, add_to_response, + fake_redis_sentinel_master_stream, fake_redis_sentinel_stream, fake_redis_stream, + make_fake_redis_with_responses, }; use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; use nativelink_store::redis_store::{ @@ -36,8 +37,9 @@ use nativelink_util::common::DigestInfo; use nativelink_util::health_utils::HealthStatus; use nativelink_util::store_trait::{ FalseValue, SchedulerCurrentVersionProvider, SchedulerIndexProvider, SchedulerStore, - SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, StoreKey, - StoreLike, TrueValue, UploadSizeInfo, + SchedulerStoreDataProvider, SchedulerStoreDecodeTo, SchedulerStoreKeyProvider, + SchedulerSubscription, SchedulerSubscriptionManager, StoreKey, StoreLike, TrueValue, + UploadSizeInfo, }; use pretty_assertions::assert_eq; use redis::{PushInfo, RedisError, Value}; @@ -1036,6 +1038,8 @@ fn test_search_by_index() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") @@ 
-1126,7 +1130,7 @@ fn test_search_by_index_failure() -> Result<(), Error> { "Client: TEST - Client: unexpected command", "Error with ft_create in RedisStore::search_by_index_prefix(test:_content_prefix_sort_key_3e762c15)", "---", "Client: TEST - Client: unexpected command", "Error with second ft_aggregate in RedisStore::search_by_index_prefix(test:_content_prefix_sort_key_3e762c15)"].iter().map(ToString::to_string).collect())); assert!(logs_contain( - "Error calling ft.aggregate e=TEST - Client: unexpected command index=\"test:_content_prefix_sort_key_3e762c15\" query=\"*\" options=FtAggregateOptions { load: [\"data\", \"version\"], cursor: FtAggregateCursor { count: 1500, max_idle: 30000 }, sort_by: [\"@sort_key\"] } all_args=[\"FT.AGGREGATE\", \"test:_content_prefix_sort_key_3e762c15\", \"*\", \"LOAD\", \"2\", \"data\", \"version\", \"WITHCURSOR\", \"COUNT\", \"1500\", \"MAXIDLE\", \"30000\", \"SORTBY\", \"2\", \"@sort_key\", \"ASC\"]" + "Error calling ft.aggregate e=TEST - Client: unexpected command index=\"test:_content_prefix_sort_key_3e762c15\" query=\"*\" options=FtAggregateOptions { load: [\"data\", \"version\"], cursor: FtAggregateCursor { count: 1500, max_idle: 30000 }, sort_by: [\"@sort_key\"] } all_args=[\"FT.AGGREGATE\", \"test:_content_prefix_sort_key_3e762c15\", \"*\", \"TIMEOUT\", \"10000\", \"LOAD\", \"2\", \"data\", \"version\", \"WITHCURSOR\", \"COUNT\", \"1500\", \"MAXIDLE\", \"30000\", \"SORTBY\", \"2\", \"@sort_key\", \"ASC\"]" )); Ok(()) @@ -1139,6 +1143,8 @@ fn test_search_by_index_with_sort_key() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") @@ -1222,6 +1228,8 @@ fn test_search_by_index_resp3() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") @@ -1328,6 +1336,8 @@ fn test_search_by_index_skips_int_from_cursor_read() -> Result<(), Error> { redis::cmd("FT.AGGREGATE") .arg("test:_content_prefix_sort_key_3e762c15") .arg("@content_prefix:{ Searchable }") + .arg("TIMEOUT") + .arg(10000_u64) .arg("LOAD") .arg(2) .arg("data") @@ -1469,6 +1479,173 @@ async fn send_messages_to_subscription_channel() -> Result<(), Error> { Ok(()) } +// --------------------------------------------------------------------------- +// Regression tests for `RedisSubscription::Drop` map-desync. +// +// Pre-fix, `Drop` dropped its `watch::Receiver` *before* taking the +// `subscribed_keys` write lock. With two `RedisSubscription`s sharing +// a publisher (e.g. two `WaitExecution` clients on the same operation), +// both threads could race past the receiver-drop and contend for the +// lock holding stale `receiver_count() == 0` views — the loser would +// remove an entry that the winner had already removed (or, worse, +// remove an entry that a fresh `subscribe(same_key)` had already +// re-inserted). The visible symptom in production was a spurious +// log: +// "Key {key} was not found in subscribed keys when checking if it +// should be removed" +// firing whenever multiple watchers on a hot operation_id were torn +// down concurrently. The post-fix predicate ("count == 1 with my +// receiver still alive, evaluated under the write lock") makes the +// race structurally impossible. 
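Reviewer note: the fixed ordering is easier to read as a standalone pattern. A simplified sketch with std types standing in for the store's lock and map; none of these names are the real ones:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::watch;

type Keys = Arc<RwLock<HashMap<String, watch::Sender<String>>>>;

struct Subscription {
    key: String,
    receiver: Option<watch::Receiver<String>>,
    subscribed_keys: Keys,
}

impl Drop for Subscription {
    fn drop(&mut self) {
        let Some(receiver) = self.receiver.take() else {
            return; // Receiver already gone; nothing to clean up.
        };
        // Take the write lock FIRST, while our receiver is still alive.
        let mut keys = self.subscribed_keys.write().unwrap();
        if let Some(publisher) = keys.get(&self.key) {
            // The count still includes us, so 1 means "we are the last".
            if publisher.receiver_count() == 1 {
                keys.remove(&self.key);
            }
        }
        // The count only drops here, under the lock, so no concurrent
        // subscribe()/Drop can interleave with the removal decision.
        drop(receiver);
    }
}

fn subscribe(keys: &Keys, key: &str) -> Subscription {
    let mut map = keys.write().unwrap();
    let sender = map
        .entry(key.to_string())
        .or_insert_with(|| watch::channel(String::new()).0);
    Subscription {
        key: key.to_string(),
        receiver: Some(sender.subscribe()),
        subscribed_keys: keys.clone(),
    }
}

fn main() {
    let keys: Keys = Arc::new(RwLock::new(HashMap::new()));
    let a = subscribe(&keys, "op-1");
    let b = subscribe(&keys, "op-1");
    drop(a); // Not the last subscriber: entry stays.
    assert_eq!(keys.read().unwrap().len(), 1);
    drop(b); // Last subscriber: entry removed.
    assert!(keys.read().unwrap().is_empty());
}
```

The load-bearing detail is that the receiver outlives the bookkeeping decision, so "am I last?" is decidable under the lock instead of after it.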
+//
+// The post-fix `Drop` now reaches its warning path *only* if the
+// `subscribed_keys` map gets mutated by a code path that doesn't take
+// the lock — which is what we want it to flag, not the noise the
+// pre-fix code generated. So these tests assert that the post-fix
+// warning is silent under normal subscribe/drop traffic, including
+// concurrent drops.
+
+/// Test key provider that just wraps a string. Reused across the
+/// subscription regression tests below.
+#[derive(Clone)]
+struct TestSubKey(String);
+
+impl SchedulerStoreKeyProvider for TestSubKey {
+    type Versioned = FalseValue;
+    fn get_key(&self) -> StoreKey<'static> {
+        StoreKey::Str(std::borrow::Cow::Owned(self.0.clone()))
+    }
+}
+
+/// Sanity: a single subscriber that drops cleanly produces no warning
+/// and no error.
+#[nativelink_test]
+async fn redis_subscription_single_drop_is_silent() -> Result<(), Error> {
+    let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let manager = RedisSubscriptionManager::new(rx);
+
+    let sub = manager.subscribe(TestSubKey("solo-key".to_string()))?;
+    drop(sub);
+    sleep(Duration::from_millis(10)).await;
+
+    assert!(
+        !logs_contain("key absent from subscribed_keys under write lock"),
+        "single-subscriber drop unexpectedly logged the absence warning",
+    );
+    assert!(!logs_contain("ERROR"));
+
+    drop(manager);
+    Ok(())
+}
+
+/// Two subscribers share a publisher. Dropping the first must leave
+/// the publisher entry in place so the second's `changed()` keeps
+/// firing on `notify_for_test`. Pre-fix this test was *also* clean
+/// because the bug only fires on concurrent drops; this is the
+/// non-racing baseline.
+#[nativelink_test]
+async fn redis_subscription_drop_one_of_two_keeps_publisher() -> Result<(), Error> {
+    let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
+    let manager = RedisSubscriptionManager::new(rx);
+
+    let key = "shared-key";
+    let sub_a = manager.subscribe(TestSubKey(key.to_string()))?;
+    let mut sub_b = manager.subscribe(TestSubKey(key.to_string()))?;
+
+    // Drop the first; the second's subscription must still resolve
+    // when we notify on the same key.
+    drop(sub_a);
+
+    manager.notify_for_test(key.to_string());
+    timeout(Duration::from_secs(2), sub_b.changed())
+        .await
+        .expect("sub_b.changed() did not fire — publisher entry was dropped prematurely")?;
+
+    assert!(
+        !logs_contain("key absent from subscribed_keys under write lock"),
+        "absence warning fired during single drop with another receiver alive",
+    );
+    drop(sub_b);
+    drop(manager);
+    Ok(())
+}
+
+/// **Race regression.** Two subscribers share a publisher; both are
+/// dropped concurrently from separate tasks. Pre-fix this would
+/// occasionally fire the absence warning (the receiver-drop happened
+/// before the lock, so both threads' `receiver_count()` could read 0
+/// inside the lock, and the loser found the entry already removed).
+/// We loop the scenario many times so the race window still gets
+/// exercised on machines whose timing rarely lines up with the bug.
+#[nativelink_test] +async fn redis_subscription_concurrent_drops_no_absence_warn() -> Result<(), Error> { + let (_tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let manager = RedisSubscriptionManager::new(rx); + + const ITERATIONS: usize = 200; + for i in 0..ITERATIONS { + let key = format!("race-key-{i}"); + let sub_a = manager.subscribe(TestSubKey(key.clone()))?; + let sub_b = manager.subscribe(TestSubKey(key.clone()))?; + + // `spawn_blocking` puts each Drop on its own thread; with the + // pre-fix code's "drop receiver, then take lock" sequence, + // both threads can race into the lock with already-decremented + // counts. With the post-fix "take lock, then evaluate, then + // drop receiver" sequence, the lock serialises the decision + // and the warning never fires. + let h_a = tokio::task::spawn_blocking(move || drop(sub_a)); + let h_b = tokio::task::spawn_blocking(move || drop(sub_b)); + h_a.await.unwrap(); + h_b.await.unwrap(); + } + sleep(Duration::from_millis(50)).await; + + assert!( + !logs_contain("key absent from subscribed_keys under write lock"), + "concurrent drops produced the absence warning at least once across {ITERATIONS} \ + iterations — the Drop ordering regressed", + ); + assert!(!logs_contain("ERROR")); + + drop(manager); + Ok(()) +} + +/// Subscribe → drop both → re-subscribe to the same key. Post-fix the +/// re-subscribe must create a fresh publisher (the old entry was +/// removed by the last drop). We confirm the fresh publisher is +/// connected by notifying and waiting for `changed()`. +#[nativelink_test] +async fn redis_subscription_resubscribe_after_drop_creates_fresh_publisher() -> Result<(), Error> { + let (_tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let manager = RedisSubscriptionManager::new(rx); + + let key = "cycle-key"; + let sub_a = manager.subscribe(TestSubKey(key.to_string()))?; + let sub_b = manager.subscribe(TestSubKey(key.to_string()))?; + drop(sub_a); + drop(sub_b); + + // Re-subscribe to the same key. If the previous drops left the + // map in an inconsistent state (stale publisher kept, or a + // partially-deconstructed entry), this either reuses a dead + // publisher (changed() never fires) or panics inside the + // patricia map. + let mut sub_c = manager.subscribe(TestSubKey(key.to_string()))?; + manager.notify_for_test(key.to_string()); + timeout(Duration::from_secs(2), sub_c.changed()) + .await + .expect("re-subscribe after drops produced a dead publisher")?; + + assert!(!logs_contain( + "key absent from subscribed_keys under write lock" + )); + drop(sub_c); + drop(manager); + Ok(()) +} + async fn core_test_update_data_unversioned_with_expiry(expire_response: i64) { let redis_span = info_span!("redis"); let mut responses = add_lua_version_script(fake_redis_stream()); diff --git a/nativelink-util/src/action_messages.rs b/nativelink-util/src/action_messages.rs index eaabc37e6..e10b34e02 100644 --- a/nativelink-util/src/action_messages.rs +++ b/nativelink-util/src/action_messages.rs @@ -843,6 +843,85 @@ impl From<&ActionStage> for execution_stage::Value { } } +/// `google.rpc.PreconditionFailure` (defined inline to avoid pulling +/// `tonic-types` and the additional proto-generated module). Bazel's +/// `RemoteExecutionService` reads this proto out of +/// `ExecuteResponse.status.details` for `FAILED_PRECONDITION` results +/// and, for `MISSING` violations, automatically re-uploads the named +/// blobs and retries the Execute call. Mirrors the schema in +/// `google/rpc/error_details.proto`. 
+mod precondition_failure {
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct PreconditionFailure {
+        #[prost(message, repeated, tag = "1")]
+        pub(super) violations: ::prost::alloc::vec::Vec<Violation>,
+    }
+    #[derive(Clone, PartialEq, ::prost::Message)]
+    pub(super) struct Violation {
+        #[prost(string, tag = "1")]
+        pub(super) r#type: ::prost::alloc::string::String,
+        #[prost(string, tag = "2")]
+        pub(super) subject: ::prost::alloc::string::String,
+        #[prost(string, tag = "3")]
+        pub(super) description: ::prost::alloc::string::String,
+    }
+    pub(super) const TYPE_URL: &str = "type.googleapis.com/google.rpc.PreconditionFailure";
+    pub(super) const VIOLATION_TYPE_MISSING: &str = "MISSING";
+}
+
+/// Marker substring emitted by `FastSlowStore::populate_and_maybe_stream`
+/// when a digest is missing from both tiers. Matching on the string is
+/// the only signal we have because `nativelink_error::Error` doesn't
+/// carry typed metadata and the worker has already wrapped the inner
+/// error with `make_err!(Code::FailedPrecondition, "{}", ...)` by the
+/// time we see it here.
+const MISSING_BLOB_MARKER: &str = "not found in either fast or slow store";
+
+/// Best-effort parse of a `(hash, size)` pair out of a missing-blob
+/// error message. The format is `Object {hash}-{size} not found in
+/// either fast or slow store ...`. Returns `None` if the message
+/// doesn't match — the caller falls back to the plain `Error -> Status`
+/// conversion.
+fn extract_missing_blob_digest(err: &Error) -> Option<(String, i64)> {
+    for msg in &err.messages {
+        if !msg.contains(MISSING_BLOB_MARKER) {
+            continue;
+        }
+        let after_object = msg.find("Object ").map(|i| &msg[i + "Object ".len()..])?;
+        let digest_str = after_object.split(" not found").next()?;
+        let (hash, size) = digest_str.rsplit_once('-')?;
+        let size: i64 = size.parse().ok()?;
+        return Some((hash.to_string(), size));
+    }
+    None
+}
+
+/// Build a `google.rpc.Status` of code `FAILED_PRECONDITION` whose
+/// `details` carry a `google.rpc.PreconditionFailure` listing the
+/// missing CAS blob. Bazel uses this exact shape to drive automatic
+/// re-upload + retry of the missing blob.
+fn missing_blob_failed_precondition_status(err: &Error, hash: &str, size: i64) -> Status {
+    let pf = precondition_failure::PreconditionFailure {
+        violations: vec![precondition_failure::Violation {
+            r#type: precondition_failure::VIOLATION_TYPE_MISSING.to_string(),
+            // REv2-mandated subject format for missing-blob violations.
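+            // Illustrative shape only: a blob with (truncated) hash
+            // `8a39…` and size 142 yields the subject "blobs/8a39…/142".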
+            subject: format!("blobs/{hash}/{size}"),
+            description: err.message_string(),
+        }],
+    };
+    let mut buf: Vec<u8> = Vec::with_capacity(pf.encoded_len());
+    pf.encode(&mut buf).unwrap_or(());
+    let any = Any {
+        type_url: precondition_failure::TYPE_URL.to_string(),
+        value: buf,
+    };
+    Status {
+        code: Code::FailedPrecondition as i32,
+        message: err.message_string(),
+        details: vec![any],
+    }
+}
+
 pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
     fn logs_from(server_logs: HashMap<String, DigestInfo>) -> HashMap<String, LogFile> {
         let mut logs = HashMap::with_capacity(server_logs.len());
@@ -858,11 +937,26 @@ pub fn to_execute_response(action_result: ActionResult) -> ExecuteResponse {
         logs
     }
 
+    // If the action failed because a CAS blob is missing — most often a
+    // `Directory` proto in the input tree (the Execute pre-check only
+    // validates the top-level Action, command_digest, and
+    // input_root_digest; nested Directories are fetched lazily by the
+    // worker) — surface the failure as `FAILED_PRECONDITION` with a
+    // `PreconditionFailure` detail naming the digest. Bazel sees the
+    // detail, re-uploads the missing blob, and retries automatically;
+    // without the detail it gives up and the build fails.
     let status = Some(
         action_result
             .error
             .clone()
-            .map_or_else(Status::default, Into::into),
+            .map(|err| {
+                if let Some((hash, size)) = extract_missing_blob_digest(&err) {
+                    missing_blob_failed_precondition_status(&err, &hash, size)
+                } else {
+                    err.into()
+                }
+            })
+            .unwrap_or_default(),
     );
     let message = action_result.message.clone();
     ExecuteResponse {
diff --git a/nativelink-util/src/connection_manager.rs b/nativelink-util/src/connection_manager.rs
index eaa5d0d99..f92772a63 100644
--- a/nativelink-util/src/connection_manager.rs
+++ b/nativelink-util/src/connection_manager.rs
@@ -22,7 +22,7 @@ use futures::Future;
 use futures::stream::{FuturesUnordered, StreamExt, unfold};
 use nativelink_config::stores::Retry;
 use nativelink_error::{Code, Error, make_err};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
 use tonic::transport::{Channel, Endpoint, channel};
 use tracing::{debug, error, info, warn};
 
@@ -95,8 +95,30 @@ struct ConnectionManagerWorker {
     endpoints: Vec<(ConnectionIndex, Endpoint)>,
     /// The channel used to communicate between a Connection and the worker.
     connection_tx: mpsc::UnboundedSender<ConnectionRequest>,
-    /// The number of connections that are currently allowed to be made.
-    available_connections: usize,
+    /// Semaphore that gates the maximum number of in-flight `Connection`
+    /// objects.
+    ///
+    /// **Why a semaphore and not a `usize` counter.** Earlier versions
+    /// tracked permits with `available_connections: usize`, decrementing
+    /// in `provide_channel` and re-incrementing on `ConnectionRequest::Dropped`.
+    /// In production we observed the counter underflowing
+    /// (`u64::MAX − N`) while `waiting_connections` climbed unbounded — a
+    /// permit leak whose surface symptom was the worker process growing
+    /// memory until the kernel `OOMKilled` it (exit 137), with the
+    /// in-flight action stranded as `Executing` in Redis. The exact
+    /// leaking code path was hard to pin down because the protocol *looks*
+    /// balanced on paper (one decrement per `provide_channel`, one
+    /// increment per `Dropped` event from `Connection::drop`); somewhere
+    /// in the worker / tonic interaction, a `Dropped` was occasionally
+    /// not being delivered or processed.
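+    ///
+    /// (Schematically, the old scheme needed
+    /// `decrements(provide_channel) == increments(Dropped)` to hold
+    /// forever; one missed or double-processed `Dropped` skews the
+    /// count permanently, because nothing ever reconciles it.)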
+    ///
+    /// `tokio::sync::Semaphore`'s `OwnedSemaphorePermit` makes leakage
+    /// **structurally impossible**: the permit is acquired in
+    /// `provide_channel`, handed to the `Connection` as a private field,
+    /// and released exactly once when the `Connection` is dropped (Rust's
+    /// RAII guarantee). No code path — panic, task cancellation, dropped
+    /// receiver, anything — can release the permit twice or leak it.
+    semaphore: Arc<Semaphore>,
     /// Channels that are currently being connected.
     connecting_channels: FuturesUnordered<Pin<Box<dyn Future<Output = EstablishedChannel> + Send>>>,
     /// Connected channels that are available for use.
@@ -136,14 +158,22 @@ impl ConnectionManager {
             .collect();
         if max_concurrent_requests == 0 {
-            max_concurrent_requests = usize::MAX;
+            // `Semaphore::MAX_PERMITS` is `usize::MAX >> 3` in tokio;
+            // any value at or above it is treated by `Semaphore` as
+            // effectively unbounded for our purposes.
+            max_concurrent_requests = Semaphore::MAX_PERMITS;
+        } else {
+            // Defensive cap: callers shouldn't be passing values larger
+            // than `MAX_PERMITS`, but if they do, clamp rather than
+            // panic.
+            max_concurrent_requests = max_concurrent_requests.min(Semaphore::MAX_PERMITS);
         }
         if connections_per_endpoint == 0 {
             connections_per_endpoint = 1;
         }
         let worker = ConnectionManagerWorker {
             endpoints,
-            available_connections: max_concurrent_requests,
+            semaphore: Arc::new(Semaphore::new(max_concurrent_requests)),
             connection_tx,
             connecting_channels: FuturesUnordered::new(),
             available_channels: VecDeque::new(),
@@ -309,49 +339,75 @@ impl ConnectionManagerWorker {
 
     // This must never be made async otherwise the select may cancel it.
     fn handle_worker(&mut self, reason: String, tx: oneshot::Sender<Connection>) {
-        if let Some(channel) = (self.available_connections > 0)
-            .then_some(())
-            .and_then(|()| self.available_channels.pop_front())
+        // The permit is acquired here so that the count is held by the
+        // `Connection` that gets handed back to the requester. RAII on
+        // the permit means we don't have to track this in our own
+        // counter and there is no path that can leak it.
+        let maybe_permit = self.semaphore.clone().try_acquire_owned().ok();
+        if let Some(permit) = maybe_permit
+            && let Some(channel) = self.available_channels.pop_front()
         {
             debug!(reason, "ConnectionManager: request running");
-            self.provide_channel(channel, tx);
+            self.provide_channel(channel, tx, permit);
         } else {
             debug!(
-                available_connections = self.available_connections,
+                available_permits = self.semaphore.available_permits(),
                 available_channels = self.available_channels.len(),
                 waiting_connections = self.waiting_connections.len(),
                 reason,
                 "ConnectionManager: no connection available, request queued",
             );
             self.waiting_connections.push_back((reason, tx));
+            // Any permit acquired above has already been dropped by the
+            // time this branch runs, releasing it back to the semaphore;
+            // we only consume a permit once it is matched with a channel.
         }
     }
 
-    fn provide_channel(&mut self, channel: EstablishedChannel, tx: oneshot::Sender<Connection>) {
-        // We decrement here because we create Connection, this will signal when
-        // it is Dropped and therefore increment this again.
-        self.available_connections -= 1;
+    fn provide_channel(
+        &mut self,
+        channel: EstablishedChannel,
+        tx: oneshot::Sender<Connection>,
+        permit: OwnedSemaphorePermit,
+    ) {
+        // The permit lives on the `Connection`. When the `Connection`
+        // is dropped (normal completion, transport error, oneshot
+        // receiver gone — any path), the permit is released to the
+        // semaphore exactly once. No code in this struct touches the
+        // permit count directly.
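+        // If the requester has already gone away, `tx.send` returns
+        // `Err(Connection)`; that `Connection` is dropped right here on
+        // the worker's stack, releasing its permit immediately with no
+        // `Dropped` round-trip required.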
         drop(tx.send(Connection {
             tx: self.connection_tx.clone(),
             pending_channel: Some(channel.channel.clone()),
             channel,
+            _permit: permit,
         }));
     }
 
     fn maybe_available_connection(&mut self) {
-        while self.available_connections > 0
-            && !self.waiting_connections.is_empty()
-            && !self.available_channels.is_empty()
-        {
-            if let Some(channel) = self.available_channels.pop_front() {
-                if let Some((reason, tx)) = self.waiting_connections.pop_front() {
-                    debug!(reason, "ConnectionManager: channel available, running");
-                    self.provide_channel(channel, tx);
-                } else {
-                    // This should never happen, but better than an unwrap.
-                    self.available_channels.push_front(channel);
-                }
-            }
+        // Pump as many waiting requests as we have (permit, channel)
+        // pairs available. Each loop iteration must acquire a fresh
+        // permit so it can be moved into the resulting `Connection`.
+        while !self.waiting_connections.is_empty() && !self.available_channels.is_empty() {
+            let Ok(permit) = self.semaphore.clone().try_acquire_owned() else {
+                // No more permits — leave the request queued; future
+                // permit releases will re-trigger this loop via the
+                // `Dropped` path.
+                break;
+            };
+            let Some(channel) = self.available_channels.pop_front() else {
+                drop(permit);
+                break;
+            };
+            let Some((reason, tx)) = self.waiting_connections.pop_front() else {
+                // This should never happen given the loop guard, but
+                // putting the channel back rather than dropping it
+                // matches the previous defensive behaviour.
+                self.available_channels.push_front(channel);
+                drop(permit);
+                break;
+            };
+            debug!(reason, "ConnectionManager: channel available, running");
+            self.provide_channel(channel, tx, permit);
         }
     }
 
@@ -359,10 +415,15 @@ impl ConnectionManagerWorker {
     fn handle_connection(&mut self, request: ConnectionRequest) {
         match request {
             ConnectionRequest::Dropped(maybe_channel) => {
+                // The Connection's `_permit` was released when the
+                // Connection dropped (RAII), *before* this message was
+                // processed — so by the time we get here a permit is
+                // already available. We just need to put any
+                // outstanding pending-channel back into the pool and
+                // try to wake a waiter.
                 if let Some(channel) = maybe_channel {
                     self.available_channels.push_back(channel);
                 }
-                self.available_connections += 1;
                 self.maybe_available_connection();
             }
             ConnectionRequest::Connected(channel) => {
@@ -394,7 +455,8 @@ impl ConnectionManagerWorker {
 /// re-connecting the underlying channel on error. It depends on users
 /// reporting all errors.
 /// NOTE: This should never be cloneable because its lifetime is linked to the
-/// `ConnectionManagerWorker::available_connections`.
+/// semaphore permit it carries — `_permit` is released exactly once,
+/// when the `Connection` drops.
 #[derive(Debug)]
 pub struct Connection {
     /// Communication with `ConnectionManagerWorker` to inform about transport
@@ -406,6 +468,13 @@ pub struct Connection {
     pending_channel: Option<Channel>,
     /// The identifier to send to `tx`.
     channel: EstablishedChannel,
+    /// Semaphore permit gating the maximum number of concurrent
+    /// `Connection` objects. Held purely for its `Drop` side effect
+    /// (releases the permit back to the worker's semaphore). Never
+    /// read; the leading underscore is the conventional marker.
+    /// Replaces the prior manual `available_connections: usize`
+    /// counter that was observed underflowing in production.
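+    /// (Drop-order note: the explicit `Drop for Connection` body runs
+    /// first and queues the `Dropped` message; fields, including this
+    /// permit, are dropped after it returns, so the permit is free well
+    /// before the worker processes the message.)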
+    _permit: OwnedSemaphorePermit,
 }
 
 impl Drop for Connection {
diff --git a/nativelink-util/tests/connection_manager_test.rs b/nativelink-util/tests/connection_manager_test.rs
new file mode 100644
index 000000000..f8ae581e7
--- /dev/null
+++ b/nativelink-util/tests/connection_manager_test.rs
@@ -0,0 +1,260 @@
+// Copyright 2026 The NativeLink Authors. All rights reserved.
+//
+// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// See LICENSE file for details
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Tests for `ConnectionManager`'s permit accounting.
+//!
+//! The bug these tests exist to prevent: in production we observed
+//! `available_connections: 18446744073709551589` (`u64::MAX − 26`) while
+//! `waiting_connections` climbed unbounded, ultimately killing the worker
+//! process via `OOMKilled` (exit 137). Switching from a manual `usize`
+//! counter to `Arc<Semaphore>` with `OwnedSemaphorePermit` makes the leak
+//! structurally impossible — these tests pin that property by exercising
+//! the full request-acquire-release cycle through the public API many
+//! times over a tight permit budget. With a leak, the cycle eventually
+//! blocks forever; without one, every iteration completes inside the
+//! per-call timeout.
+
+use core::pin::Pin;
+use core::time::Duration;
+use std::sync::Arc;
+
+use nativelink_config::stores::Retry;
+use nativelink_error::Error;
+use nativelink_macro::nativelink_test;
+use nativelink_proto::google::bytestream::byte_stream_server::{ByteStream, ByteStreamServer};
+use nativelink_proto::google::bytestream::{
+    QueryWriteStatusRequest, QueryWriteStatusResponse, ReadRequest, ReadResponse, WriteRequest,
+    WriteResponse,
+};
+use nativelink_util::background_spawn;
+use nativelink_util::connection_manager::ConnectionManager;
+use pretty_assertions::assert_eq;
+use tokio::time::timeout;
+use tokio_stream::Stream;
+use tonic::transport::server::TcpIncoming;
+use tonic::transport::{Endpoint, Server};
+use tonic::{Request, Response, Status, Streaming};
+
+/// Trivial `ByteStream` service that errors on every method. We don't
+/// actually call it — its sole purpose is to give the `tonic::Server`
+/// an inner `Service` so `serve_with_incoming` accepts the listener.
+/// `tonic::Channel`'s `poll_ready` only requires that the underlying
+/// HTTP/2 connection can be established and stay open, which a vanilla
+/// tonic Server with any service registered satisfies.
+#[derive(Clone)]
+struct FakeByteStream;
+
+#[tonic::async_trait]
+impl ByteStream for FakeByteStream {
+    type ReadStream = Pin<Box<dyn Stream<Item = Result<ReadResponse, Status>> + Send + 'static>>;
+
+    async fn read(
+        &self,
+        _request: Request<ReadRequest>,
+    ) -> Result<Response<Self::ReadStream>, Status> {
+        Err(Status::unimplemented("fake"))
+    }
+
+    async fn write(
+        &self,
+        _request: Request<Streaming<WriteRequest>>,
+    ) -> Result<Response<WriteResponse>, Status> {
+        Err(Status::unimplemented("fake"))
+    }
+
+    async fn query_write_status(
+        &self,
+        _request: Request<QueryWriteStatusRequest>,
+    ) -> Result<Response<QueryWriteStatusResponse>, Status> {
+        Err(Status::unimplemented("fake"))
+    }
+}
+
+/// Stand up a tonic Server on `127.0.0.1:0` with the trivial
+/// `FakeByteStream` registered. Returns an `Endpoint` pointing at it,
+/// which `ConnectionManager` will drive through its full
+/// `provide_channel` / `Connection` lifecycle paths.
+async fn fake_grpc_server_endpoint() -> Endpoint {
+    let listener = TcpIncoming::bind("127.0.0.1:0".parse().unwrap()).unwrap();
+    let port = listener.local_addr().unwrap().port();
+    background_spawn!("connection_manager_test_server", async move {
+        Server::builder()
+            .add_service(ByteStreamServer::new(FakeByteStream))
+            .serve_with_incoming(listener)
+            .await
+            .unwrap();
+    });
+    Endpoint::from_shared(format!("http://127.0.0.1:{port}")).unwrap()
+}
+
+/// Identity jitter so retry timing stays predictable in tests.
+fn no_jitter() -> Arc<dyn Fn(Duration) -> Duration + Send + Sync> {
+    Arc::new(|d| d)
+}
+
+/// Loop the request-acquire-drop cycle far more times than the permit
+/// budget. Pre-fix, `available_connections` would underflow within the
+/// first few iterations under any error path; post-fix, the
+/// `OwnedSemaphorePermit` released by `Connection::drop` keeps the budget
+/// balanced indefinitely.
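+/// With `MAX_CONCURRENT = 2`, even one leaked permit would serialise
+/// every later iteration onto a single permit, and a second leak would
+/// deadlock the loop outright, well inside the 100 iterations.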
+/// A 5-second per-call timeout is the failure signal: the test fails
+/// if any single acquire blocks beyond it.
+///
+/// `connections_per_endpoint` matches `MAX_CONCURRENT` so the **permit**
+/// is the gate being tested, not the channel pool.
+#[nativelink_test]
+async fn permits_released_on_drop_no_leak() -> Result<(), Error> {
+    const MAX_CONCURRENT: usize = 2;
+    const ITERATIONS: usize = 100;
+
+    let endpoint = fake_grpc_server_endpoint().await;
+    let cm = ConnectionManager::new(
+        vec![endpoint],
+        /* connections_per_endpoint = */ MAX_CONCURRENT,
+        MAX_CONCURRENT,
+        Retry::default(),
+        no_jitter(),
+    );
+
+    for i in 0..ITERATIONS {
+        let c1 = timeout(Duration::from_secs(5), cm.connection(format!("iter-{i}-a")))
+            .await
+            .unwrap_or_else(|_| panic!("iter {i}: first acquire blocked >5s — permit leak"))?;
+        let c2 = timeout(Duration::from_secs(5), cm.connection(format!("iter-{i}-b")))
+            .await
+            .unwrap_or_else(|_| panic!("iter {i}: second acquire blocked >5s — permit leak"))?;
+        drop(c1);
+        drop(c2);
+    }
+
+    Ok(())
+}
+
+/// When the requester drops the future returned by `connection(...)`
+/// *before* the worker has handed the `Connection` over, the underlying
+/// `oneshot::Sender::send` returns `Err(Connection)` inside
+/// `provide_channel`. Pre-fix that path was particularly brittle: it
+/// decremented the counter and depended on the synchronously-generated
+/// `Connection::drop` → `ConnectionRequest::Dropped` round-trip
+/// reaching the worker's queue *and* getting processed. Post-fix the
+/// `OwnedSemaphorePermit` releases the moment the local `Connection`
+/// goes out of scope on the worker's stack — no inter-task message
+/// required. We verify that permit availability returns to baseline
+/// even when caller futures are aborted in flight.
+#[nativelink_test]
+async fn aborted_caller_future_does_not_leak_permits() -> Result<(), Error> {
+    const MAX_CONCURRENT: usize = 2;
+
+    let endpoint = fake_grpc_server_endpoint().await;
+    let cm = Arc::new(ConnectionManager::new(
+        vec![endpoint],
+        /* connections_per_endpoint = */ MAX_CONCURRENT,
+        MAX_CONCURRENT,
+        Retry::default(),
+        no_jitter(),
+    ));
+
+    // Round 1: spawn `MAX_CONCURRENT * 5` acquire futures and abort
+    // them while still holding their `Connection`. Each task that
+    // managed to take a permit must release it when aborted
+    // (`Connection::drop` runs when the aborted task's locals are
+    // dropped). The queued tasks (those that hadn't taken a permit
+    // yet) drop their oneshot receiver, and when the worker later
+    // tries to deliver to them the tx.send fails and the
+    // un-deliverable Connection is dropped on the worker's stack —
+    // releasing its permit too.
+    let mut handles = Vec::new();
+    for i in 0..(MAX_CONCURRENT * 5) {
+        let cm = Arc::clone(&cm);
+        handles.push(tokio::spawn(async move {
+            // Bind to `_conn` (not `_`) so the Connection lives until
+            // task abort; a bare `let _ = ...` would drop it immediately
+            // and defeat the test.
+            let _conn = cm.connection(format!("aborted-{i}")).await;
+            futures::future::pending::<()>().await
+        }));
+    }
+    // Give the worker time to populate the channel pool, drain the
+    // request queue, and let the first MAX_CONCURRENT tasks acquire
+    // permits.
+    tokio::time::sleep(Duration::from_millis(100)).await;
+    for h in handles {
+        h.abort();
+    }
+    // Yield long enough for the abort-driven drops to propagate the
+    // permit release back to the semaphore, *and* for the worker to
+    // process the cascade of `Dropped` messages that drain stale
+    // entries from `waiting_connections`.
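+    // (The 500 ms is a generous bound for in-process message delivery,
+    // not a synchronisation point; the timed acquires below are the
+    // real check.)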
+    tokio::time::sleep(Duration::from_millis(500)).await;
+
+    // Round 2: full-budget acquire. Pre-fix the round-1 aborts could
+    // leave the counter underflowed or saturated, and round-2 acquires
+    // would block forever; post-fix every permit is back.
+    let c1 = timeout(Duration::from_secs(5), cm.connection("post-abort-a".into()))
+        .await
+        .expect("post-abort acquire 1 blocked >5s — permit leak")?;
+    let c2 = timeout(Duration::from_secs(5), cm.connection("post-abort-b".into()))
+        .await
+        .expect("post-abort acquire 2 blocked >5s — permit leak")?;
+    drop(c1);
+    drop(c2);
+    Ok(())
+}
+
+/// `MAX_CONCURRENT` simultaneous holders is the steady-state ceiling.
+/// One more request must queue (rather than be served by an underflowed
+/// counter, which is exactly what production was doing). We confirm
+/// queuing by making the extra request race a 200 ms timeout against
+/// the drop of one held connection: only after the drop does the queued
+/// request resolve.
+///
+/// `connections_per_endpoint = MAX_CONCURRENT + 1` so the channel pool
+/// is *not* the bottleneck — we want to prove the **permit** is what
+/// blocks the third request, not channel availability.
+#[nativelink_test]
+async fn extra_request_above_max_blocks_until_a_release() -> Result<(), Error> {
+    const MAX_CONCURRENT: usize = 2;
+
+    let endpoint = fake_grpc_server_endpoint().await;
+    let cm = Arc::new(ConnectionManager::new(
+        vec![endpoint],
+        MAX_CONCURRENT + 1,
+        MAX_CONCURRENT,
+        Retry::default(),
+        no_jitter(),
+    ));
+
+    let c1 = cm.connection("hold-1".into()).await?;
+    let c2 = cm.connection("hold-2".into()).await?;
+
+    // Third request must be queued — racing it against a short timeout
+    // proves it doesn't resolve while permits are exhausted.
+    let cm_for_third = Arc::clone(&cm);
+    let third = tokio::spawn(async move { cm_for_third.connection("queued-3".into()).await });
+    tokio::time::sleep(Duration::from_millis(200)).await;
+    assert_eq!(
+        third.is_finished(),
+        false,
+        "third connection resolved while permits were exhausted",
+    );
+
+    // Drop one held permit; the queued request should now resolve.
+ drop(c1); + let c3 = timeout(Duration::from_secs(5), third) + .await + .expect("queued request did not resolve within 5s of permit release") + .unwrap()?; + + drop(c2); + drop(c3); + Ok(()) +} diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 23199e988..1a2b83661 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -434,6 +434,7 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Error> { slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, Store::new( ::new(&FilesystemSpec { @@ -475,6 +476,7 @@ async fn new_local_worker_removes_work_directory_before_start_test() -> Result<( slow: StoreSpec::Memory(MemorySpec::default()), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, Store::new( ::new(&FilesystemSpec { diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs index ab6a91f9c..413eedb8c 100644 --- a/nativelink-worker/tests/running_actions_manager_test.rs +++ b/nativelink-worker/tests/running_actions_manager_test.rs @@ -137,6 +137,7 @@ mod tests { slow: StoreSpec::Memory(slow_config), fast_direction: StoreDirection::default(), slow_direction: StoreDirection::default(), + bypass_dedup_threshold_bytes: 0, }, Store::new(fast_store.clone()), Store::new(slow_store.clone()),
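The test fixtures above all pin the new `bypass_dedup_threshold_bytes` field to `0`. For illustration only (field names taken from the fixtures above; the threshold value is arbitrary, not a recommendation), a spec opting into an explicit threshold would look like:

    FastSlowSpec {
        fast: StoreSpec::Memory(MemorySpec::default()),
        slow: StoreSpec::Memory(MemorySpec::default()),
        fast_direction: StoreDirection::default(),
        slow_direction: StoreDirection::default(),
        // Blobs at or above 256 MiB skip the dedup map (illustrative).
        bypass_dedup_threshold_bytes: 256 * 1024 * 1024,
    }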