diff --git a/crates/pet-conda/src/lib.rs b/crates/pet-conda/src/lib.rs index 61a311d9..a444d388 100644 --- a/crates/pet-conda/src/lib.rs +++ b/crates/pet-conda/src/lib.rs @@ -16,7 +16,7 @@ use pet_core::{ os_environment::Environment, python_environment::{PythonEnvironment, PythonEnvironmentKind}, reporter::Reporter, - Locator, LocatorKind, + Locator, LocatorKind, RefreshStatePersistence, RefreshStateSyncScope, }; use pet_fs::path::norm_case; use rayon::prelude::*; @@ -216,11 +216,39 @@ impl Locator for Conda { fn get_kind(&self) -> LocatorKind { LocatorKind::Conda } - fn configure(&self, config: &pet_core::Configuration) { - if let Some(ref conda_exe) = config.conda_executable { - let mut conda_executable = self.conda_executable.write().unwrap(); - conda_executable.replace(conda_exe.clone()); + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::SyncedDiscoveryState + } + fn sync_refresh_state_from(&self, source: &dyn Locator, scope: &RefreshStateSyncScope) { + let source = source.as_any().downcast_ref::().unwrap_or_else(|| { + panic!("attempted to sync Conda state from {:?}", source.get_kind()) + }); + + match scope { + RefreshStateSyncScope::Full => {} + RefreshStateSyncScope::GlobalFiltered(kind) + if self.supported_categories().contains(kind) => {} + RefreshStateSyncScope::GlobalFiltered(_) | RefreshStateSyncScope::Workspace => { + return; + } } + + self.environments.clear(); + self.environments + .insert_many(source.environments.clone_map()); + + self.managers.clear(); + self.managers.insert_many(source.managers.clone_map()); + + self.mamba_managers.clear(); + self.mamba_managers + .insert_many(source.mamba_managers.clone_map()); + } + fn configure(&self, config: &pet_core::Configuration) { + self.conda_executable + .write() + .unwrap() + .clone_from(&config.conda_executable); } fn supported_categories(&self) -> Vec { vec![PythonEnvironmentKind::Conda] diff --git a/crates/pet-core/src/lib.rs b/crates/pet-core/src/lib.rs index 
c3cffdf2..9829131f 100644 --- a/crates/pet-core/src/lib.rs +++ b/crates/pet-core/src/lib.rs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use std::path::PathBuf; +use std::{any::Any, path::PathBuf}; use env::PythonEnv; use manager::EnvManager; @@ -61,7 +61,26 @@ pub enum LocatorKind { WindowsStore, } -pub trait Locator: Send + Sync { +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RefreshStatePersistence { + /// The locator keeps no mutable state across requests. + Stateless, + /// The locator keeps configured inputs only; refresh must not copy them back. + ConfiguredOnly, + /// The locator keeps cache-like state, but later requests can repopulate it on demand. + SelfHydratingCache, + /// The locator keeps refresh-discovered state that later requests depend on for correctness. + SyncedDiscoveryState, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RefreshStateSyncScope { + Full, + GlobalFiltered(PythonEnvironmentKind), + Workspace, +} + +pub trait Locator: Any + Send + Sync { /// Returns the name of the locator. fn get_kind(&self) -> LocatorKind; /// Configures the locator with the given configuration. @@ -100,6 +119,21 @@ pub trait Locator: Send + Sync { fn configure(&self, _config: &Configuration) { // } + /// Describes what mutable state, if any, must survive a refresh boundary. + /// + /// Refresh runs execute against transient locator graphs and then invoke + /// `sync_refresh_state_from()` on the long-lived shared locators. + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::Stateless + } + /// Copies correctness-critical post-refresh state from a transient locator into this + /// long-lived shared locator. + /// + /// Override this only when `refresh_state()` returns + /// `RefreshStatePersistence::SyncedDiscoveryState`. 
+ fn sync_refresh_state_from(&self, _source: &dyn Locator, _scope: &RefreshStateSyncScope) { + // + } /// Returns a list of supported categories for this locator. fn supported_categories(&self) -> Vec; /// Given a Python executable, and some optional data like prefix, @@ -112,3 +146,9 @@ pub trait Locator: Send + Sync { /// Finds all environments specific to this locator. fn find(&self, reporter: &dyn Reporter); } + +impl dyn Locator { + pub fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/crates/pet-linux-global-python/src/lib.rs b/crates/pet-linux-global-python/src/lib.rs index 1b0297e2..67f23759 100644 --- a/crates/pet-linux-global-python/src/lib.rs +++ b/crates/pet-linux-global-python/src/lib.rs @@ -15,7 +15,7 @@ use pet_core::{ env::PythonEnv, python_environment::{PythonEnvironment, PythonEnvironmentBuilder, PythonEnvironmentKind}, reporter::Reporter, - Locator, LocatorKind, + Locator, LocatorKind, RefreshStatePersistence, }; use pet_fs::path::resolve_symlink; use pet_python_utils::{env::ResolvedPythonEnv, executable::find_executables}; @@ -62,6 +62,9 @@ impl Locator for LinuxGlobalPython { fn get_kind(&self) -> LocatorKind { LocatorKind::LinuxGlobal } + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::SelfHydratingCache + } fn supported_categories(&self) -> Vec { vec![PythonEnvironmentKind::LinuxGlobal] } diff --git a/crates/pet-pipenv/src/lib.rs b/crates/pet-pipenv/src/lib.rs index 4bc91fc4..12ceb192 100644 --- a/crates/pet-pipenv/src/lib.rs +++ b/crates/pet-pipenv/src/lib.rs @@ -11,7 +11,7 @@ use pet_core::LocatorKind; use pet_core::{ python_environment::{PythonEnvironment, PythonEnvironmentBuilder, PythonEnvironmentKind}, reporter::Reporter, - Configuration, Locator, + Configuration, Locator, RefreshStatePersistence, }; use pet_fs::path::norm_case; use pet_python_utils::executable::find_executables; @@ -418,10 +418,15 @@ impl Locator for PipEnv { LocatorKind::PipEnv } + fn refresh_state(&self) -> 
RefreshStatePersistence { + RefreshStatePersistence::ConfiguredOnly + } + fn configure(&self, config: &Configuration) { - if let Some(exe) = &config.pipenv_executable { - self.pipenv_executable.write().unwrap().replace(exe.clone()); - } + self.pipenv_executable + .write() + .unwrap() + .clone_from(&config.pipenv_executable); } fn supported_categories(&self) -> Vec { diff --git a/crates/pet-poetry/src/lib.rs b/crates/pet-poetry/src/lib.rs index d6db7909..54c37081 100644 --- a/crates/pet-poetry/src/lib.rs +++ b/crates/pet-poetry/src/lib.rs @@ -11,7 +11,8 @@ use pet_core::{ os_environment::Environment, python_environment::{PythonEnvironment, PythonEnvironmentKind}, reporter::Reporter, - Configuration, Locator, LocatorKind, LocatorResult, + Configuration, Locator, LocatorKind, LocatorResult, RefreshStatePersistence, + RefreshStateSyncScope, }; use pet_virtualenv::is_virtualenv; use regex::Regex; @@ -137,12 +138,48 @@ impl Poetry { } } fn clear(&self) { - self.poetry_executable.write().unwrap().take(); self.search_result.write().unwrap().take(); } pub fn from(environment: &dyn Environment) -> Poetry { Poetry::new(environment) } + + pub fn sync_search_result_from(&self, source: &Poetry) { + let search_result = source.search_result.read().unwrap().clone(); + self.search_result + .write() + .unwrap() + .clone_from(&search_result); + } + + pub fn merge_search_result_from(&self, source: &Poetry) { + let source_search_result = source.search_result.read().unwrap().clone(); + let Some(source_search_result) = source_search_result else { + return; + }; + + let mut merged = self + .search_result + .read() + .unwrap() + .clone() + .unwrap_or(LocatorResult { + managers: vec![], + environments: vec![], + }); + merged.managers.extend(source_search_result.managers); + merged.managers.sort(); + merged.managers.dedup(); + + merged + .environments + .extend(source_search_result.environments); + merged.environments.sort(); + merged.environments.dedup(); + + 
self.search_result.write().unwrap().replace(merged); + } + fn find_with_cache(&self) -> Option { // First check if we have cached results { @@ -217,17 +254,39 @@ impl Locator for Poetry { fn get_kind(&self) -> LocatorKind { LocatorKind::Poetry } + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::SyncedDiscoveryState + } + fn sync_refresh_state_from(&self, source: &dyn Locator, scope: &RefreshStateSyncScope) { + let source = source.as_any().downcast_ref::().unwrap_or_else(|| { + panic!( + "attempted to sync Poetry state from {:?}", + source.get_kind() + ) + }); + match scope { + RefreshStateSyncScope::Full => self.sync_search_result_from(source), + RefreshStateSyncScope::GlobalFiltered(kind) + if self.supported_categories().contains(kind) => + { + self.sync_search_result_from(source) + } + RefreshStateSyncScope::Workspace => self.merge_search_result_from(source), + RefreshStateSyncScope::GlobalFiltered(_) => {} + } + } fn configure(&self, config: &Configuration) { + let mut ws_dirs = self.workspace_directories.write().unwrap(); + ws_dirs.clear(); if let Some(workspace_directories) = &config.workspace_directories { - let mut ws_dirs = self.workspace_directories.write().unwrap(); - ws_dirs.clear(); if !workspace_directories.is_empty() { ws_dirs.extend(workspace_directories.clone()); } } - if let Some(exe) = &config.poetry_executable { - self.poetry_executable.write().unwrap().replace(exe.clone()); - } + self.poetry_executable + .write() + .unwrap() + .clone_from(&config.poetry_executable); } fn supported_categories(&self) -> Vec { @@ -294,3 +353,137 @@ impl Locator for Poetry { } } } + +#[cfg(test)] +mod tests { + use super::*; + use pet_core::os_environment::EnvironmentApi; + + #[test] + fn test_sync_search_result_from_replaces_cached_result() { + let environment = EnvironmentApi::new(); + let target = Poetry::from(&environment); + let source = Poetry::from(&environment); + + target + .search_result + .write() + .unwrap() + 
.replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("stale".to_string()), + kind: Some(PythonEnvironmentKind::Poetry), + ..Default::default() + }], + }); + + source + .search_result + .write() + .unwrap() + .replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("fresh".to_string()), + kind: Some(PythonEnvironmentKind::Poetry), + ..Default::default() + }], + }); + + target.sync_search_result_from(&source); + + let result = target.search_result.read().unwrap().clone(); + assert_eq!( + result.unwrap().environments[0].name.as_deref(), + Some("fresh") + ); + } + + #[test] + fn test_workspace_scope_merges_search_results() { + let environment = EnvironmentApi::new(); + let target = Poetry::from(&environment); + let source = Poetry::from(&environment); + + target + .search_result + .write() + .unwrap() + .replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("existing".to_string()), + kind: Some(PythonEnvironmentKind::Poetry), + ..Default::default() + }], + }); + + source + .search_result + .write() + .unwrap() + .replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("workspace".to_string()), + kind: Some(PythonEnvironmentKind::Poetry), + ..Default::default() + }], + }); + + target.sync_refresh_state_from(&source, &RefreshStateSyncScope::Workspace); + + let result = target.search_result.read().unwrap().clone().unwrap(); + let mut names = result + .environments + .iter() + .map(|environment| environment.name.clone().unwrap()) + .collect::>(); + names.sort(); + + assert_eq!(names, vec!["existing".to_string(), "workspace".to_string()]); + } + + #[test] + fn test_clear_preserves_configured_poetry_executable() { + let environment = EnvironmentApi::new(); + let poetry = Poetry::from(&environment); + let configured = PathBuf::from("/configured/poetry"); + + poetry.configure(&Configuration { + 
poetry_executable: Some(configured.clone()), + ..Default::default() + }); + poetry + .search_result + .write() + .unwrap() + .replace(LocatorResult { + managers: vec![], + environments: vec![], + }); + + poetry.clear(); + + assert_eq!( + poetry.poetry_executable.read().unwrap().clone(), + Some(configured) + ); + assert!(poetry.search_result.read().unwrap().is_none()); + } + + #[test] + fn test_configure_clears_poetry_executable_when_unset() { + let environment = EnvironmentApi::new(); + let poetry = Poetry::from(&environment); + + poetry.configure(&Configuration { + poetry_executable: Some(PathBuf::from("/configured/poetry")), + ..Default::default() + }); + poetry.configure(&Configuration::default()); + + assert!(poetry.poetry_executable.read().unwrap().is_none()); + } +} diff --git a/crates/pet-pyenv/src/lib.rs b/crates/pet-pyenv/src/lib.rs index da4549ff..90900306 100644 --- a/crates/pet-pyenv/src/lib.rs +++ b/crates/pet-pyenv/src/lib.rs @@ -19,7 +19,7 @@ use pet_core::{ os_environment::Environment, python_environment::{PythonEnvironment, PythonEnvironmentKind}, reporter::Reporter, - Locator, LocatorKind, + Locator, LocatorKind, RefreshStatePersistence, }; use pet_python_utils::executable::find_executable; @@ -84,6 +84,9 @@ impl Locator for PyEnv { fn get_kind(&self) -> LocatorKind { LocatorKind::PyEnv } + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::SelfHydratingCache + } fn supported_categories(&self) -> Vec { vec![ PythonEnvironmentKind::Pyenv, diff --git a/crates/pet-pyenv/tests/pyenv_test.rs b/crates/pet-pyenv/tests/pyenv_test.rs index 69706f31..bf7fbe94 100644 --- a/crates/pet-pyenv/tests/pyenv_test.rs +++ b/crates/pet-pyenv/tests/pyenv_test.rs @@ -546,3 +546,45 @@ fn resolve_pyenv_environment() { assert!(result.is_some()); assert_eq!(result.unwrap().kind, Some(PythonEnvironmentKind::Conda)); } + +#[test] +#[cfg(unix)] +fn pyenv_refresh_state_self_hydrates_without_sync() { + use crate::common::create_test_environment; + 
use common::resolve_test_path; + use pet_conda::Conda; + use pet_core::{ + env::PythonEnv, python_environment::PythonEnvironmentKind, Locator, RefreshStatePersistence, + }; + use pet_pyenv::PyEnv; + use pet_reporter::{cache::CacheReporter, collect}; + use std::{collections::HashMap, sync::Arc}; + + let home = resolve_test_path(&["unix", "pyenv", "user_home"]); + let homebrew_bin = resolve_test_path(&["unix", "pyenv", "home", "opt", "homebrew", "bin"]); + let environment = + create_test_environment(HashMap::new(), Some(home.clone()), vec![homebrew_bin], None); + + let shared_conda = Arc::new(Conda::from(&environment)); + let shared = PyEnv::from(&environment, shared_conda.clone()); + let refreshed = PyEnv::from(&environment, shared_conda); + + assert_eq!( + shared.refresh_state(), + RefreshStatePersistence::SelfHydratingCache + ); + + let reporter = Arc::new(collect::create_reporter()); + refreshed.find(&CacheReporter::new(reporter)); + + let executable = + resolve_test_path(&[home.to_str().unwrap(), ".pyenv/versions/3.9.9/bin/python"]); + let prefix = resolve_test_path(&[home.to_str().unwrap(), ".pyenv/versions/3.9.9"]); + + let resolved = shared.try_from(&PythonEnv::new(executable, Some(prefix.clone()), None)); + + assert!(resolved.is_some()); + let resolved = resolved.unwrap(); + assert_eq!(resolved.kind, Some(PythonEnvironmentKind::Pyenv)); + assert_eq!(resolved.prefix, Some(prefix)); +} diff --git a/crates/pet-uv/src/lib.rs b/crates/pet-uv/src/lib.rs index 66e7612d..77d16385 100644 --- a/crates/pet-uv/src/lib.rs +++ b/crates/pet-uv/src/lib.rs @@ -12,7 +12,7 @@ use pet_core::{ python_environment::{PythonEnvironment, PythonEnvironmentBuilder, PythonEnvironmentKind}, pyvenv_cfg::PyVenvCfg, reporter::Reporter, - Configuration, Locator, LocatorKind, + Configuration, Locator, LocatorKind, RefreshStatePersistence, }; use pet_fs::path::norm_case; use pet_python_utils::executable::{find_executable, find_executables}; @@ -85,6 +85,10 @@ impl Locator for Uv { 
LocatorKind::Uv } + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::ConfiguredOnly + } + fn supported_categories(&self) -> Vec { vec![ PythonEnvironmentKind::Uv, @@ -93,12 +97,12 @@ impl Locator for Uv { } fn configure(&self, config: &Configuration) { + let mut ws = self + .workspace_directories + .lock() + .expect("workspace_directories mutex poisoned"); + ws.clear(); if let Some(workspace_directories) = config.workspace_directories.as_ref() { - let mut ws = self - .workspace_directories - .lock() - .expect("workspace_directories mutex poisoned"); - ws.clear(); ws.extend(workspace_directories.iter().cloned()); } } diff --git a/crates/pet-windows-registry/src/lib.rs b/crates/pet-windows-registry/src/lib.rs index 86cab9b8..aab44348 100644 --- a/crates/pet-windows-registry/src/lib.rs +++ b/crates/pet-windows-registry/src/lib.rs @@ -8,7 +8,7 @@ use pet_core::{ env::PythonEnv, python_environment::{PythonEnvironment, PythonEnvironmentKind}, reporter::Reporter, - Locator, LocatorKind, LocatorResult, + Locator, LocatorKind, LocatorResult, RefreshStatePersistence, RefreshStateSyncScope, }; use pet_virtualenv::is_virtualenv; use std::sync::{Arc, Mutex}; @@ -52,12 +52,48 @@ impl WindowsRegistry { .expect("search_result mutex poisoned"); search_result.take(); } + + fn sync_search_result_from(&self, source: &WindowsRegistry) { + let search_result = source + .search_result + .lock() + .expect("search_result mutex poisoned") + .clone(); + self.search_result + .lock() + .expect("search_result mutex poisoned") + .clone_from(&search_result); + } } impl Locator for WindowsRegistry { fn get_kind(&self) -> LocatorKind { LocatorKind::WindowsRegistry } + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::SyncedDiscoveryState + } + fn sync_refresh_state_from(&self, source: &dyn Locator, scope: &RefreshStateSyncScope) { + let source = source + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!( + "attempted to sync 
WindowsRegistry state from {:?}", + source.get_kind() + ) + }); + + match scope { + RefreshStateSyncScope::Full => self.sync_search_result_from(source), + RefreshStateSyncScope::GlobalFiltered(kind) + if self.supported_categories().contains(kind) => + { + self.sync_search_result_from(source) + } + RefreshStateSyncScope::GlobalFiltered(_) | RefreshStateSyncScope::Workspace => {} + } + } fn supported_categories(&self) -> Vec { vec![ PythonEnvironmentKind::WindowsRegistry, @@ -104,3 +140,72 @@ impl Locator for WindowsRegistry { // } } + +#[cfg(test)] +mod tests { + use super::*; + use pet_conda::Conda; + use pet_core::os_environment::EnvironmentApi; + + #[test] + fn test_full_refresh_sync_replaces_registry_cache() { + let environment = EnvironmentApi::new(); + let shared = WindowsRegistry::from(Arc::new(Conda::from(&environment))); + let refreshed = WindowsRegistry::from(Arc::new(Conda::from(&environment))); + + shared.search_result.lock().unwrap().replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("stale".to_string()), + ..Default::default() + }], + }); + refreshed + .search_result + .lock() + .unwrap() + .replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("fresh".to_string()), + ..Default::default() + }], + }); + + shared.sync_refresh_state_from(&refreshed, &RefreshStateSyncScope::Full); + + let result = shared.search_result.lock().unwrap().clone().unwrap(); + assert_eq!(result.environments[0].name.as_deref(), Some("fresh")); + } + + #[test] + fn test_workspace_scope_does_not_replace_registry_cache() { + let environment = EnvironmentApi::new(); + let shared = WindowsRegistry::from(Arc::new(Conda::from(&environment))); + let refreshed = WindowsRegistry::from(Arc::new(Conda::from(&environment))); + + shared.search_result.lock().unwrap().replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("stale".to_string()), + 
..Default::default() + }], + }); + refreshed + .search_result + .lock() + .unwrap() + .replace(LocatorResult { + managers: vec![], + environments: vec![PythonEnvironment { + name: Some("fresh".to_string()), + ..Default::default() + }], + }); + + shared.sync_refresh_state_from(&refreshed, &RefreshStateSyncScope::Workspace); + + let result = shared.search_result.lock().unwrap().clone().unwrap(); + assert_eq!(result.environments[0].name.as_deref(), Some("stale")); + } +} diff --git a/crates/pet-windows-store/src/lib.rs b/crates/pet-windows-store/src/lib.rs index 77ccafc6..e37ed401 100644 --- a/crates/pet-windows-store/src/lib.rs +++ b/crates/pet-windows-store/src/lib.rs @@ -12,7 +12,9 @@ use pet_core::env::PythonEnv; use pet_core::python_environment::{PythonEnvironment, PythonEnvironmentKind}; use pet_core::reporter::Reporter; use pet_core::LocatorKind; -use pet_core::{os_environment::Environment, Locator}; +use pet_core::{ + os_environment::Environment, Locator, RefreshStatePersistence, RefreshStateSyncScope, +}; use std::path::Path; use std::sync::{Arc, RwLock}; @@ -52,12 +54,41 @@ impl WindowsStore { fn clear(&self) { self.environments.write().unwrap().take(); } + + fn sync_environments_from(&self, source: &WindowsStore) { + let environments = source.environments.read().unwrap().clone(); + self.environments.write().unwrap().clone_from(&environments); + } } impl Locator for WindowsStore { fn get_kind(&self) -> LocatorKind { LocatorKind::WindowsStore } + fn refresh_state(&self) -> RefreshStatePersistence { + RefreshStatePersistence::SyncedDiscoveryState + } + fn sync_refresh_state_from(&self, source: &dyn Locator, scope: &RefreshStateSyncScope) { + let source = source + .as_any() + .downcast_ref::() + .unwrap_or_else(|| { + panic!( + "attempted to sync WindowsStore state from {:?}", + source.get_kind() + ) + }); + + match scope { + RefreshStateSyncScope::Full => self.sync_environments_from(source), + RefreshStateSyncScope::GlobalFiltered(kind) + if 
self.supported_categories().contains(kind) => + { + self.sync_environments_from(source) + } + RefreshStateSyncScope::GlobalFiltered(_) | RefreshStateSyncScope::Workspace => {} + } + } fn supported_categories(&self) -> Vec { vec![PythonEnvironmentKind::WindowsStore] } @@ -141,3 +172,67 @@ impl Locator for WindowsStore { // } } + +#[cfg(test)] +mod tests { + use super::*; + use pet_core::os_environment::EnvironmentApi; + + #[test] + fn test_full_refresh_sync_replaces_store_cache() { + let environment = EnvironmentApi::new(); + let shared = WindowsStore::from(&environment); + let refreshed = WindowsStore::from(&environment); + + shared + .environments + .write() + .unwrap() + .replace(vec![PythonEnvironment { + name: Some("stale".to_string()), + ..Default::default() + }]); + refreshed + .environments + .write() + .unwrap() + .replace(vec![PythonEnvironment { + name: Some("fresh".to_string()), + ..Default::default() + }]); + + shared.sync_refresh_state_from(&refreshed, &RefreshStateSyncScope::Full); + + let result = shared.environments.read().unwrap().clone().unwrap(); + assert_eq!(result[0].name.as_deref(), Some("fresh")); + } + + #[test] + fn test_workspace_scope_does_not_replace_store_cache() { + let environment = EnvironmentApi::new(); + let shared = WindowsStore::from(&environment); + let refreshed = WindowsStore::from(&environment); + + shared + .environments + .write() + .unwrap() + .replace(vec![PythonEnvironment { + name: Some("stale".to_string()), + ..Default::default() + }]); + refreshed + .environments + .write() + .unwrap() + .replace(vec![PythonEnvironment { + name: Some("fresh".to_string()), + ..Default::default() + }]); + + shared.sync_refresh_state_from(&refreshed, &RefreshStateSyncScope::Workspace); + + let result = shared.environments.read().unwrap().clone().unwrap(); + assert_eq!(result[0].name.as_deref(), Some("stale")); + } +} diff --git a/crates/pet/src/jsonrpc.rs b/crates/pet/src/jsonrpc.rs index 3881975c..8e4703ac 100644 --- 
a/crates/pet/src/jsonrpc.rs +++ b/crates/pet/src/jsonrpc.rs @@ -6,7 +6,6 @@ use crate::find::find_python_environments_in_workspace_folder_recursive; use crate::find::identify_python_executables_using_locators; use crate::find::SearchScope; use crate::locators::create_locators; -use lazy_static::lazy_static; use log::{error, info, trace, warn}; use pet::initialize_tracing; use pet::resolve::resolve_environment; @@ -19,10 +18,11 @@ use pet_core::telemetry::TelemetryEvent; use pet_core::{ os_environment::{Environment, EnvironmentApi}, reporter::Reporter, - Configuration, Locator, + Configuration, Locator, RefreshStatePersistence, RefreshStateSyncScope, }; use pet_env_var_path::get_search_paths_from_env_variables; use pet_fs::glob::expand_glob_patterns; +use pet_fs::path::norm_case; use pet_jsonrpc::{ send_error, send_reply, server::{start_server, HandlersKeyedByMethodName}, @@ -39,28 +39,285 @@ use serde_json::json; use serde_json::{self, Value}; use std::collections::BTreeMap; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Mutex; use std::time::Duration; use std::{ ops::Deref, + panic::{self, AssertUnwindSafe}, path::PathBuf, - sync::{Arc, RwLock}, + sync::{Arc, Condvar, Mutex, RwLock}, thread, time::{Instant, SystemTime}, }; use tracing::info_span; -lazy_static! { - /// Used to ensure we can have only one refreh at a time. 
- static ref REFRESH_LOCK: Arc> = Arc::new(Mutex::new(())); +#[derive(Debug, Clone, Default)] +struct ConfigurationState { + generation: u64, + config: Configuration, } -pub struct Context { - configuration: RwLock, +#[derive(Debug, Clone, PartialEq, Eq)] +struct RefreshKey { + options: RefreshOptions, + config_generation: u64, +} + +impl RefreshKey { + fn new(options: &RefreshOptions, config_generation: u64) -> Self { + Self { + options: options.clone(), + config_generation, + } + } +} + +#[derive(Debug)] +struct ActiveRefresh { + key: RefreshKey, + request_ids: Vec, +} + +#[derive(Debug, Default)] +enum RefreshCoordinatorState { + #[default] + Idle, + Running(ActiveRefresh), + Completing(ActiveRefresh), +} + +#[derive(Debug, Default)] +struct RefreshCoordinator { + state: Mutex, + changed: Condvar, +} + +enum RefreshRegistration { + Start, + Joined, + Wait, +} + +impl RefreshCoordinator { + fn register_request(&self, request_id: u32, key: RefreshKey) -> RefreshRegistration { + let mut state = self + .state + .lock() + .expect("refresh coordinator mutex poisoned"); + match &mut *state { + RefreshCoordinatorState::Idle => { + *state = RefreshCoordinatorState::Running(ActiveRefresh { + key, + request_ids: vec![request_id], + }); + RefreshRegistration::Start + } + RefreshCoordinatorState::Running(active) if active.key == key => { + active.request_ids.push(request_id); + RefreshRegistration::Joined + } + RefreshCoordinatorState::Completing(active) if active.key == key => { + active.request_ids.push(request_id); + RefreshRegistration::Joined + } + RefreshCoordinatorState::Running(_) | RefreshCoordinatorState::Completing(_) => { + RefreshRegistration::Wait + } + } + } + + fn wait_until_idle(&self) { + let state = self + .state + .lock() + .expect("refresh coordinator mutex poisoned"); + let _guard = self + .changed + .wait_while(state, |state| { + !matches!(state, RefreshCoordinatorState::Idle) + }) + .expect("refresh coordinator condvar poisoned"); + } + + fn 
begin_completion(&self, key: &RefreshKey) { + let mut state = self + .state + .lock() + .expect("refresh coordinator mutex poisoned"); + match std::mem::replace(&mut *state, RefreshCoordinatorState::Idle) { + RefreshCoordinatorState::Running(active) if active.key == *key => { + *state = RefreshCoordinatorState::Completing(active); + } + RefreshCoordinatorState::Running(active) => { + *state = RefreshCoordinatorState::Running(active); + panic!("attempted to finish refresh with unexpected key"); + } + RefreshCoordinatorState::Completing(active) => { + *state = RefreshCoordinatorState::Completing(active); + panic!("attempted to begin refresh completion while already completing") + } + RefreshCoordinatorState::Idle => { + panic!("attempted to finish refresh while coordinator was idle") + } + } + } + + fn drain_completing_request_ids(&self, key: &RefreshKey) -> Vec { + let mut state = self + .state + .lock() + .expect("refresh coordinator mutex poisoned"); + match &mut *state { + RefreshCoordinatorState::Completing(active) if active.key == *key => { + std::mem::take(&mut active.request_ids) + } + RefreshCoordinatorState::Completing(_) => { + panic!("attempted to drain completion requests with unexpected key") + } + RefreshCoordinatorState::Running(_) => { + panic!("attempted to drain completion requests before beginning completion") + } + RefreshCoordinatorState::Idle => Vec::new(), + } + } + + fn complete_request(&self, key: &RefreshKey) -> bool { + let mut state = self + .state + .lock() + .expect("refresh coordinator mutex poisoned"); + match &mut *state { + RefreshCoordinatorState::Completing(active) if active.key == *key => { + if active.request_ids.is_empty() { + *state = RefreshCoordinatorState::Idle; + self.changed.notify_all(); + true + } else { + false + } + } + RefreshCoordinatorState::Completing(_) => { + panic!("attempted to complete refresh with unexpected key") + } + RefreshCoordinatorState::Running(_) => { + panic!("attempted to complete refresh before 
beginning completion") + } + RefreshCoordinatorState::Idle => { + panic!("attempted to complete refresh while coordinator was idle") + } + } + } + + fn force_complete_request(&self, key: &RefreshKey) { + let mut state = self + .state + .lock() + .expect("refresh coordinator mutex poisoned"); + match &*state { + RefreshCoordinatorState::Completing(active) if active.key == *key => { + *state = RefreshCoordinatorState::Idle; + self.changed.notify_all(); + } + RefreshCoordinatorState::Idle => {} + _ => {} + } + } +} + +struct RefreshLocators { locators: Arc>>, conda_locator: Arc, poetry_locator: Arc, +} + +struct RefreshExecution { + result: RefreshResult, + perf: RefreshPerformance, + reporter: Arc, + conda_locator: Arc, + poetry_locator: Arc, + conda_executable: Option, + poetry_executable: Option, +} + +struct RefreshCompletionGuard<'a> { + coordinator: &'a RefreshCoordinator, + key: RefreshKey, + completed: bool, +} + +impl<'a> RefreshCompletionGuard<'a> { + fn begin(coordinator: &'a RefreshCoordinator, key: &RefreshKey) -> Self { + coordinator.begin_completion(key); + Self { + coordinator, + key: key.clone(), + completed: false, + } + } + + fn drain_request_ids(&self) -> Vec { + self.coordinator.drain_completing_request_ids(&self.key) + } + + fn finish_if_no_pending(&mut self) -> bool { + let completed = self.coordinator.complete_request(&self.key); + if completed { + self.completed = true; + } + completed + } +} + +impl Drop for RefreshCompletionGuard<'_> { + fn drop(&mut self) { + if !self.completed { + self.coordinator.force_complete_request(&self.key); + } + } +} + +fn send_refresh_replies_for_waiters( + completion_guard: &RefreshCompletionGuard<'_>, + result: &RefreshResult, +) { + for request_id in completion_guard.drain_request_ids() { + send_reply(request_id, Some(result.clone())); + } +} + +fn send_refresh_errors_for_waiters(completion_guard: &RefreshCompletionGuard<'_>, message: &str) { + for request_id in completion_guard.drain_request_ids() { + 
send_error(Some(request_id), -4, message.to_string()); + } +} + +fn finish_refresh_replies( + completion_guard: &mut RefreshCompletionGuard<'_>, + result: &RefreshResult, +) { + loop { + send_refresh_replies_for_waiters(completion_guard, result); + if completion_guard.finish_if_no_pending() { + return; + } + } +} + +fn finish_refresh_errors(completion_guard: &mut RefreshCompletionGuard<'_>, message: &str) { + loop { + send_refresh_errors_for_waiters(completion_guard, message); + if completion_guard.finish_if_no_pending() { + return; + } + } +} + +pub struct Context { + configuration: RwLock, + locators: Arc>>, + conda_locator: Arc, os_environment: Arc, + refresh_coordinator: RefreshCoordinator, } static MISSING_ENVS_REPORTED: AtomicBool = AtomicBool::new(false); @@ -78,9 +335,9 @@ pub fn start_jsonrpc_server() { let context = Context { locators: create_locators(conda_locator.clone(), poetry_locator.clone(), &environment), conda_locator, - poetry_locator, - configuration: RwLock::new(Configuration::default()), + configuration: RwLock::new(ConfigurationState::default()), os_environment: Arc::new(environment), + refresh_coordinator: RefreshCoordinator::default(), }; let mut handlers = HandlersKeyedByMethodName::new(Arc::new(context)); @@ -160,24 +417,28 @@ pub fn handle_configure(context: Arc, id: u32, params: Value) { ); } - let mut cfg = context.configuration.write().unwrap(); - cfg.workspace_directories = workspace_directories; - cfg.conda_executable = configure_options.conda_executable; - cfg.environment_directories = environment_directories; - cfg.pipenv_executable = configure_options.pipenv_executable; - cfg.poetry_executable = configure_options.poetry_executable; - // We will not support changing the cache directories once set. - // No point, supporting such a use case. 
- if let Some(cache_directory) = configure_options.cache_directory { - set_cache_directory(cache_directory.clone()); - cfg.cache_directory = Some(cache_directory); - } - trace!("Configuring locators: {:?}", cfg); - drop(cfg); - let config = context.configuration.read().unwrap().clone(); - for locator in context.locators.iter() { - locator.configure(&config); - } + let config = { + let mut state = context.configuration.write().unwrap(); + state.config.workspace_directories = workspace_directories; + state.config.conda_executable = configure_options.conda_executable; + state.config.environment_directories = environment_directories; + state.config.pipenv_executable = configure_options.pipenv_executable; + state.config.poetry_executable = configure_options.poetry_executable; + // We will not support changing the cache directories once set. + // No point, supporting such a use case. + if let Some(cache_directory) = configure_options.cache_directory { + set_cache_directory(cache_directory.clone()); + state.config.cache_directory = Some(cache_directory); + } + state.generation += 1; + trace!( + "Configuring locators with generation {}: {:?}", + state.generation, + state.config + ); + state.config.clone() + }; + configure_locators(&context.locators, &config); info!("Configure completed in {:?}", now.elapsed()); send_reply(id, None::<()>); }); @@ -189,7 +450,7 @@ pub fn handle_configure(context: Arc, id: u32, params: Value) { } } -#[derive(Debug, Clone, Deserialize, Serialize)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct RefreshOptions { /// If provided, then limit the search to this kind of environments. 
@@ -214,18 +475,204 @@ impl RefreshResult { } } -pub fn handle_refresh(context: Arc, id: u32, params: Value) { - let params = match params { +fn normalize_refresh_params(params: Value) -> Value { + match params { Value::Null => json!({}), - Value::Array(_) => json!({}), + Value::Array(values) if values.is_empty() => json!({}), _ => params, + } +} + +fn canonicalize_refresh_options(mut options: RefreshOptions) -> RefreshOptions { + if let Some(search_paths) = options.search_paths.take() { + let mut expanded = expand_glob_patterns(&search_paths) + .into_iter() + .map(norm_case) + .collect::>(); + expanded.sort(); + expanded.dedup(); + options.search_paths = Some(expanded); + } + + options +} + +fn parse_refresh_options(params: Value) -> Result { + serde_json::from_value::>(normalize_refresh_params(params)) + .map(|options| canonicalize_refresh_options(options.unwrap_or_default())) +} + +fn configure_locators(locators: &Arc>>, config: &Configuration) { + for locator in locators.iter() { + locator.configure(config); + } +} + +fn create_refresh_locators(environment: &dyn Environment) -> RefreshLocators { + let conda_locator = Arc::new(Conda::from(environment)); + let poetry_locator = Arc::new(Poetry::from(environment)); + let locators = create_locators(conda_locator.clone(), poetry_locator.clone(), environment); + + RefreshLocators { + locators, + conda_locator, + poetry_locator, + } +} + +fn sync_refresh_locator_state( + target_locators: &[Arc], + source_locators: &[Arc], + search_scope: Option<&SearchScope>, +) { + let sync_scope = refresh_state_sync_scope(search_scope); + + assert_eq!( + target_locators.len(), + source_locators.len(), + "refresh locator graphs drifted" + ); + + for (target, source) in target_locators.iter().zip(source_locators.iter()) { + assert_eq!( + target.get_kind(), + source.get_kind(), + "refresh locator order drifted" + ); + + if !matches!(target.refresh_state(), RefreshStatePersistence::Stateless) { + trace!( + "Applying refresh state 
contract for locator {:?}: {:?}", + target.get_kind(), + target.refresh_state() + ); + } + + target.sync_refresh_state_from(source.as_ref(), &sync_scope); + } +} + +fn refresh_state_sync_scope(search_scope: Option<&SearchScope>) -> RefreshStateSyncScope { + match search_scope { + Some(SearchScope::Workspace) => RefreshStateSyncScope::Workspace, + Some(SearchScope::Global(kind)) => RefreshStateSyncScope::GlobalFiltered(*kind), + None => RefreshStateSyncScope::Full, + } +} + +fn execute_refresh( + context: &Context, + refresh_options: &RefreshOptions, + configuration_state: &ConfigurationState, +) -> RefreshExecution { + let refresh_locators = create_refresh_locators(context.os_environment.deref()); + let reporter = Arc::new(CacheReporter::new(Arc::new(jsonrpc::create_reporter( + refresh_options.search_kind, + )))); + + let (config, search_scope) = + build_refresh_config(refresh_options, configuration_state.config.clone()); + if refresh_options.search_paths.is_some() { + trace!( + "Expanded search paths to {} workspace dirs, {} executables", + config + .workspace_directories + .as_ref() + .map(|v| v.len()) + .unwrap_or(0), + config.executables.as_ref().map(|v| v.len()).unwrap_or(0) + ); + } + + configure_locators(&refresh_locators.locators, &config); + + trace!( + "Start refreshing environments, generation: {}, config: {:?}", + configuration_state.generation, + config + ); + let summary = find_and_report_envs( + reporter.as_ref(), + config, + &refresh_locators.locators, + context.os_environment.deref(), + search_scope.clone(), + ); + let summary = summary.lock().expect("summary mutex poisoned"); + for locator in summary.locators.iter() { + info!("Locator {:?} took {:?}", locator.0, locator.1); + } + for item in summary.breakdown.iter() { + info!("Locator {} took {:?}", item.0, item.1); + } + trace!("Finished refreshing environments in {:?}", summary.total); + + // Refresh runs on a transient locator graph, so apply each locator's refresh-state + // contract back into 
the long-lived shared locator graph. + sync_refresh_locator_state( + context.locators.as_ref(), + refresh_locators.locators.as_ref(), + search_scope.as_ref(), + ); + + let perf = RefreshPerformance { + total: summary.total.as_millis(), + locators: summary + .locators + .clone() + .iter() + .map(|(k, v)| (format!("{k:?}"), v.as_millis())) + .collect::>(), + breakdown: summary + .breakdown + .clone() + .iter() + .map(|(k, v)| (k.to_string(), v.as_millis())) + .collect::>(), }; - match serde_json::from_value::>(params.clone()) { + + RefreshExecution { + result: RefreshResult::new(summary.total), + perf, + reporter, + conda_locator: refresh_locators.conda_locator, + poetry_locator: refresh_locators.poetry_locator, + conda_executable: configuration_state.config.conda_executable.clone(), + poetry_executable: configuration_state.config.poetry_executable.clone(), + } +} + +fn report_refresh_follow_up(execution: RefreshExecution) { + execution + .reporter + .report_telemetry(&TelemetryEvent::RefreshPerformance(execution.perf)); + + if MISSING_ENVS_REPORTED + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .ok() + .unwrap_or_default() + { + let conda_locator = execution.conda_locator.clone(); + let conda_executable = execution.conda_executable.clone(); + let reporter_ref = execution.reporter.clone(); + thread::spawn(move || { + conda_locator.find_and_report_missing_envs(reporter_ref.as_ref(), conda_executable); + Some(()) + }); + + let poetry_locator = execution.poetry_locator.clone(); + let poetry_executable = execution.poetry_executable.clone(); + let reporter_ref = execution.reporter.clone(); + thread::spawn(move || { + poetry_locator.find_and_report_missing_envs(reporter_ref.as_ref(), poetry_executable); + Some(()) + }); + } +} + +pub fn handle_refresh(context: Arc, id: u32, params: Value) { + match parse_refresh_options(params.clone()) { Ok(refresh_options) => { - let refresh_options = refresh_options.unwrap_or(RefreshOptions { - search_kind: 
None, - search_paths: None, - }); // Start in a new thread, we can have multiple requests. thread::spawn(move || { let _span = info_span!("handle_refresh", @@ -234,110 +681,61 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { ) .entered(); - // Ensure we can have only one refresh at a time. - let lock = REFRESH_LOCK.lock().expect("REFRESH_LOCK mutex poisoned"); - - let config = context.configuration.read().unwrap().clone(); - let reporter = Arc::new(CacheReporter::new(Arc::new(jsonrpc::create_reporter( - refresh_options.search_kind, - )))); - - let (config, search_scope) = build_refresh_config(&refresh_options, config); - if refresh_options.search_paths.is_some() { - trace!( - "Expanded search paths to {} workspace dirs, {} executables", - config - .workspace_directories - .as_ref() - .map(|v| v.len()) - .unwrap_or(0), - config.executables.as_ref().map(|v| v.len()).unwrap_or(0) - ); - } - - // Configure the locators with the (possibly modified) config. - for locator in context.locators.iter() { - locator.configure(&config); - } + loop { + let configuration_state = context.configuration.read().unwrap().clone(); + let refresh_key = + RefreshKey::new(&refresh_options, configuration_state.generation); - trace!("Start refreshing environments, config: {:?}", config); - let summary = find_and_report_envs( - reporter.as_ref(), - config, - &context.locators, - context.os_environment.deref(), - search_scope, - ); - let summary = summary.lock().expect("summary mutex poisoned"); - for locator in summary.locators.iter() { - info!("Locator {:?} took {:?}", locator.0, locator.1); - } - for item in summary.breakdown.iter() { - info!("Locator {} took {:?}", item.0, item.1); - } - trace!("Finished refreshing environments in {:?}", summary.total); - send_reply(id, Some(RefreshResult::new(summary.total))); - - let perf = RefreshPerformance { - total: summary.total.as_millis(), - locators: summary - .locators - .clone() - .iter() - .map(|(k, v)| (format!("{k:?}"), 
v.as_millis())) - .collect::>(), - breakdown: summary - .breakdown - .clone() - .iter() - .map(|(k, v)| (k.to_string(), v.as_millis())) - .collect::>(), - }; - reporter.report_telemetry(&TelemetryEvent::RefreshPerformance(perf)); - // Find an report missing envs for the first launch of this process. - if MISSING_ENVS_REPORTED - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .ok() - .unwrap_or_default() - { - // By now all conda envs have been found - // Spawn conda in a separate thread. - // & see if we can find more environments by spawning conda. - // But we will not wait for this to complete. - let conda_locator = context.conda_locator.clone(); - let conda_executable = context - .configuration - .read() - .unwrap() - .conda_executable - .clone(); - let reporter_ref = reporter.clone(); - thread::spawn(move || { - conda_locator - .find_and_report_missing_envs(reporter_ref.as_ref(), conda_executable); - Some(()) - }); + match context + .refresh_coordinator + .register_request(id, refresh_key.clone()) + { + RefreshRegistration::Joined => return, + RefreshRegistration::Wait => { + context.refresh_coordinator.wait_until_idle(); + } + RefreshRegistration::Start => { + let refresh_result = panic::catch_unwind(AssertUnwindSafe(|| { + execute_refresh( + context.as_ref(), + &refresh_options, + &configuration_state, + ) + })); - // By now all poetry envs have been found - // Spawn poetry exe in a separate thread. - // & see if we can find more environments by spawning poetry. - // But we will not wait for this to complete. 
- let poetry_locator = context.poetry_locator.clone(); - let poetry_executable = context - .configuration - .read() - .unwrap() - .poetry_executable - .clone(); - let reporter_ref = reporter.clone(); - thread::spawn(move || { - poetry_locator - .find_and_report_missing_envs(reporter_ref.as_ref(), poetry_executable); - Some(()) - }); + match refresh_result { + Ok(execution) => { + let refresh_result = execution.result.clone(); + let mut completion_guard = RefreshCompletionGuard::begin( + &context.refresh_coordinator, + &refresh_key, + ); + send_refresh_replies_for_waiters( + &completion_guard, + &refresh_result, + ); + finish_refresh_replies(&mut completion_guard, &refresh_result); + report_refresh_follow_up(execution); + } + Err(_) => { + error!( + "Refresh panicked for generation {} and options {:?}", + configuration_state.generation, refresh_options + ); + let mut completion_guard = RefreshCompletionGuard::begin( + &context.refresh_coordinator, + &refresh_key, + ); + finish_refresh_errors( + &mut completion_guard, + "Refresh failed unexpectedly", + ); + } + } + return; + } + } } - - drop(lock); }); } Err(e) => { @@ -447,6 +845,7 @@ pub fn handle_find(context: Arc, id: u32, params: Value) { .configuration .read() .unwrap() + .config .clone() .environment_directories .as_deref() @@ -491,6 +890,7 @@ pub fn handle_conda_telemetry(context: Arc, id: u32, _params: Value) { .configuration .read() .unwrap() + .config .conda_executable .clone(); let info = conda_locator.get_info_for_telemetry(conda_executable); @@ -560,7 +960,442 @@ pub(crate) fn build_refresh_config( #[cfg(test)] mod tests { use super::*; + use pet_conda::manager::CondaManager; + use pet_core::manager::EnvManagerType; + use pet_core::RefreshStatePersistence; use std::path::PathBuf; + use std::sync::mpsc; + use std::thread; + + fn make_refresh_key(generation: u64, options: RefreshOptions) -> RefreshKey { + RefreshKey::new(&options, generation) + } + + #[test] + fn 
test_parse_refresh_options_normalizes_null_and_array() { + assert_eq!( + parse_refresh_options(Value::Null).unwrap(), + RefreshOptions::default() + ); + assert_eq!( + parse_refresh_options(Value::Array(vec![])).unwrap(), + RefreshOptions::default() + ); + } + + #[test] + fn test_parse_refresh_options_rejects_non_empty_array() { + assert!(parse_refresh_options(json!([{"searchKind": "Conda"}])).is_err()); + } + + #[test] + fn test_parse_refresh_options_canonicalizes_search_paths() { + let temp_dir = tempfile::tempdir().unwrap(); + let alpha = temp_dir.path().join("alpha"); + let beta = temp_dir.path().join("beta"); + std::fs::create_dir(&alpha).unwrap(); + std::fs::create_dir(&beta).unwrap(); + + let options = canonicalize_refresh_options(RefreshOptions { + search_kind: Some(PythonEnvironmentKind::Venv), + search_paths: Some(vec![beta.clone(), temp_dir.path().join("*"), alpha.clone()]), + }); + + assert_eq!( + options, + RefreshOptions { + search_kind: Some(PythonEnvironmentKind::Venv), + search_paths: Some(vec![norm_case(alpha), norm_case(beta)]), + } + ); + } + + #[test] + fn test_refresh_coordinator_joins_identical_requests() { + let coordinator = RefreshCoordinator::default(); + let key = make_refresh_key(3, RefreshOptions::default()); + + assert!(matches!( + coordinator.register_request(1, key.clone()), + RefreshRegistration::Start + )); + assert!(matches!( + coordinator.register_request(2, key.clone()), + RefreshRegistration::Joined + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &key); + assert_eq!(completion_guard.drain_request_ids(), vec![1, 2]); + assert!(completion_guard.finish_if_no_pending()); + } + + #[test] + fn test_refresh_coordinator_serializes_incompatible_requests() { + let coordinator = Arc::new(RefreshCoordinator::default()); + let first_key = make_refresh_key(1, RefreshOptions::default()); + let second_key = make_refresh_key( + 1, + RefreshOptions { + search_kind: Some(PythonEnvironmentKind::Venv), + search_paths: 
None, + }, + ); + + assert!(matches!( + coordinator.register_request(1, first_key.clone()), + RefreshRegistration::Start + )); + + let (waiting_tx, waiting_rx) = mpsc::channel(); + let worker = { + let coordinator = coordinator.clone(); + let second_key = second_key.clone(); + thread::spawn(move || { + let action = coordinator.register_request(2, second_key.clone()); + waiting_tx.send(()).unwrap(); + assert!(matches!(action, RefreshRegistration::Wait)); + + coordinator.wait_until_idle(); + assert!(matches!( + coordinator.register_request(2, second_key.clone()), + RefreshRegistration::Start + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &second_key); + let request_ids = completion_guard.drain_request_ids(); + assert!(completion_guard.finish_if_no_pending()); + request_ids + }) + }; + + waiting_rx.recv().unwrap(); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &first_key); + assert_eq!(completion_guard.drain_request_ids(), vec![1]); + assert!(completion_guard.finish_if_no_pending()); + assert_eq!(worker.join().unwrap(), vec![2]); + } + + #[test] + fn test_conda_refresh_state_sync_replaces_shared_caches() { + let environment = EnvironmentApi::new(); + let shared = Conda::from(&environment); + let refreshed = Conda::from(&environment); + + let stale_env_path = PathBuf::from("/stale/env"); + let fresh_env_path = PathBuf::from("/fresh/env"); + let stale_manager_path = PathBuf::from("/stale/conda"); + let fresh_manager_path = PathBuf::from("/fresh/conda"); + let stale_mamba_path = PathBuf::from("/stale/mamba"); + let fresh_mamba_path = PathBuf::from("/fresh/mamba"); + + shared.environments.insert( + stale_env_path.clone(), + PythonEnvironment::new( + Some(stale_env_path.join("python")), + Some(PythonEnvironmentKind::Conda), + Some(stale_env_path.clone()), + None, + Some("3.10.0".to_string()), + ), + ); + shared.managers.insert( + stale_manager_path.clone(), + CondaManager { + executable: stale_manager_path.clone(), 
+ version: Some("23.1.0".to_string()), + conda_dir: Some(PathBuf::from("/stale")), + manager_type: EnvManagerType::Conda, + }, + ); + shared.mamba_managers.insert( + stale_mamba_path.clone(), + CondaManager { + executable: stale_mamba_path.clone(), + version: Some("1.5.0".to_string()), + conda_dir: Some(PathBuf::from("/stale")), + manager_type: EnvManagerType::Mamba, + }, + ); + + refreshed.environments.insert( + fresh_env_path.clone(), + PythonEnvironment::new( + Some(fresh_env_path.join("python")), + Some(PythonEnvironmentKind::Conda), + Some(fresh_env_path.clone()), + None, + Some("3.11.0".to_string()), + ), + ); + refreshed.managers.insert( + fresh_manager_path.clone(), + CondaManager { + executable: fresh_manager_path.clone(), + version: Some("24.1.0".to_string()), + conda_dir: Some(PathBuf::from("/fresh")), + manager_type: EnvManagerType::Conda, + }, + ); + refreshed.mamba_managers.insert( + fresh_mamba_path.clone(), + CondaManager { + executable: fresh_mamba_path.clone(), + version: Some("2.0.0".to_string()), + conda_dir: Some(PathBuf::from("/fresh")), + manager_type: EnvManagerType::Mamba, + }, + ); + + assert_eq!( + shared.refresh_state(), + RefreshStatePersistence::SyncedDiscoveryState + ); + + shared.sync_refresh_state_from(&refreshed, &RefreshStateSyncScope::Full); + + assert_eq!(shared.environments.len(), 1); + assert!(!shared.environments.contains_key(&stale_env_path)); + assert!(shared.environments.contains_key(&fresh_env_path)); + + assert_eq!(shared.managers.len(), 1); + assert!(!shared.managers.contains_key(&stale_manager_path)); + assert!(shared.managers.contains_key(&fresh_manager_path)); + + assert_eq!(shared.mamba_managers.len(), 1); + assert!(!shared.mamba_managers.contains_key(&stale_mamba_path)); + assert!(shared.mamba_managers.contains_key(&fresh_mamba_path)); + } + + #[test] + fn test_workspace_refresh_does_not_replace_shared_conda_state() { + let environment = EnvironmentApi::new(); + let shared = Arc::new(Conda::from(&environment)); + 
let refreshed = Arc::new(Conda::from(&environment)); + + let stale_env_path = PathBuf::from("/stale/env"); + let fresh_env_path = PathBuf::from("/fresh/env"); + + shared.environments.insert( + stale_env_path.clone(), + PythonEnvironment::new( + Some(stale_env_path.join("python")), + Some(PythonEnvironmentKind::Conda), + Some(stale_env_path.clone()), + None, + Some("3.10.0".to_string()), + ), + ); + refreshed.environments.insert( + fresh_env_path.clone(), + PythonEnvironment::new( + Some(fresh_env_path.join("python")), + Some(PythonEnvironmentKind::Conda), + Some(fresh_env_path.clone()), + None, + Some("3.11.0".to_string()), + ), + ); + + sync_refresh_locator_state( + &[shared.clone() as Arc], + &[refreshed as Arc], + Some(&SearchScope::Workspace), + ); + + assert_eq!(shared.environments.len(), 1); + assert!(shared.environments.contains_key(&stale_env_path)); + assert!(!shared.environments.contains_key(&fresh_env_path)); + } + + #[test] + fn test_kind_filtered_refresh_skips_unrelated_conda_state_sync() { + let environment = EnvironmentApi::new(); + let shared = Arc::new(Conda::from(&environment)); + let refreshed = Arc::new(Conda::from(&environment)); + + let stale_env_path = PathBuf::from("/stale/env"); + let fresh_env_path = PathBuf::from("/fresh/env"); + + shared.environments.insert( + stale_env_path.clone(), + PythonEnvironment::new( + Some(stale_env_path.join("python")), + Some(PythonEnvironmentKind::Conda), + Some(stale_env_path.clone()), + None, + Some("3.10.0".to_string()), + ), + ); + refreshed.environments.insert( + fresh_env_path.clone(), + PythonEnvironment::new( + Some(fresh_env_path.join("python")), + Some(PythonEnvironmentKind::Conda), + Some(fresh_env_path.clone()), + None, + Some("3.11.0".to_string()), + ), + ); + + sync_refresh_locator_state( + &[shared.clone() as Arc], + &[refreshed as Arc], + Some(&SearchScope::Global(PythonEnvironmentKind::Venv)), + ); + + assert_eq!(shared.environments.len(), 1); + 
assert!(shared.environments.contains_key(&stale_env_path)); + assert!(!shared.environments.contains_key(&fresh_env_path)); + } + + #[test] + fn test_refresh_coordinator_does_not_join_different_generations() { + let coordinator = RefreshCoordinator::default(); + let options = RefreshOptions::default(); + let first_key = make_refresh_key(1, options.clone()); + let second_key = make_refresh_key(2, options); + + assert!(matches!( + coordinator.register_request(10, first_key.clone()), + RefreshRegistration::Start + )); + assert!(matches!( + coordinator.register_request(11, second_key.clone()), + RefreshRegistration::Wait + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &first_key); + assert_eq!(completion_guard.drain_request_ids(), vec![10]); + assert!(completion_guard.finish_if_no_pending()); + assert!(matches!( + coordinator.register_request(11, second_key.clone()), + RefreshRegistration::Start + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &second_key); + assert_eq!(completion_guard.drain_request_ids(), vec![11]); + assert!(completion_guard.finish_if_no_pending()); + } + + #[test] + fn test_refresh_coordinator_reuses_same_key_during_completion() { + let coordinator = RefreshCoordinator::default(); + let key = make_refresh_key(1, RefreshOptions::default()); + + assert!(matches!( + coordinator.register_request(1, key.clone()), + RefreshRegistration::Start + )); + + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &key); + assert_eq!(completion_guard.drain_request_ids(), vec![1]); + + assert!(matches!( + coordinator.register_request(2, key.clone()), + RefreshRegistration::Joined + )); + assert_eq!(completion_guard.drain_request_ids(), vec![2]); + assert!(completion_guard.finish_if_no_pending()); + } + + #[test] + fn test_refresh_coordinator_waits_for_completion_boundary() { + let coordinator = Arc::new(RefreshCoordinator::default()); + let first_key = make_refresh_key(1, 
RefreshOptions::default()); + let second_key = make_refresh_key( + 1, + RefreshOptions { + search_kind: Some(PythonEnvironmentKind::Venv), + search_paths: None, + }, + ); + + assert!(matches!( + coordinator.register_request(1, first_key.clone()), + RefreshRegistration::Start + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &first_key); + assert_eq!(completion_guard.drain_request_ids(), vec![1]); + + let (state_tx, state_rx) = mpsc::channel(); + let worker = { + let coordinator = coordinator.clone(); + let second_key = second_key.clone(); + thread::spawn(move || { + assert!(matches!( + coordinator.register_request(2, second_key.clone()), + RefreshRegistration::Wait + )); + state_tx.send("waiting").unwrap(); + coordinator.wait_until_idle(); + state_tx.send("idle").unwrap(); + assert!(matches!( + coordinator.register_request(2, second_key.clone()), + RefreshRegistration::Start + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &second_key); + let request_ids = completion_guard.drain_request_ids(); + assert!(completion_guard.finish_if_no_pending()); + request_ids + }) + }; + + assert_eq!(state_rx.recv().unwrap(), "waiting"); + assert!(state_rx.try_recv().is_err()); + + assert!(completion_guard.finish_if_no_pending()); + + assert_eq!(state_rx.recv().unwrap(), "idle"); + assert_eq!(worker.join().unwrap(), vec![2]); + } + + #[test] + fn test_refresh_completion_guard_releases_waiters_on_unwind() { + let coordinator = Arc::new(RefreshCoordinator::default()); + let first_key = make_refresh_key(1, RefreshOptions::default()); + let second_key = make_refresh_key( + 1, + RefreshOptions { + search_kind: Some(PythonEnvironmentKind::Venv), + search_paths: None, + }, + ); + + assert!(matches!( + coordinator.register_request(1, first_key.clone()), + RefreshRegistration::Start + )); + + let (state_tx, state_rx) = mpsc::channel(); + let waiter = { + let coordinator = coordinator.clone(); + let second_key = second_key.clone(); + 
thread::spawn(move || { + assert!(matches!( + coordinator.register_request(2, second_key.clone()), + RefreshRegistration::Wait + )); + state_tx.send("waiting").unwrap(); + coordinator.wait_until_idle(); + state_tx.send("idle").unwrap(); + assert!(matches!( + coordinator.register_request(2, second_key.clone()), + RefreshRegistration::Start + )); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &second_key); + let request_ids = completion_guard.drain_request_ids(); + assert!(completion_guard.finish_if_no_pending()); + request_ids + }) + }; + + assert_eq!(state_rx.recv().unwrap(), "waiting"); + let panic_result = panic::catch_unwind(AssertUnwindSafe(|| { + let completion_guard = RefreshCompletionGuard::begin(coordinator.as_ref(), &first_key); + assert_eq!(completion_guard.drain_request_ids(), vec![1]); + panic!("forced completion panic"); + })); + assert!(panic_result.is_err()); + + assert_eq!(state_rx.recv().unwrap(), "idle"); + assert_eq!(waiter.join().unwrap(), vec![2]); + } /// Test for https://github.com/microsoft/python-environment-tools/issues/151 /// Verifies that when searchKind is provided (without searchPaths), diff --git a/crates/pet/tests/jsonrpc_client.rs b/crates/pet/tests/jsonrpc_client.rs new file mode 100644 index 00000000..23837f82 --- /dev/null +++ b/crates/pet/tests/jsonrpc_client.rs @@ -0,0 +1,404 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+ +use serde::de::DeserializeOwned; +use serde::Deserialize; +use serde_json::{json, Value}; +use std::collections::HashMap; +use std::io::{self, BufRead, BufReader, Read, Write}; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +static REQUEST_ID: AtomicU32 = AtomicU32::new(1); + +const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +pub struct RefreshResult { + pub duration: u128, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct EnvironmentNotification { + pub executable: Option, + pub kind: Option, + pub name: Option, + pub prefix: Option, + pub error: Option, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +pub struct ManagerNotification { + pub tool: Option, + pub executable: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonRpcNotification { + pub method: String, + pub params: Value, +} + +struct ClientState { + pending: Mutex>>>, + notifications: Mutex>, + stderr_lines: Mutex>, +} + +impl ClientState { + fn new() -> Self { + Self { + pending: Mutex::new(HashMap::new()), + notifications: Mutex::new(Vec::new()), + stderr_lines: Mutex::new(Vec::new()), + } + } +} + +struct ClientInner { + child: Mutex, + stdin: Mutex>, + state: Arc, + reader_handle: Mutex>>, + stderr_handle: Mutex>>, +} + +impl Drop for ClientInner { + fn drop(&mut self) { + let _ = self.stdin.lock().unwrap().take(); + + { + let mut child = self.child.lock().unwrap(); + if child.try_wait().ok().flatten().is_none() { + let _ = child.kill(); + let _ = child.wait(); + } + } + + if let Some(handle) = self.reader_handle.lock().unwrap().take() { + let _ = handle.join(); + } + if let Some(handle) = self.stderr_handle.lock().unwrap().take() { + let _ = handle.join(); + } + 
} +} + +#[derive(Clone)] +pub struct PetJsonRpcClient { + inner: Arc, +} + +impl PetJsonRpcClient { + pub fn spawn() -> Result { + let mut process = Command::new(env!("CARGO_BIN_EXE_pet")) + .arg("server") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .env("PATH", "") + .spawn() + .map_err(|e| format!("Failed to spawn pet server: {e}"))?; + + let stdin = process + .stdin + .take() + .ok_or("Failed to capture pet stdin".to_string())?; + let stdout = process + .stdout + .take() + .ok_or("Failed to capture pet stdout".to_string())?; + let stderr = process + .stderr + .take() + .ok_or("Failed to capture pet stderr".to_string())?; + + let state = Arc::new(ClientState::new()); + let reader_handle = spawn_stdout_reader(stdout, state.clone()); + let stderr_handle = spawn_stderr_reader(stderr, state.clone()); + + Ok(Self { + inner: Arc::new(ClientInner { + child: Mutex::new(process), + stdin: Mutex::new(Some(stdin)), + state, + reader_handle: Mutex::new(Some(reader_handle)), + stderr_handle: Mutex::new(Some(stderr_handle)), + }), + }) + } + + pub fn configure(&self, config: Value) -> Result<(), String> { + self.send_request_value("configure", config, DEFAULT_REQUEST_TIMEOUT) + .map(|_| ()) + } + + pub fn refresh(&self, params: Option) -> Result { + self.send_request( + "refresh", + params.unwrap_or_else(|| json!({})), + DEFAULT_REQUEST_TIMEOUT, + ) + } + + #[allow(dead_code)] + pub fn resolve(&self, executable: &str) -> Result { + self.send_request_value( + "resolve", + json!({ "executable": executable }), + DEFAULT_REQUEST_TIMEOUT, + ) + } + + pub fn clear_notifications(&self) { + self.inner.state.notifications.lock().unwrap().clear(); + } + + pub fn notifications(&self) -> Vec { + self.inner.state.notifications.lock().unwrap().clone() + } + + pub fn notification_count(&self, method: &str) -> usize { + self.notifications() + .into_iter() + .filter(|notification| notification.method == method) + .count() + } + + pub fn 
wait_for_notification_count( + &self, + method: &str, + expected_count: usize, + timeout: Duration, + ) -> Result<(), String> { + let deadline = Instant::now() + timeout; + while Instant::now() <= deadline { + if self.notification_count(method) >= expected_count { + return Ok(()); + } + thread::sleep(Duration::from_millis(10)); + } + Err(format!( + "Timed out waiting for {expected_count} '{method}' notifications; saw {}. stderr: {}", + self.notification_count(method), + self.stderr_output() + )) + } + + pub fn environment_notifications(&self) -> Vec { + self.notifications() + .into_iter() + .filter(|notification| notification.method == "environment") + .map(|notification| { + serde_json::from_value(notification.params) + .expect("environment notification payload should deserialize") + }) + .collect() + } + + pub fn manager_notifications(&self) -> Vec { + self.notifications() + .into_iter() + .filter(|notification| notification.method == "manager") + .map(|notification| { + serde_json::from_value(notification.params) + .expect("manager notification payload should deserialize") + }) + .collect() + } + + pub fn stderr_output(&self) -> String { + self.inner.state.stderr_lines.lock().unwrap().join("") + } + + fn send_request( + &self, + method: &str, + params: Value, + timeout: Duration, + ) -> Result { + let result = self.send_request_value(method, params, timeout)?; + serde_json::from_value(result) + .map_err(|e| format!("Failed to deserialize response for {method}: {e}")) + } + + fn send_request_value( + &self, + method: &str, + params: Value, + timeout: Duration, + ) -> Result { + let id = REQUEST_ID.fetch_add(1, Ordering::SeqCst); + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": method, + "params": params, + }); + let request_text = serde_json::to_string(&request) + .map_err(|e| format!("Failed to serialize {method} request: {e}"))?; + let wire_message = format!( + "Content-Length: {}\r\n\r\n{}", + request_text.len(), + request_text + ); + + let 
(tx, rx) = mpsc::channel(); + self.inner.state.pending.lock().unwrap().insert(id, tx); + + let write_result = { + let mut stdin_guard = self.inner.stdin.lock().unwrap(); + let stdin = stdin_guard + .as_mut() + .ok_or_else(|| "PET stdin is no longer available".to_string())?; + stdin + .write_all(wire_message.as_bytes()) + .and_then(|_| stdin.flush()) + .map_err(|e| format!("Failed to send {method} request: {e}")) + }; + + if let Err(err) = write_result { + self.inner.state.pending.lock().unwrap().remove(&id); + return Err(err); + } + + match rx.recv_timeout(timeout) { + Ok(result) => result, + Err(mpsc::RecvTimeoutError::Timeout) => { + self.inner.state.pending.lock().unwrap().remove(&id); + Err(format!( + "Timed out waiting for {method} response after {timeout:?}" + )) + } + Err(mpsc::RecvTimeoutError::Disconnected) => Err(format!( + "Response channel disconnected while waiting for {method}; stderr: {}", + self.stderr_output() + )), + } + } +} + +fn spawn_stdout_reader(stdout: ChildStdout, state: Arc) -> JoinHandle<()> { + thread::spawn(move || { + let mut reader = BufReader::new(stdout); + let read_result = loop { + match read_message(&mut reader) { + Ok(Some(message)) => { + if let Some(method) = message.get("method").and_then(|value| value.as_str()) { + state + .notifications + .lock() + .unwrap() + .push(JsonRpcNotification { + method: method.to_string(), + params: message.get("params").cloned().unwrap_or(Value::Null), + }); + continue; + } + + if let Some(id) = message.get("id").and_then(|value| value.as_u64()) { + if let Some(sender) = state.pending.lock().unwrap().remove(&(id as u32)) { + if let Some(error) = message.get("error") { + let _ = sender.send(Err(format!("JSONRPC error: {error:?}"))); + } else { + let _ = sender.send(Ok(message + .get("result") + .cloned() + .unwrap_or(Value::Null))); + } + } + } + } + Ok(None) => break Ok(()), + Err(err) => break Err(err), + } + }; + + let failure = match read_result { + Ok(()) => "PET stdout closed".to_string(), 
+            Err(err) => format!("Failed to read PET stdout: {err}"),
+        };
+        let pending = std::mem::take(&mut *state.pending.lock().unwrap());
+        for (_, sender) in pending {
+            let _ = sender.send(Err(failure.clone()));
+        }
+    })
+}
+
+fn spawn_stderr_reader(
+    stderr: impl Read + Send + 'static,
+    state: Arc,
+) -> JoinHandle<()> {
+    thread::spawn(move || {
+        let mut reader = BufReader::new(stderr);
+        loop {
+            let mut line = String::new();
+            match reader.read_line(&mut line) {
+                Ok(0) => return,
+                Ok(_) => state.stderr_lines.lock().unwrap().push(line),
+                Err(err) => {
+                    state
+                        .stderr_lines
+                        .lock()
+                        .unwrap()
+                        .push(format!("Failed to read PET stderr: {err}\n"));
+                    return;
+                }
+            }
+        }
+    })
+}
+
+fn read_message(reader: &mut BufReader<ChildStdout>) -> io::Result<Option<Value>> {
+    let mut content_length: Option<usize> = None;
+    loop {
+        let mut header = String::new();
+        let bytes_read = reader.read_line(&mut header)?;
+        if bytes_read == 0 {
+            if content_length.is_none() {
+                return Ok(None);
+            }
+            return Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "unexpected EOF while reading JSONRPC headers",
+            ));
+        }
+
+        let trimmed = header.trim();
+        if trimmed.is_empty() {
+            break;
+        }
+
+        if let Some(length) = trimmed.strip_prefix("Content-Length: ") {
+            content_length = Some(length.parse().map_err(|e| {
+                io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    format!("invalid Content-Length header: {e}"),
+                )
+            })?);
+        }
+    }
+
+    let content_length = content_length.ok_or_else(|| {
+        io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
+    })?;
+    let mut body = vec![0u8; content_length];
+    reader.read_exact(&mut body)?;
+    let body_text = String::from_utf8(body).map_err(|e| {
+        io::Error::new(
+            io::ErrorKind::InvalidData,
+            format!("invalid UTF-8 body: {e}"),
+        )
+    })?;
+    let message = serde_json::from_str::<Value>(&body_text).map_err(|e| {
+        io::Error::new(
+            io::ErrorKind::InvalidData,
+            format!("invalid JSONRPC payload: {e}"),
+        )
+    })?;
+    Ok(Some(message))
+}
diff --git 
a/crates/pet/tests/jsonrpc_server_test.rs b/crates/pet/tests/jsonrpc_server_test.rs
new file mode 100644
index 00000000..f80b6098
--- /dev/null
+++ b/crates/pet/tests/jsonrpc_server_test.rs
@@ -0,0 +1,323 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+use pet_fs::path::norm_case;
+use serde_json::json;
+use std::fs;
+use std::path::{Path, PathBuf};
+use std::thread;
+use std::time::Duration;
+use tempfile::TempDir;
+
+mod jsonrpc_client;
+
+use jsonrpc_client::{EnvironmentNotification, PetJsonRpcClient};
+
+fn create_fake_workspace(prompt: &str) -> (TempDir, PathBuf, PathBuf) {
+    let temp_dir = tempfile::tempdir().expect("failed to create temp directory");
+    let workspace = temp_dir.path().join("workspace");
+    let venv = workspace.join(".venv");
+
+    #[cfg(windows)]
+    let bin_dir = venv.join("Scripts");
+    #[cfg(unix)]
+    let bin_dir = venv.join("bin");
+
+    fs::create_dir_all(&bin_dir).expect("failed to create fake venv directories");
+    fs::write(
+        venv.join("pyvenv.cfg"),
+        format!("version = 3.11.0\nprompt = {prompt}\n"),
+    )
+    .expect("failed to write pyvenv.cfg");
+    fs::write(python_executable_path(&bin_dir), "fake python")
+        .expect("failed to create fake python executable");
+
+    (temp_dir, workspace, venv)
+}
+
+fn create_fake_workspace_with_projects(
+    prompt_prefix: &str,
+    project_count: usize,
+) -> (TempDir, PathBuf, Vec<PathBuf>) {
+    let temp_dir = tempfile::tempdir().expect("failed to create temp directory");
+    let workspace = temp_dir.path().join("workspace");
+    let mut venvs = Vec::new();
+
+    for index in 0..project_count {
+        let venv = workspace.join(format!("env-{index}"));
+        #[cfg(windows)]
+        let bin_dir = venv.join("Scripts");
+        #[cfg(unix)]
+        let bin_dir = venv.join("bin");
+
+        fs::create_dir_all(&bin_dir).expect("failed to create fake venv directories");
+        fs::write(
+            venv.join("pyvenv.cfg"),
+            format!("version = 3.11.0\nprompt = {prompt_prefix}-{index}\n"),
+        )
+        .expect("failed to write pyvenv.cfg");
+        
fs::write(python_executable_path(&bin_dir), "fake python")
+            .expect("failed to create fake python executable");
+        venvs.push(venv);
+    }
+
+    (temp_dir, workspace, venvs)
+}
+
+fn python_executable_path(bin_dir: &Path) -> PathBuf {
+    #[cfg(windows)]
+    {
+        return bin_dir.join("python.exe");
+    }
+
+    #[cfg(unix)]
+    {
+        bin_dir.join("python")
+    }
+}
+
+fn cache_dir(root: &TempDir) -> PathBuf {
+    root.path().join("cache")
+}
+
+fn normalized_notification_path(path: &Option<String>) -> Option<PathBuf> {
+    path.as_ref().map(|path| norm_case(PathBuf::from(path)))
+}
+
+fn assert_single_environment(
+    environments: &[EnvironmentNotification],
+    expected_executable: &Path,
+    expected_prefix: &Path,
+    expected_name: &str,
+    stderr: &str,
+) {
+    assert_eq!(
+        environments.len(),
+        1,
+        "expected exactly one environment notification, got {environments:?}; stderr: {stderr}"
+    );
+    let environment = &environments[0];
+    assert_eq!(environment.kind.as_deref(), Some("Venv"));
+    assert_eq!(environment.name.as_deref(), Some(expected_name));
+    assert_eq!(
+        normalized_notification_path(&environment.executable).as_deref(),
+        Some(norm_case(expected_executable)).as_deref()
+    );
+    assert_eq!(
+        normalized_notification_path(&environment.prefix).as_deref(),
+        Some(norm_case(expected_prefix)).as_deref()
+    );
+    assert_eq!(environment.error, None);
+}
+
+#[test]
+fn configure_and_workspace_refresh_report_fake_venv() {
+    let client = PetJsonRpcClient::spawn().expect("failed to spawn PET server");
+    let (temp_dir, workspace, venv) = create_fake_workspace("workspace-env");
+
+    client
+        .configure(json!({
+            "workspaceDirectories": [workspace.clone()],
+            "cacheDirectory": cache_dir(&temp_dir),
+        }))
+        .expect("configure request failed");
+
+    client.clear_notifications();
+    client
+        .refresh(Some(json!({ "searchPaths": [workspace.clone()] })))
+        .expect("refresh request failed");
+
+    client
+        .wait_for_notification_count("telemetry", 1, Duration::from_secs(5))
+        .expect("timed out waiting for refresh telemetry");
+    
let environments = client.environment_notifications();
+    assert_single_environment(
+        &environments,
+        &python_executable_path(&venv.join(if cfg!(windows) { "Scripts" } else { "bin" })),
+        &venv,
+        "workspace-env",
+        &client.stderr_output(),
+    );
+    assert_eq!(
+        client.manager_notifications().len(),
+        0,
+        "fake venv refresh should not report any managers"
+    );
+    assert_eq!(client.notification_count("telemetry"), 1);
+}
+
+#[test]
+fn concurrent_identical_refresh_requests_share_one_notification_stream() {
+    let client = PetJsonRpcClient::spawn().expect("failed to spawn PET server");
+    let expected_environment_count = 24;
+    let (temp_dir, workspace, venvs) =
+        create_fake_workspace_with_projects("shared-env", expected_environment_count);
+
+    client
+        .configure(json!({
+            "workspaceDirectories": [workspace.clone()],
+            "cacheDirectory": cache_dir(&temp_dir),
+        }))
+        .expect("configure request failed");
+
+    client.clear_notifications();
+    let request_params = json!({ "searchPaths": [workspace.clone()] });
+
+    let mut handles = Vec::new();
+    for _ in 0..3 {
+        let client = client.clone();
+        let params = request_params.clone();
+        handles.push(thread::spawn(move || client.refresh(Some(params))));
+    }
+
+    let refresh_results = handles
+        .into_iter()
+        .map(|handle| handle.join().expect("refresh thread panicked"))
+        .collect::<Result<Vec<_>, _>>()
+        .expect("concurrent refresh request failed");
+
+    assert_eq!(refresh_results.len(), 3);
+    for result in refresh_results.windows(2) {
+        assert_eq!(
+            result[0].duration, result[1].duration,
+            "joined refreshes should reuse the same refresh result"
+        );
+    }
+
+    client
+        .wait_for_notification_count(
+            "environment",
+            expected_environment_count,
+            Duration::from_secs(5),
+        )
+        .expect("timed out waiting for environment notifications");
+    client
+        .wait_for_notification_count("telemetry", 1, Duration::from_secs(5))
+        .expect("timed out waiting for refresh telemetry");
+
+    let environments = client.environment_notifications();
+    assert_eq!(
+        
environments.len(),
+        expected_environment_count,
+        "expected one environment notification per fake venv; stderr: {}",
+        client.stderr_output()
+    );
+    let mut names = environments
+        .iter()
+        .map(|environment| environment.name.clone().unwrap_or_default())
+        .collect::<Vec<_>>();
+    names.sort();
+    let mut expected_names = (0..expected_environment_count)
+        .map(|index| format!("shared-env-{index}"))
+        .collect::<Vec<_>>();
+    expected_names.sort();
+    assert_eq!(names, expected_names);
+    for venv in venvs {
+        let expected_executable = norm_case(python_executable_path(&venv.join(if cfg!(windows) {
+            "Scripts"
+        } else {
+            "bin"
+        })));
+        assert!(
+            environments.iter().any(|environment| {
+                normalized_notification_path(&environment.executable).as_deref()
+                    == Some(expected_executable.as_path())
+            }),
+            "expected to find notification for {:?}; notifications: {:?}; stderr: {}",
+            expected_executable,
+            environments,
+            client.stderr_output()
+        );
+    }
+    assert_eq!(
+        client.notification_count("environment"),
+        expected_environment_count,
+        "identical refresh requests should emit one environment notification stream"
+    );
+    assert_eq!(
+        client.notification_count("telemetry"),
+        1,
+        "identical refresh requests should emit one telemetry notification"
+    );
+}
+
+#[test]
+fn concurrent_distinct_refresh_requests_run_separately() {
+    let client = PetJsonRpcClient::spawn().expect("failed to spawn PET server");
+    let (temp_dir_a, workspace_a, venv_a) = create_fake_workspace("first-env");
+    let (temp_dir_b, workspace_b, venv_b) = create_fake_workspace("second-env");
+
+    client
+        .configure(json!({
+            "workspaceDirectories": [workspace_a.clone(), workspace_b.clone()],
+            "cacheDirectory": cache_dir(&temp_dir_a),
+        }))
+        .expect("configure request failed");
+
+    let _temp_dir_b = temp_dir_b;
+    client.clear_notifications();
+
+    let client_a = client.clone();
+    let client_b = client.clone();
+    let handle_a =
+        thread::spawn(move || client_a.refresh(Some(json!({ "searchPaths": [workspace_a] }))));
+    let 
handle_b =
+        thread::spawn(move || client_b.refresh(Some(json!({ "searchPaths": [workspace_b] }))));
+
+    handle_a
+        .join()
+        .expect("first refresh thread panicked")
+        .expect("first refresh failed");
+    handle_b
+        .join()
+        .expect("second refresh thread panicked")
+        .expect("second refresh failed");
+
+    client
+        .wait_for_notification_count("environment", 2, Duration::from_secs(5))
+        .expect("timed out waiting for environment notifications");
+    client
+        .wait_for_notification_count("telemetry", 2, Duration::from_secs(5))
+        .expect("timed out waiting for telemetry notifications");
+    let mut environments = client.environment_notifications();
+    environments.sort_by(|left, right| left.name.cmp(&right.name));
+
+    assert_eq!(
+        environments.len(),
+        2,
+        "distinct refreshes should each report their targeted workspace envs; stderr: {}",
+        client.stderr_output()
+    );
+
+    assert_eq!(environments[0].kind.as_deref(), Some("Venv"));
+    assert_eq!(environments[0].name.as_deref(), Some("first-env"));
+    assert_eq!(
+        normalized_notification_path(&environments[0].executable).as_deref(),
+        Some(
+            norm_case(python_executable_path(&venv_a.join(if cfg!(windows) {
+                "Scripts"
+            } else {
+                "bin"
+            }),))
+            .as_path()
+        )
+    );
+    assert_eq!(environments[1].kind.as_deref(), Some("Venv"));
+    assert_eq!(environments[1].name.as_deref(), Some("second-env"));
+    assert_eq!(
+        normalized_notification_path(&environments[1].executable).as_deref(),
+        Some(
+            norm_case(python_executable_path(&venv_b.join(if cfg!(windows) {
+                "Scripts"
+            } else {
+                "bin"
+            }),))
+            .as_path()
+        )
+    );
+    assert_eq!(
+        client.notification_count("telemetry"),
+        2,
+        "distinct refresh requests should emit separate telemetry notifications"
+    );
+}