Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 199 additions & 1 deletion crates/pet/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,56 @@ impl RefreshCoordinator {
*state = RefreshCoordinatorState::Idle;
self.changed.notify_all();
}
RefreshCoordinatorState::Running(active) if active.key == *key => {
// Recovery path: if begin_completion() panicked, the state was
// restored to Running before the unwind. Transition to Idle so
// waiters are not stuck forever.
*state = RefreshCoordinatorState::Idle;
self.changed.notify_all();
}
RefreshCoordinatorState::Idle => {}
_ => {}
_ => {
// Mismatched key — another refresh owns this state. Log and
// leave it alone; the owning refresh will clean up.
error!(
"force_complete_request called with mismatched key; current state not owned by caller"
);
}
}
}
}

/// Safety guard created when a refresh thread takes ownership of the `Running`
/// state.
///
/// If the thread exits the `Start` arm without ever constructing a
/// `RefreshCompletionGuard` (e.g., because `begin_completion` panics), dropping
/// this guard calls `force_complete_request` to transition the coordinator back
/// to `Idle`, preventing a permanent deadlock.
///
/// The guard only protects the scope it is bound in, so it must be kept in a
/// named local. Note that `let _ = RefreshSafetyGuard::new(..)` drops the guard
/// immediately and is not flagged by `#[must_use]`.
#[must_use = "if unused, the guard is dropped immediately and force-completes the refresh"]
struct RefreshSafetyGuard<'a> {
    /// Coordinator that is asked to force-complete the request on drop.
    coordinator: &'a RefreshCoordinator,
    /// Key of the protected refresh; handed to `force_complete_request`.
    key: RefreshKey,
    /// Set by [`RefreshSafetyGuard::disarm`]; when `true`, drop is a no-op.
    disarmed: bool,
}

impl<'a> RefreshSafetyGuard<'a> {
    /// Creates an armed guard for the refresh identified by `key`.
    ///
    /// Until [`RefreshSafetyGuard::disarm`] is called, dropping the returned
    /// guard calls `coordinator.force_complete_request(&key)`.
    #[must_use]
    fn new(coordinator: &'a RefreshCoordinator, key: RefreshKey) -> Self {
        Self {
            coordinator,
            key,
            disarmed: false,
        }
    }

    /// Disarm the safety guard once a `RefreshCompletionGuard` takes over
    /// responsibility for the state transition.
    ///
    /// After this call, dropping the guard has no effect on the coordinator.
    fn disarm(&mut self) {
        self.disarmed = true;
    }
}

