diff --git a/crates/pet/src/jsonrpc.rs b/crates/pet/src/jsonrpc.rs index 8e4703ac..9ba97787 100644 --- a/crates/pet/src/jsonrpc.rs +++ b/crates/pet/src/jsonrpc.rs @@ -38,7 +38,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use serde_json::{self, Value}; use std::collections::BTreeMap; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use std::{ ops::Deref, @@ -233,6 +233,8 @@ struct RefreshExecution { result: RefreshResult, perf: RefreshPerformance, reporter: Arc, + configuration: Arc>, + refresh_generation: u64, conda_locator: Arc, poetry_locator: Arc, conda_executable: Option, @@ -312,15 +314,112 @@ fn finish_refresh_errors(completion_guard: &mut RefreshCompletionGuard<'_>, mess } } +fn sync_refresh_locator_state_if_current( + configuration: &RwLock, + refresh_generation: u64, + sync: F, +) -> Result<(), u64> +where + F: FnOnce(), +{ + let state = configuration.read().unwrap(); + if state.generation != refresh_generation { + return Err(state.generation); + } + + sync(); + Ok(()) +} + +struct GenerationGuardedReporter { + reporter: Arc, + configuration: Arc>, + refresh_generation: u64, +} + +impl GenerationGuardedReporter { + fn new( + reporter: Arc, + configuration: Arc>, + refresh_generation: u64, + ) -> Self { + Self { + reporter, + configuration, + refresh_generation, + } + } + + fn report_if_current(&self, report: F, on_stale: S) + where + F: FnOnce(&dyn Reporter), + S: FnOnce(), + { + let state = self.configuration.read().unwrap(); + if state.generation == self.refresh_generation { + report(self.reporter.as_ref()); + return; + } + + drop(state); + on_stale(); + } +} + +impl Reporter for GenerationGuardedReporter { + fn report_manager(&self, manager: &pet_core::manager::EnvManager) { + self.report_if_current( + |reporter| reporter.report_manager(manager), + || { + trace!( + "Skipping manager notification for stale generation {}", + self.refresh_generation + ) + }, + ); + } + + fn report_environment(&self, env: &PythonEnvironment) { + self.report_if_current( + |reporter| reporter.report_environment(env), + || { + trace!( + "Skipping environment notification for stale generation {}: {:?}", + self.refresh_generation, + env.executable + .clone() + .unwrap_or(env.prefix.clone().unwrap_or_default()) + ) + }, + ); + } + + fn report_telemetry(&self, event: &TelemetryEvent) { + self.report_if_current( + |reporter| reporter.report_telemetry(event), + || { + trace!( + "Skipping telemetry notification for stale generation {}: {:?}", + self.refresh_generation, + event + ) + }, + ); + } +} + pub struct Context { - configuration: RwLock, + configuration: Arc>, locators: Arc>>, conda_locator: Arc, os_environment: Arc, refresh_coordinator: RefreshCoordinator, } -static MISSING_ENVS_REPORTED: AtomicBool = AtomicBool::new(false); +const MISSING_ENVS_AVAILABLE: u64 = u64::MAX; +const MISSING_ENVS_COMPLETED: u64 = u64::MAX - 1; + +static MISSING_ENVS_REPORTING_STATE: AtomicU64 = AtomicU64::new(MISSING_ENVS_AVAILABLE); pub fn start_jsonrpc_server() { // Initialize tracing for performance profiling (controlled by RUST_LOG env var) @@ -335,7 +434,7 @@ pub fn start_jsonrpc_server() { let context = Context { locators: create_locators(conda_locator.clone(), poetry_locator.clone(), &environment), conda_locator, - configuration: RwLock::new(ConfigurationState::default()), + configuration: Arc::new(RwLock::new(ConfigurationState::default())), os_environment: Arc::new(environment), refresh_coordinator: RefreshCoordinator::default(), }; @@ -560,15 +659,81 @@ fn refresh_state_sync_scope(search_scope: Option<&SearchScope>) -> RefreshStateS } } +fn is_current_generation( + configuration: &RwLock, + refresh_generation: u64, +) -> bool { + configuration.read().unwrap().generation == refresh_generation +} + +fn try_begin_missing_env_reporting( + configuration: &RwLock, + refresh_generation: u64, +) -> bool { + loop { + let current_state = MISSING_ENVS_REPORTING_STATE.load(Ordering::Acquire); + if current_state == MISSING_ENVS_COMPLETED { + return false; + } + if current_state != MISSING_ENVS_AVAILABLE && current_state >= refresh_generation { + return false; + } + + if MISSING_ENVS_REPORTING_STATE + .compare_exchange( + current_state, + refresh_generation, + Ordering::AcqRel, + Ordering::Acquire, + ) + .is_ok() + { + if is_current_generation(configuration, refresh_generation) { + return true; + } + + release_missing_env_reporting_if_stale(configuration, refresh_generation); + return false; + } + } +} + +fn release_missing_env_reporting_if_stale( + configuration: &RwLock, + refresh_generation: u64, +) { + if !is_current_generation(configuration, refresh_generation) { + let _ = MISSING_ENVS_REPORTING_STATE.compare_exchange( + refresh_generation, + MISSING_ENVS_AVAILABLE, + Ordering::AcqRel, + Ordering::Acquire, + ); + } +} + +fn complete_missing_env_reporting(refresh_generation: u64) { + let _ = MISSING_ENVS_REPORTING_STATE.compare_exchange( + refresh_generation, + MISSING_ENVS_COMPLETED, + Ordering::AcqRel, + Ordering::Acquire, + ); +} + fn execute_refresh( context: &Context, refresh_options: &RefreshOptions, configuration_state: &ConfigurationState, ) -> RefreshExecution { let refresh_locators = create_refresh_locators(context.os_environment.deref()); - let reporter = Arc::new(CacheReporter::new(Arc::new(jsonrpc::create_reporter( - refresh_options.search_kind, - )))); + let reporter = Arc::new(CacheReporter::new(Arc::new( + GenerationGuardedReporter::new( + Arc::new(jsonrpc::create_reporter(refresh_options.search_kind)), + context.configuration.clone(), + configuration_state.generation, + ), + ))); let (config, search_scope) = build_refresh_config(refresh_options, configuration_state.config.clone()); @@ -608,12 +773,24 @@ fn execute_refresh( trace!("Finished refreshing environments in {:?}", summary.total); // Refresh runs on a transient locator graph, so apply each locator's refresh-state - // contract back into the long-lived shared locator graph. - sync_refresh_locator_state( - context.locators.as_ref(), - refresh_locators.locators.as_ref(), - search_scope.as_ref(), - ); + // contract back into the long-lived shared locator graph only if the generation + // still matches the configuration snapshot this refresh started with. + if let Err(current_generation) = sync_refresh_locator_state_if_current( + context.configuration.as_ref(), + configuration_state.generation, + || { + sync_refresh_locator_state( + context.locators.as_ref(), + refresh_locators.locators.as_ref(), + search_scope.as_ref(), + ); + }, + ) { + warn!( + "Skipping refresh state sync for stale generation {} because current generation is {}", + configuration_state.generation, current_generation + ); + } let perf = RefreshPerformance { total: summary.total.as_millis(), @@ -635,6 +812,8 @@ fn execute_refresh( result: RefreshResult::new(summary.total), perf, reporter, + configuration: context.configuration.clone(), + refresh_generation: configuration_state.generation, conda_locator: refresh_locators.conda_locator, poetry_locator: refresh_locators.poetry_locator, conda_executable: configuration_state.config.conda_executable.clone(), @@ -647,24 +826,36 @@ fn report_refresh_follow_up(execution: RefreshExecution) { .reporter .report_telemetry(&TelemetryEvent::RefreshPerformance(execution.perf)); - if MISSING_ENVS_REPORTED - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .ok() - .unwrap_or_default() - { + if try_begin_missing_env_reporting( + execution.configuration.as_ref(), + execution.refresh_generation, + ) { let conda_locator = execution.conda_locator.clone(); let conda_executable = execution.conda_executable.clone(); - let reporter_ref = execution.reporter.clone(); - thread::spawn(move || { - conda_locator.find_and_report_missing_envs(reporter_ref.as_ref(), conda_executable); - Some(()) - }); - let poetry_locator = execution.poetry_locator.clone(); let poetry_executable = execution.poetry_executable.clone(); let reporter_ref = execution.reporter.clone(); + let configuration = execution.configuration.clone(); + let refresh_generation = execution.refresh_generation; thread::spawn(move || { + if !is_current_generation(configuration.as_ref(), refresh_generation) { + release_missing_env_reporting_if_stale(configuration.as_ref(), refresh_generation); + return Some(()); + } + + conda_locator.find_and_report_missing_envs(reporter_ref.as_ref(), conda_executable); + if !is_current_generation(configuration.as_ref(), refresh_generation) { + release_missing_env_reporting_if_stale(configuration.as_ref(), refresh_generation); + return Some(()); + } + poetry_locator.find_and_report_missing_envs(reporter_ref.as_ref(), poetry_executable); + if is_current_generation(configuration.as_ref(), refresh_generation) { + complete_missing_env_reporting(refresh_generation); + } else { + release_missing_env_reporting_if_stale(configuration.as_ref(), refresh_generation); + } + Some(()) }); } @@ -710,10 +901,6 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { &context.refresh_coordinator, &refresh_key, ); - send_refresh_replies_for_waiters( - &completion_guard, - &refresh_result, - ); finish_refresh_replies(&mut completion_guard, &refresh_result); report_refresh_follow_up(execution); } @@ -929,18 +1116,16 @@ pub(crate) fn build_refresh_config( if let Some(ref search_paths) = refresh_options.search_paths { // Clear workspace directories when explicit search paths are provided. config.workspace_directories = None; - // Expand any glob patterns in the search paths - let expanded_paths = expand_glob_patterns(search_paths); // These workspace folders are only for this refresh. config.workspace_directories = Some( - expanded_paths + search_paths .iter() .filter(|p| p.is_dir()) .cloned() .collect(), ); config.executables = Some( - expanded_paths + search_paths .iter() .filter(|p| p.is_file()) .cloned() @@ -961,12 +1146,59 @@ pub(crate) fn build_refresh_config( mod tests { use super::*; use pet_conda::manager::CondaManager; + use pet_core::manager::EnvManager; use pet_core::manager::EnvManagerType; use pet_core::RefreshStatePersistence; use std::path::PathBuf; use std::sync::mpsc; + use std::sync::Mutex; use std::thread; + #[derive(Default)] + struct RecordingReporter { + environments: Mutex>, + managers: Mutex>, + telemetry: Mutex>, + } + + static MISSING_ENVS_TEST_LOCK: Mutex<()> = Mutex::new(()); + + struct LockCheckingReporter { + configuration: Arc>, + reported: Mutex, + } + + impl Reporter for RecordingReporter { + fn report_manager(&self, manager: &EnvManager) { + self.managers.lock().unwrap().push(manager.clone()); + } + + fn report_environment(&self, env: &PythonEnvironment) { + self.environments.lock().unwrap().push(env.clone()); + } + + fn report_telemetry(&self, event: &TelemetryEvent) { + self.telemetry.lock().unwrap().push(event.clone()); + } + } + + impl Reporter for LockCheckingReporter { + fn report_manager(&self, _manager: &EnvManager) { + assert!(self.configuration.try_write().is_err()); + *self.reported.lock().unwrap() = true; + } + + fn report_environment(&self, _env: &PythonEnvironment) { + assert!(self.configuration.try_write().is_err()); + *self.reported.lock().unwrap() = true; + } + + fn report_telemetry(&self, _event: &TelemetryEvent) { + assert!(self.configuration.try_write().is_err()); + *self.reported.lock().unwrap() = true; + } + } + fn make_refresh_key(generation: u64, options: RefreshOptions) -> RefreshKey { RefreshKey::new(&options, generation) } @@ -1010,6 +1242,136 @@ mod tests { ); } + #[test] + fn test_sync_refresh_locator_state_if_current_matches_generation() { + let configuration = RwLock::new(ConfigurationState { + generation: 4, + config: Configuration::default(), + }); + let mut synced = false; + + let result = sync_refresh_locator_state_if_current(&configuration, 4, || { + assert!(configuration.try_write().is_err()); + synced = true; + }); + + assert!(result.is_ok()); + assert!(synced); + } + + #[test] + fn test_generation_guarded_reporter_drops_stale_notifications() { + let configuration = Arc::new(RwLock::new(ConfigurationState { + generation: 1, + config: Configuration::default(), + })); + let inner = Arc::new(RecordingReporter::default()); + let reporter = GenerationGuardedReporter::new(inner.clone(), configuration.clone(), 1); + + let environment = PythonEnvironment::new( + Some(PathBuf::from("/tmp/python")), + Some(PythonEnvironmentKind::Venv), + Some(PathBuf::from("/tmp")), + None, + Some("3.11.0".to_string()), + ); + let manager = EnvManager { + executable: PathBuf::from("/tmp/conda"), + version: Some("24.1.0".to_string()), + tool: EnvManagerType::Conda, + }; + let telemetry = TelemetryEvent::RefreshPerformance(RefreshPerformance { + total: 1, + locators: BTreeMap::new(), + breakdown: BTreeMap::new(), + }); + + reporter.report_environment(&environment); + reporter.report_manager(&manager); + reporter.report_telemetry(&telemetry); + + assert_eq!(inner.environments.lock().unwrap().len(), 1); + assert_eq!(inner.managers.lock().unwrap().len(), 1); + assert_eq!(inner.telemetry.lock().unwrap().len(), 1); + + configuration.write().unwrap().generation = 2; + + reporter.report_environment(&environment); + reporter.report_manager(&manager); + reporter.report_telemetry(&telemetry); + + assert_eq!(inner.environments.lock().unwrap().len(), 1); + assert_eq!(inner.managers.lock().unwrap().len(), 1); + assert_eq!(inner.telemetry.lock().unwrap().len(), 1); + } + + #[test] + fn test_generation_guarded_reporter_holds_lock_while_reporting() { + let configuration = Arc::new(RwLock::new(ConfigurationState { + generation: 7, + config: Configuration::default(), + })); + let inner = Arc::new(LockCheckingReporter { + configuration: configuration.clone(), + reported: Mutex::new(false), + }); + let reporter = GenerationGuardedReporter::new(inner.clone(), configuration, 7); + + reporter.report_telemetry(&TelemetryEvent::RefreshPerformance(RefreshPerformance { + total: 1, + locators: BTreeMap::new(), + breakdown: BTreeMap::new(), + })); + + assert!(*inner.reported.lock().unwrap()); + } + + #[test] + fn test_stale_generation_does_not_begin_missing_env_reporting() { + let _guard = MISSING_ENVS_TEST_LOCK.lock().unwrap(); + MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); + let configuration = RwLock::new(ConfigurationState { + generation: 2, + config: Configuration::default(), + }); + + assert!(!try_begin_missing_env_reporting(&configuration, 1)); + assert_eq!( + MISSING_ENVS_REPORTING_STATE.load(Ordering::Acquire), + MISSING_ENVS_AVAILABLE + ); + } + + #[test] + fn test_stale_generation_releases_missing_env_reporting_slot() { + let _guard = MISSING_ENVS_TEST_LOCK.lock().unwrap(); + MISSING_ENVS_REPORTING_STATE.store(2, Ordering::Release); + let configuration = RwLock::new(ConfigurationState { + generation: 3, + config: Configuration::default(), + }); + + release_missing_env_reporting_if_stale(&configuration, 2); + + assert_eq!( + MISSING_ENVS_REPORTING_STATE.load(Ordering::Acquire), + MISSING_ENVS_AVAILABLE + ); + } + + #[test] + fn test_newer_generation_can_claim_missing_env_reporting_after_older_reservation() { + let _guard = MISSING_ENVS_TEST_LOCK.lock().unwrap(); + MISSING_ENVS_REPORTING_STATE.store(1, Ordering::Release); + let configuration = RwLock::new(ConfigurationState { + generation: 2, + config: Configuration::default(), + }); + + assert!(try_begin_missing_env_reporting(&configuration, 2)); + assert_eq!(MISSING_ENVS_REPORTING_STATE.load(Ordering::Acquire), 2); + } + #[test] fn test_refresh_coordinator_joins_identical_requests() { let coordinator = RefreshCoordinator::default(); @@ -1246,6 +1608,22 @@ mod tests { assert!(!shared.environments.contains_key(&fresh_env_path)); } + #[test] + fn test_stale_generation_does_not_sync_refresh_state() { + let configuration = RwLock::new(ConfigurationState { + generation: 2, + config: Configuration::default(), + }); + let mut synced = false; + + let result = sync_refresh_locator_state_if_current(&configuration, 1, || { + synced = true; + }); + + assert_eq!(result, Err(2)); + assert!(!synced); + } + #[test] fn test_refresh_coordinator_does_not_join_different_generations() { let coordinator = RefreshCoordinator::default(); @@ -1496,4 +1874,28 @@ mod tests { "search_scope should be None when no options provided" ); } + + #[test] + fn test_search_paths_use_already_canonicalized_paths() { + let temp_dir = tempfile::tempdir().unwrap(); + let workspace_dir = temp_dir.path().join("workspace"); + let executable = temp_dir.path().join("python.exe"); + std::fs::create_dir(&workspace_dir).unwrap(); + std::fs::write(&executable, b"python").unwrap(); + + let config = Configuration::default(); + let refresh_options = RefreshOptions { + search_kind: None, + search_paths: Some(vec![workspace_dir.clone(), executable.clone()]), + }; + + let (result_config, search_scope) = build_refresh_config(&refresh_options, config); + + assert_eq!( + result_config.workspace_directories, + Some(vec![workspace_dir]) + ); + assert_eq!(result_config.executables, Some(vec![executable])); + assert!(matches!(search_scope, Some(SearchScope::Workspace))); + } }