diff --git a/crates/pet-virtualenvwrapper/src/environments.rs b/crates/pet-virtualenvwrapper/src/environments.rs index 3a4ef582..69ef4702 100644 --- a/crates/pet-virtualenvwrapper/src/environments.rs +++ b/crates/pet-virtualenvwrapper/src/environments.rs @@ -167,7 +167,7 @@ mod tests { #[test] fn get_project_reads_existing_project_path_from_project_file() { - let project_root = create_test_dir("project-root"); + let project_root = fs::canonicalize(create_test_dir("project-root")).unwrap(); let prefix = create_test_dir("wrapped-env"); let executable = create_virtualenv(&prefix); fs::write( diff --git a/crates/pet/src/jsonrpc.rs b/crates/pet/src/jsonrpc.rs index 4313c67a..f85c5eb7 100644 --- a/crates/pet/src/jsonrpc.rs +++ b/crates/pet/src/jsonrpc.rs @@ -531,7 +531,7 @@ const GLOB_EXPANSION_WARN_THRESHOLD: Duration = Duration::from_secs(5); pub fn handle_configure(context: Arc, id: u32, params: Value) { match serde_json::from_value::(params.clone()) { - Ok(configure_options) => { + Ok(mut configure_options) => { info!("Received configure request"); // Start in a new thread, we can have multiple requests. thread::spawn(move || { @@ -539,33 +539,37 @@ pub fn handle_configure(context: Arc, id: u32, params: Value) { // Expand glob patterns before acquiring the write lock so we // don't block readers/writers while traversing the filesystem. - let workspace_directories = configure_options.workspace_directories.map(|dirs| { - let start = Instant::now(); - let result: Vec = expand_glob_patterns(&dirs) - .into_iter() - .filter(|p| p.is_dir()) - .collect(); - trace!( - "Expanded workspace directory patterns ({:?}) in {:?}", - dirs, - start.elapsed() - ); - result - }); - let environment_directories = - configure_options.environment_directories.map(|dirs| { + let workspace_directories = + configure_options.workspace_directories.take().map(|dirs| { let start = Instant::now(); let result: Vec = expand_glob_patterns(&dirs) .into_iter() .filter(|p| p.is_dir()) .collect(); trace!( - "Expanded environment directory patterns ({:?}) in {:?}", + "Expanded workspace directory patterns ({:?}) in {:?}", dirs, start.elapsed() ); result }); + let environment_directories = + configure_options + .environment_directories + .take() + .map(|dirs| { + let start = Instant::now(); + let result: Vec = expand_glob_patterns(&dirs) + .into_iter() + .filter(|p| p.is_dir()) + .collect(); + trace!( + "Expanded environment directory patterns ({:?}) in {:?}", + dirs, + start.elapsed() + ); + result + }); let glob_elapsed = now.elapsed(); trace!("Glob expansion completed in {:?}", glob_elapsed); if glob_elapsed >= GLOB_EXPANSION_WARN_THRESHOLD { @@ -575,40 +579,28 @@ pub fn handle_configure(context: Arc, id: u32, params: Value) { ); } - let config = { - let mut state = context.configuration.write().unwrap(); - state.config.workspace_directories = workspace_directories; - state.config.conda_executable = configure_options.conda_executable; - state.config.environment_directories = environment_directories; - state.config.pipenv_executable = configure_options.pipenv_executable; - state.config.poetry_executable = configure_options.poetry_executable; - // We will not support changing the cache directories once set. - // No point, supporting such a use case. - if let Some(cache_directory) = configure_options.cache_directory { - set_cache_directory(cache_directory.clone()); - state.config.cache_directory = Some(cache_directory); - } - state.generation += 1; - // Reset missing-env reporting so that the next refresh - // after reconfiguration can trigger it again (Fixes #395). - // Done inside the write lock to avoid a TOCTOU window with - // concurrent refresh threads reading the generation. - MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); - trace!( - "Configuring locators with generation {}: {:?}", - state.generation, - state.config - ); - state.config.clone() - }; - configure_locators(&context.locators, &config); + if let Err(message) = apply_configure_options( + context.configuration.as_ref(), + &context.locators, + configure_options, + workspace_directories, + environment_directories, + ) { + error!("Configure failed: {message}"); + send_error(Some(id), -4, message); + return; + } info!("Configure completed in {:?}", now.elapsed()); send_reply(id, None::<()>); }); } Err(e) => { - send_reply(id, None::); error!("Failed to parse configure options {:?}: {}", params, e); + send_error( + Some(id), + -4, + format!("Failed to parse configure options {params:?}: {e}"), + ); } } } @@ -665,6 +657,120 @@ fn parse_refresh_options(params: Value) -> Result, + locators: &Arc>>, + configure_options: ConfigureOptions, + workspace_directories: Option>, + environment_directories: Option>, +) -> Result<(), String> { + let mut state = configuration.write().unwrap(); + let mut next_config = state.config.clone(); + let next_generation = state.generation + 1; + + next_config.workspace_directories = workspace_directories; + next_config.conda_executable = configure_options.conda_executable; + next_config.environment_directories = environment_directories; + next_config.pipenv_executable = configure_options.pipenv_executable; + next_config.poetry_executable = configure_options.poetry_executable; + // We will not support changing the cache directories once set. + // No point, supporting such a use case. + let cache_directory = configure_options.cache_directory; + if let Some(cache_directory) = cache_directory.clone() { + next_config.cache_directory = Some(cache_directory); + } + + trace!( + "Configuring locators with generation {}: {:?}", + next_generation, + next_config + ); + + let mut configured_locator_count = 0; + for locator in locators.iter() { + if let Err(panic_payload) = panic::catch_unwind(AssertUnwindSafe(|| { + locator.configure(&next_config); + })) { + let panic_message = panic_payload_message(panic_payload.as_ref()); + error!( + "Locator configuration panicked for generation {}: {}", + next_generation, panic_message + ); + rollback_locator_config( + locators, + &state.config, + next_generation, + configured_locator_count + 1, + ); + return Err(format!( + "Locator configuration failed for generation {next_generation}: {panic_message}" + )); + } + configured_locator_count += 1; + } + + if let Some(cache_directory) = cache_directory { + if let Err(panic_payload) = panic::catch_unwind(AssertUnwindSafe(|| { + set_cache_directory(cache_directory); + })) { + let panic_message = panic_payload_message(panic_payload.as_ref()); + error!( + "Cache directory configuration panicked for generation {}: {}", + next_generation, panic_message + ); + rollback_locator_config( + locators, + &state.config, + next_generation, + configured_locator_count, + ); + return Err(format!( + "Cache directory configuration failed for generation {next_generation}: {panic_message}" + )); + } + } + state.config = next_config; + state.generation = next_generation; + // Reset missing-env reporting so that the next refresh after + // reconfiguration can trigger it again (Fixes #395). Done inside the write + // lock to avoid a TOCTOU window with concurrent refresh threads reading the + // generation. + MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); + + Ok(()) +} + +fn rollback_locator_config( + locators: &Arc>>, + previous_config: &Configuration, + failed_generation: u64, + configured_locator_count: usize, +) { + if let Err(panic_payload) = panic::catch_unwind(AssertUnwindSafe(|| { + for locator in locators.iter().take(configured_locator_count) { + locator.configure(previous_config); + } + })) { + error!( + "Rollback after failed locator configuration for generation {} also panicked: {}. Aborting process to avoid continuing with inconsistent locator state.", + failed_generation, + panic_payload_message(panic_payload.as_ref()) + ); + std::process::abort(); + } +} + +fn panic_payload_message(panic_payload: &(dyn std::any::Any + Send)) -> String { + if let Some(message) = panic_payload.downcast_ref::<&str>() { + return (*message).to_string(); + } + if let Some(message) = panic_payload.downcast_ref::() { + return message.clone(); + } + + "unknown panic payload".to_string() +} + fn configure_locators(locators: &Arc>>, config: &Configuration) { for locator in locators.iter() { locator.configure(config); @@ -1244,6 +1350,20 @@ mod tests { reported: Mutex, } + struct BlockingConfigureLocator { + started: mpsc::Sender<()>, + release: Mutex>, + configured_workspace_directories: Mutex>>, + } + + struct PanicConfigureLocator { + configured_workspace_directories: Mutex>>, + } + + struct RecordingConfigureLocator { + configured_workspace_directories: Mutex>>, + } + impl Reporter for RecordingReporter { fn report_manager(&self, manager: &EnvManager) { self.managers.lock().unwrap().push(manager.clone()); @@ -1275,6 +1395,74 @@ mod tests { } } + impl Locator for BlockingConfigureLocator { + fn get_kind(&self) -> LocatorKind { + LocatorKind::Venv + } + + fn configure(&self, config: &Configuration) { + self.started.send(()).unwrap(); + self.release.lock().unwrap().recv().unwrap(); + *self.configured_workspace_directories.lock().unwrap() = + config.workspace_directories.clone(); + } + + fn supported_categories(&self) -> Vec { + vec![PythonEnvironmentKind::Venv] + } + + fn try_from(&self, _env: &pet_core::env::PythonEnv) -> Option { + None + } + + fn find(&self, _reporter: &dyn Reporter) {} + } + + impl Locator for PanicConfigureLocator { + fn get_kind(&self) -> LocatorKind { + LocatorKind::Venv + } + + fn configure(&self, config: &Configuration) { + *self.configured_workspace_directories.lock().unwrap() = + config.workspace_directories.clone(); + if config.workspace_directories.is_some() || config.conda_executable.is_some() { + panic!("configure boom"); + } + } + + fn supported_categories(&self) -> Vec { + vec![PythonEnvironmentKind::Venv] + } + + fn try_from(&self, _env: &pet_core::env::PythonEnv) -> Option { + None + } + + fn find(&self, _reporter: &dyn Reporter) {} + } + + impl Locator for RecordingConfigureLocator { + fn get_kind(&self) -> LocatorKind { + LocatorKind::Venv + } + + fn configure(&self, config: &Configuration) { + *self.configured_workspace_directories.lock().unwrap() = + config.workspace_directories.clone(); + } + + fn supported_categories(&self) -> Vec { + vec![PythonEnvironmentKind::Venv] + } + + fn try_from(&self, _env: &pet_core::env::PythonEnv) -> Option { + None + } + + fn find(&self, _reporter: &dyn Reporter) {} + } + fn make_refresh_key(generation: u64, options: RefreshOptions) -> RefreshKey { RefreshKey::new(&options, generation) } @@ -1402,6 +1590,141 @@ mod tests { assert!(*inner.reported.lock().unwrap()); } + #[test] + fn test_configure_publishes_state_after_shared_locators_are_configured() { + let configuration = Arc::new(RwLock::new(ConfigurationState::default())); + let (started_tx, started_rx) = mpsc::channel(); + let (release_tx, release_rx) = mpsc::channel(); + let (done_tx, done_rx) = mpsc::channel(); + let locator = Arc::new(BlockingConfigureLocator { + started: started_tx, + release: Mutex::new(release_rx), + configured_workspace_directories: Mutex::new(None), + }); + let locators = Arc::new(vec![locator.clone() as Arc]); + let workspace_directories = vec![PathBuf::from("/workspace")]; + + let worker = { + let configuration = configuration.clone(); + let locators = locators.clone(); + let workspace_directories = workspace_directories.clone(); + thread::spawn(move || { + apply_configure_options( + configuration.as_ref(), + &locators, + ConfigureOptions { + workspace_directories: None, + conda_executable: None, + pipenv_executable: None, + poetry_executable: None, + environment_directories: None, + cache_directory: None, + }, + Some(workspace_directories), + None, + ) + .unwrap(); + done_tx.send(()).unwrap(); + }) + }; + + started_rx.recv_timeout(Duration::from_secs(5)).unwrap(); + assert!(configuration.try_read().is_err()); + + release_tx.send(()).unwrap(); + done_rx.recv_timeout(Duration::from_secs(5)).unwrap(); + worker.join().unwrap(); + + let state = configuration.read().unwrap(); + assert_eq!(state.generation, 1); + assert_eq!( + state.config.workspace_directories, + Some(workspace_directories.clone()) + ); + assert_eq!( + *locator.configured_workspace_directories.lock().unwrap(), + Some(workspace_directories) + ); + } + + #[test] + fn test_configure_panic_does_not_publish_state_or_poison_lock() { + let configuration = RwLock::new(ConfigurationState::default()); + let panic_locator = Arc::new(PanicConfigureLocator { + configured_workspace_directories: Mutex::new(None), + }); + let locators = Arc::new(vec![panic_locator.clone() as Arc]); + let workspace_directories = vec![PathBuf::from("/workspace")]; + + let result = apply_configure_options( + &configuration, + &locators, + ConfigureOptions { + workspace_directories: None, + conda_executable: Some(PathBuf::from("/configured/conda")), + pipenv_executable: None, + poetry_executable: None, + environment_directories: None, + cache_directory: None, + }, + Some(workspace_directories), + None, + ); + + assert!(result.unwrap_err().contains("configure boom")); + let state = configuration.read().unwrap(); + assert_eq!(state.generation, 0); + assert!(state.config.workspace_directories.is_none()); + assert!(state.config.conda_executable.is_none()); + assert!(panic_locator + .configured_workspace_directories + .lock() + .unwrap() + .is_none()); + } + + #[test] + fn test_configure_panic_rolls_back_previously_configured_locators() { + let configuration = RwLock::new(ConfigurationState::default()); + let recording_locator = Arc::new(RecordingConfigureLocator { + configured_workspace_directories: Mutex::new(None), + }); + let panic_locator = Arc::new(PanicConfigureLocator { + configured_workspace_directories: Mutex::new(None), + }); + let locators = Arc::new(vec![ + recording_locator.clone() as Arc, + panic_locator.clone() as Arc, + ]); + + let result = apply_configure_options( + &configuration, + &locators, + ConfigureOptions { + workspace_directories: None, + conda_executable: None, + pipenv_executable: None, + poetry_executable: None, + environment_directories: None, + cache_directory: None, + }, + Some(vec![PathBuf::from("/workspace")]), + None, + ); + + assert!(result.is_err()); + assert!(recording_locator + .configured_workspace_directories + .lock() + .unwrap() + .is_none()); + assert!(panic_locator + .configured_workspace_directories + .lock() + .unwrap() + .is_none()); + } + #[test] fn test_stale_generation_does_not_begin_missing_env_reporting() { let _guard = MISSING_ENVS_TEST_LOCK.lock().unwrap();