From 9758ca2afd240a11b115337f188da00d58dd8f0f Mon Sep 17 00:00:00 2001 From: Akash Raj Date: Mon, 5 Jan 2026 16:58:59 +0530 Subject: [PATCH] Fix deadlock with concurrent identical actions Address issue #2001 where workers deadlock when numerous identical actions run simultaneously. Root causes and fixes: 1. File permit exhaustion: download_to_directory used unbounded parallelism. Added MAX_CONCURRENT_FILE_OPS=64 with buffer_unordered. 2. Race condition: create_and_add_action checked for duplicates AFTER async work. Added early registration with placeholder BEFORE async work. Fixes #2001 --- .../src/running_actions_manager.rs | 92 +++++++++++++++---- 1 file changed, 72 insertions(+), 20 deletions(-) diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index df312e01f..0f6d1e877 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -91,6 +91,15 @@ const EXIT_CODE_FOR_SIGNAL: i32 = 9; const DEFAULT_HISTORICAL_RESULTS_STRATEGY: UploadCacheResultsStrategy = UploadCacheResultsStrategy::FailuresOnly; +/// Maximum number of concurrent file operations during input download. +/// This prevents deadlock from file permit exhaustion when many identical +/// actions run simultaneously (see issue #2001). +/// +/// The value is chosen to balance parallelism with permit availability: +/// - High enough to maintain good download performance +/// - Low enough to leave headroom for other concurrent operations +const MAX_CONCURRENT_FILE_OPS: usize = 64; + /// Valid string reasons for a failure. /// Note: If these change, the documentation should be updated. #[derive(Debug, Deserialize)] @@ -110,9 +119,12 @@ struct SideChannelInfo { failure: Option, } -/// Aggressively download the digests of files and make a local folder from it. This function -/// will spawn unbounded number of futures to try and get these downloaded. The store itself -/// should be rate limited if spawning too many requests at once is an issue. +/// Download the digests of files and make a local folder from them. +/// +/// This function limits concurrent file operations to `MAX_CONCURRENT_FILE_OPS` +/// to prevent deadlock from file permit exhaustion when many identical actions +/// run simultaneously (see issue #2001). +/// /// We require the `FilesystemStore` to be the `fast` store of `FastSlowStore`. This is for /// efficiency reasons. We will request the `FastSlowStore` to populate the entry then we will /// assume the `FilesystemStore` has the file available immediately after and hardlink the file @@ -130,7 +142,10 @@ pub fn download_to_directory<'a>( let directory = get_and_decode_digest::(cas_store, digest.into()) .await .err_tip(|| "Converting digest to Directory")?; - let mut futures = FuturesUnordered::new(); + + // Collect all futures into a Vec first, then process with bounded concurrency + // to prevent file permit exhaustion deadlock (issue #2001) + let mut all_futures: Vec>> = Vec::new(); for file in directory.files { let digest: DigestInfo = file @@ -147,7 +162,7 @@ pub fn download_to_directory<'a>( if file.is_executable { unix_mode = Some(unix_mode.unwrap_or(0o444) | 0o111); } - futures.push( + all_futures.push( cas_store .populate_fast_store(digest.into()) .and_then(move |()| async move { @@ -160,7 +175,6 @@ pub fn download_to_directory<'a>( .get_file_entry_for_digest(&digest) .await .err_tip(|| "During hard link")?; - // TODO: add a test for #2051: deadlock with large number of files let src_path = file_entry.get_file_path_locked(|src| async move { Ok(PathBuf::from(src)) }).await?; fs::hard_link(&src_path, &dest) .await @@ -224,7 +238,7 @@ pub fn download_to_directory<'a>( .try_into() .err_tip(|| "In Directory::file::digest")?; let new_directory_path = format!("{}/{}", current_directory, directory.name); - futures.push( + all_futures.push( async move { fs::create_dir(&new_directory_path) .await @@ -246,7 +260,7 @@ pub fn download_to_directory<'a>( #[cfg(target_family = "unix")] for symlink_node in directory.symlinks { let dest = format!("{}/{}", current_directory, symlink_node.name); - futures.push( + all_futures.push( async move { fs::symlink(&symlink_node.target, &dest).await.err_tip(|| { format!( @@ -260,7 +274,15 @@ pub fn download_to_directory<'a>( ); } - while futures.try_next().await?.is_some() {} + // Process all futures with bounded concurrency to prevent permit exhaustion + // This is the key fix for issue #2001: by limiting concurrent file operations, + // we prevent all permits from being exhausted simultaneously when many identical + // actions are running. + futures::stream::iter(all_futures) + .buffer_unordered(MAX_CONCURRENT_FILE_OPS) + .try_collect::>() + .await?; + Ok(()) } .boxed() @@ -2132,8 +2154,43 @@ impl RunningActionsManager for RunningActionsManagerImpl { .queued_timestamp .and_then(|time| time.try_into().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); - let operation_id = start_execute + let operation_id: OperationId = start_execute .operation_id.as_str().into(); + + // EARLY REGISTRATION: Prevent race condition (issue #2001) + // Register a placeholder BEFORE doing any async work to ensure + // duplicate operations are rejected immediately. This prevents + // the scenario where two identical actions slip through because + // both pass the duplicate check before either registers. + { + let mut running_actions = self.running_actions.lock(); + if let Some(existing_weak) = running_actions.get(&operation_id) { + if existing_weak.upgrade().is_some() { + return Err(make_err!( + Code::AlreadyExists, + "Action with operation_id {} is already running", + operation_id + )); + } + } + // Insert placeholder to prevent duplicates during async work + running_actions.insert(operation_id.clone(), Weak::new()); + } + + // Cleanup placeholder on error using a guard + let operation_id_for_cleanup = operation_id.clone(); + let self_for_cleanup = self.clone(); + let cleanup_guard = guard((), move |()| { + // Remove placeholder if we fail before creating the real action + let mut running_actions = self_for_cleanup.running_actions.lock(); + if let Some(weak) = running_actions.get(&operation_id_for_cleanup) { + // Only remove if it's still our placeholder (Weak::new()) + if weak.upgrade().is_none() { + running_actions.remove(&operation_id_for_cleanup); + } + } + }); + let action_info = self.create_action_info(start_execute, queued_timestamp).await?; debug!( ?action_info, @@ -2175,18 +2232,13 @@ impl RunningActionsManager for RunningActionsManagerImpl { timeout, self.clone(), )); + + // Defuse the cleanup guard - we succeeded, so don't remove placeholder + ScopeGuard::into_inner(cleanup_guard); + + // Replace placeholder with real action reference { let mut running_actions = self.running_actions.lock(); - // Check if action already exists and is still alive - if let Some(existing_weak) = running_actions.get(&operation_id) { - if let Some(_existing_action) = existing_weak.upgrade() { - return Err(make_err!( - Code::AlreadyExists, - "Action with operation_id {} is already running", - operation_id - )); - } - } running_actions.insert(operation_id, Arc::downgrade(&running_action)); } Ok(running_action)