impl Drop for RefreshSafetyGuard<'_> {
    /// Force-completes the guarded request unless the guard was disarmed.
    fn drop(&mut self) {
        if self.disarmed {
            // A completion guard owns the state transition; nothing to do.
            return;
        }
        self.coordinator.force_complete_request(&self.key);
    }
}
Expand Down Expand Up @@ -530,6 +578,11 @@ pub fn handle_configure(context: Arc<Context>, id: u32, params: Value) {
state.config.cache_directory = Some(cache_directory);
}
state.generation += 1;
// Reset missing-env reporting so that the next refresh
// after reconfiguration can trigger it again (Fixes #395).
// Done inside the write lock to avoid a TOCTOU window with
// concurrent refresh threads reading the generation.
MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release);
trace!(
"Configuring locators with generation {}: {:?}",
state.generation,
Expand Down Expand Up @@ -886,6 +939,15 @@ pub fn handle_refresh(context: Arc<Context>, id: u32, params: Value) {
context.refresh_coordinator.wait_until_idle();
}
RefreshRegistration::Start => {
// Safety guard: if anything in this arm panics
// (including begin_completion), force the
// coordinator back to Idle so waiters are not
// stuck forever.
let mut safety_guard = RefreshSafetyGuard::new(
&context.refresh_coordinator,
refresh_key.clone(),
);

let refresh_result = panic::catch_unwind(AssertUnwindSafe(|| {
execute_refresh(
context.as_ref(),
Expand All @@ -901,6 +963,7 @@ pub fn handle_refresh(context: Arc<Context>, id: u32, params: Value) {
&context.refresh_coordinator,
&refresh_key,
);
safety_guard.disarm();
finish_refresh_replies(&mut completion_guard, &refresh_result);
report_refresh_follow_up(execution);
}
Expand All @@ -913,6 +976,7 @@ pub fn handle_refresh(context: Arc<Context>, id: u32, params: Value) {
&context.refresh_coordinator,
&refresh_key,
);
safety_guard.disarm();
finish_refresh_errors(
&mut completion_guard,
"Refresh failed unexpectedly",
Expand Down Expand Up @@ -1898,4 +1962,138 @@ mod tests {
assert_eq!(result_config.executables, Some(vec![executable]));
assert!(matches!(search_scope, Some(SearchScope::Workspace)));
}

/// Regression test for #396: `force_complete_request` must recover a
/// coordinator that is still in the `Running` state.
///
/// This covers the case where `begin_completion()` is never reached (e.g., the
/// thread panics before constructing a `RefreshCompletionGuard`): the forced
/// completion still has to take Running → Idle so waiters are unblocked.
#[test]
fn test_force_complete_request_recovers_from_running_state() {
    let coordinator = RefreshCoordinator::default();
    let refresh_key = make_refresh_key(1, RefreshOptions::default());

    // The first registration puts the coordinator into Running(refresh_key).
    let first_started = matches!(
        coordinator.register_request(1, refresh_key.clone()),
        RefreshRegistration::Start
    );
    assert!(first_started);

    // Recovery path: force completion directly from the Running state.
    coordinator.force_complete_request(&refresh_key);

    // Being handed `Start` again shows we are back to Idle and a new refresh
    // can begin.
    let second_started = matches!(
        coordinator.register_request(2, refresh_key.clone()),
        RefreshRegistration::Start
    );
    assert!(second_started);
}

/// Test for #396: RefreshSafetyGuard transitions Running → Idle on drop
/// when begin_completion is never reached.
///
/// A second thread registers a request with a different key, is told to wait,
/// and blocks in `wait_until_idle`. Dropping an armed safety guard on the main
/// thread must wake it. Every hand-off with the waiter is bounded by a timeout
/// so that a regression fails the test instead of hanging the whole run.
#[test]
fn test_safety_guard_recovers_running_state_on_drop() {
    // Generous upper bound for each message from the waiter thread.
    const WAITER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    let coordinator = Arc::new(RefreshCoordinator::default());
    let key = make_refresh_key(1, RefreshOptions::default());
    let other_key = make_refresh_key(
        1,
        RefreshOptions {
            search_kind: Some(PythonEnvironmentKind::Venv),
            search_paths: None,
        },
    );

    assert!(matches!(
        coordinator.register_request(1, key.clone()),
        RefreshRegistration::Start
    ));

    let (state_tx, state_rx) = mpsc::channel();
    let waiter = {
        let coordinator = coordinator.clone();
        thread::spawn(move || {
            // Different key → returns Wait (not Joined).
            assert!(matches!(
                coordinator.register_request(2, other_key),
                RefreshRegistration::Wait
            ));
            state_tx.send("waiting").unwrap();
            coordinator.wait_until_idle();
            state_tx.send("idle").unwrap();
        })
    };

    assert_eq!(
        state_rx
            .recv_timeout(WAITER_TIMEOUT)
            .expect("waiter thread never reported that it started waiting"),
        "waiting"
    );

    // Create and immediately drop the safety guard without disarming it.
    // This simulates the thread dying before begin_completion.
    {
        let _guard = RefreshSafetyGuard::new(&coordinator, key);
    }

    // Waiter should be unblocked.
    assert_eq!(
        state_rx
            .recv_timeout(WAITER_TIMEOUT)
            .expect("waiter was not unblocked after the safety guard was dropped"),
        "idle"
    );
    waiter.join().unwrap();
}

/// Regression test for #396: a disarmed `RefreshSafetyGuard` stays out of the
/// way on the normal path, where a `RefreshCompletionGuard` takes over the
/// state transition.
#[test]
fn test_safety_guard_disarmed_does_not_interfere() {
    let coordinator = RefreshCoordinator::default();
    let refresh_key = make_refresh_key(1, RefreshOptions::default());

    let started = matches!(
        coordinator.register_request(1, refresh_key.clone()),
        RefreshRegistration::Start
    );
    assert!(started);

    {
        // Same declaration order as production code: safety guard first, then
        // the completion guard, so the completion guard is dropped first.
        let mut safety = RefreshSafetyGuard::new(&coordinator, refresh_key.clone());
        let mut completion = RefreshCompletionGuard::begin(&coordinator, &refresh_key);
        safety.disarm();

        assert_eq!(completion.drain_request_ids(), vec![1]);
        let finished = completion.finish_if_no_pending();
        assert!(finished);
    }

    // Should be Idle — a new refresh can start.
    let restarted = matches!(
        coordinator.register_request(2, refresh_key.clone()),
        RefreshRegistration::Start
    );
    assert!(restarted);
}

/// Regression test for #395: once configure resets
/// `MISSING_ENVS_REPORTING_STATE`, a later refresh can trigger missing-env
/// reporting again.
#[test]
fn test_configure_resets_completed_missing_env_reporting() {
    // Held for the whole test because it mutates the global reporting state.
    let _serialized = MISSING_ENVS_TEST_LOCK.lock().unwrap();

    let configuration = Arc::new(RwLock::new(ConfigurationState {
        generation: 1,
        config: Configuration::default(),
    }));
    let shared_state = configuration.as_ref();

    // Simulate a completed first refresh.
    MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release);
    let first_attempt = try_begin_missing_env_reporting(shared_state, 1);
    assert!(first_attempt);
    complete_missing_env_reporting(1);

    // Missing-env reporting is now exhausted.
    let repeat_attempt = try_begin_missing_env_reporting(shared_state, 1);
    assert!(!repeat_attempt);

    // Simulate what handle_configure does: bump the generation and reset the
    // reporting state while the write lock is still held.
    let mut state = configuration.write().unwrap();
    state.generation = 2;
    MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release);
    drop(state);

    // Missing-env reporting should work again for the new generation.
    let after_reconfigure = try_begin_missing_env_reporting(shared_state, 2);
    assert!(after_reconfigure);

    // Cleanup.
    MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release);
}
}
Loading