Skip to content

Commit 1ea7bee

Browse files
committed
address comments
1 parent a18af3c commit 1ea7bee

2 files changed

Lines changed: 666 additions & 88 deletions

File tree

crates/pet/src/jsonrpc.rs

Lines changed: 97 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,8 +1366,7 @@ mod tests {
13661366
use pet_core::LocatorKind;
13671367
use pet_core::RefreshStatePersistence;
13681368
use std::path::PathBuf;
1369-
use std::sync::mpsc;
1370-
use std::sync::Mutex;
1369+
use std::sync::{mpsc, Barrier, Mutex};
13711370
use std::thread;
13721371

13731372
#[derive(Default)]
@@ -2603,38 +2602,61 @@ mod tests {
26032602
// held.
26042603
started_rx.recv_timeout(Duration::from_secs(5)).unwrap();
26052604

2606-
// Exercise the three read sites called out in #461 from another
2607-
// thread and bound their completion time to prove they did not
2608-
// block on the configure thread.
2609-
let read_start = Instant::now();
2605+
let (read_done_tx, read_done_rx) = mpsc::channel();
2606+
let read_worker = {
2607+
let configuration = configuration.clone();
2608+
thread::spawn(move || {
2609+
// Exercise the three read sites called out in #461 from another
2610+
// thread. If a regression holds `configuration.write()` during
2611+
// Phase B, this worker blocks and the main test thread can fail
2612+
// with a bounded timeout instead of deadlocking.
2613+
let read_start = Instant::now();
2614+
{
2615+
// `execute_refresh` snapshot read path.
2616+
let _state = configuration.read().unwrap();
2617+
}
2618+
// `sync_refresh_locator_state_if_current` read path.
2619+
let _ = sync_refresh_locator_state_if_current(configuration.as_ref(), 0, || {});
2620+
// `GenerationGuardedReporter::report_if_current` read path. The
2621+
// generation here matches the still-old generation (0) so the
2622+
// inner report is invoked, exercising the full read-lock-then-call
2623+
// path.
2624+
let inner = Arc::new(RecordingReporter::default());
2625+
let reporter = GenerationGuardedReporter::new(inner.clone(), configuration, 0);
2626+
let env = PythonEnvironment::new(
2627+
Some(PathBuf::from("/tmp/python")),
2628+
Some(PythonEnvironmentKind::Venv),
2629+
Some(PathBuf::from("/tmp")),
2630+
None,
2631+
Some("3.11.0".to_string()),
2632+
);
2633+
reporter.report_environment(&env);
2634+
2635+
let _ = read_done_tx.send((
2636+
read_start.elapsed(),
2637+
inner.environments.lock().unwrap().len(),
2638+
));
2639+
})
2640+
};
2641+
2642+
let (read_elapsed, reported_count) = match read_done_rx.recv_timeout(Duration::from_secs(2))
26102643
{
2611-
// `execute_refresh` snapshot read path.
2612-
let _state = configuration.read().unwrap();
2613-
}
2614-
// `sync_refresh_locator_state_if_current` read path.
2615-
let _ = sync_refresh_locator_state_if_current(configuration.as_ref(), 0, || {});
2616-
// `GenerationGuardedReporter::report_if_current` read path. The
2617-
// generation here matches the still-old generation (0) so the
2618-
// inner report is invoked, exercising the full read-lock-then-call
2619-
// path.
2620-
let inner = Arc::new(RecordingReporter::default());
2621-
let reporter = GenerationGuardedReporter::new(inner.clone(), configuration.clone(), 0);
2622-
let env = PythonEnvironment::new(
2623-
Some(PathBuf::from("/tmp/python")),
2624-
Some(PythonEnvironmentKind::Venv),
2625-
Some(PathBuf::from("/tmp")),
2626-
None,
2627-
Some("3.11.0".to_string()),
2628-
);
2629-
reporter.report_environment(&env);
2630-
let read_elapsed = read_start.elapsed();
2644+
Ok(result) => result,
2645+
Err(error) => {
2646+
let _ = release_tx.send(());
2647+
let _ = done_rx.recv_timeout(Duration::from_secs(5));
2648+
worker.join().unwrap();
2649+
panic!("concurrent refresh reads should not block on configure: {error}");
2650+
}
2651+
};
2652+
read_worker.join().unwrap();
26312653
assert!(
26322654
read_elapsed < Duration::from_secs(2),
26332655
"concurrent refresh reads should not block on configure (took {read_elapsed:?})"
26342656
);
26352657
// The reporter should also have actually reported because the
26362658
// generation has not yet advanced.
2637-
assert_eq!(inner.environments.lock().unwrap().len(), 1);
2659+
assert_eq!(reported_count, 1);
26382660

26392661
// Release the locator and confirm configure completes and publishes.
26402662
release_tx.send(()).unwrap();
@@ -2659,9 +2681,8 @@ mod tests {
26592681
struct OrderingLocator {
26602682
id: u32,
26612683
events: Arc<Mutex<Vec<Event>>>,
2662-
// Each call sleeps briefly to amplify any opportunity for
2663-
// interleaving.
2664-
sleep: Duration,
2684+
entered: mpsc::Sender<u32>,
2685+
release: Arc<Mutex<mpsc::Receiver<()>>>,
26652686
}
26662687

26672688
impl Locator for OrderingLocator {
@@ -2670,7 +2691,8 @@ mod tests {
26702691
}
26712692
fn configure(&self, _config: &Configuration) {
26722693
self.events.lock().unwrap().push(Event::Enter(self.id));
2673-
thread::sleep(self.sleep);
2694+
self.entered.send(self.id).unwrap();
2695+
self.release.lock().unwrap().recv().unwrap();
26742696
self.events.lock().unwrap().push(Event::Exit(self.id));
26752697
}
26762698
fn supported_categories(&self) -> Vec<PythonEnvironmentKind> {
@@ -2685,20 +2707,18 @@ mod tests {
26852707
let configuration = Arc::new(RwLock::new(ConfigurationState::default()));
26862708
let configure_in_progress = Arc::new(Mutex::new(()));
26872709
let events = Arc::new(Mutex::new(Vec::<Event>::new()));
2710+
let (entered_tx, entered_rx) = mpsc::channel();
2711+
let (release_tx, release_rx) = mpsc::channel();
2712+
let release = Arc::new(Mutex::new(release_rx));
2713+
let start_barrier = Arc::new(Barrier::new(3));
26882714

26892715
let make_locators = |id: u32| -> Arc<Vec<Arc<dyn Locator>>> {
2690-
Arc::new(vec![
2691-
Arc::new(OrderingLocator {
2692-
id,
2693-
events: events.clone(),
2694-
sleep: Duration::from_millis(50),
2695-
}) as Arc<dyn Locator>,
2696-
Arc::new(OrderingLocator {
2697-
id,
2698-
events: events.clone(),
2699-
sleep: Duration::from_millis(50),
2700-
}) as Arc<dyn Locator>,
2701-
])
2716+
Arc::new(vec![Arc::new(OrderingLocator {
2717+
id,
2718+
events: events.clone(),
2719+
entered: entered_tx.clone(),
2720+
release: release.clone(),
2721+
}) as Arc<dyn Locator>])
27022722
};
27032723

27042724
let locators_a = make_locators(1);
@@ -2707,7 +2727,9 @@ mod tests {
27072727
let worker_a = {
27082728
let configuration = configuration.clone();
27092729
let configure_in_progress = configure_in_progress.clone();
2730+
let start_barrier = start_barrier.clone();
27102731
thread::spawn(move || {
2732+
start_barrier.wait();
27112733
apply_configure_options(
27122734
configuration.as_ref(),
27132735
&configure_in_progress,
@@ -2726,13 +2748,12 @@ mod tests {
27262748
.unwrap();
27272749
})
27282750
};
2729-
// Slight delay to make the interleaving window more interesting if
2730-
// the implementation were broken.
2731-
thread::sleep(Duration::from_millis(5));
27322751
let worker_b = {
27332752
let configuration = configuration.clone();
27342753
let configure_in_progress = configure_in_progress.clone();
2754+
let start_barrier = start_barrier.clone();
27352755
thread::spawn(move || {
2756+
start_barrier.wait();
27362757
apply_configure_options(
27372758
configuration.as_ref(),
27382759
&configure_in_progress,
@@ -2752,61 +2773,52 @@ mod tests {
27522773
})
27532774
};
27542775

2776+
start_barrier.wait();
2777+
let first_id = entered_rx.recv_timeout(Duration::from_secs(5)).unwrap();
2778+
assert!(configure_in_progress.try_lock().is_err());
2779+
if let Ok(second_id) = entered_rx.recv_timeout(Duration::from_millis(250)) {
2780+
let _ = release_tx.send(());
2781+
let _ = release_tx.send(());
2782+
worker_a.join().unwrap();
2783+
worker_b.join().unwrap();
2784+
panic!(
2785+
"configure calls interleaved: {first_id} and {second_id} were both inside locator.configure()"
2786+
);
2787+
}
2788+
2789+
release_tx.send(()).unwrap();
2790+
let second_id = entered_rx.recv_timeout(Duration::from_secs(5)).unwrap();
2791+
assert_ne!(first_id, second_id);
2792+
release_tx.send(()).unwrap();
2793+
27552794
worker_a.join().unwrap();
27562795
worker_b.join().unwrap();
27572796

27582797
let events = events.lock().unwrap();
2798+
assert_eq!(events.len(), 4, "two configures × enter+exit");
27592799
assert_eq!(
2760-
events.len(),
2761-
8,
2762-
"two configures × two locators × enter+exit"
2800+
events.as_slice(),
2801+
&[
2802+
Event::Enter(first_id),
2803+
Event::Exit(first_id),
2804+
Event::Enter(second_id),
2805+
Event::Exit(second_id),
2806+
]
27632807
);
2764-
// No interleaving: the id seen on each Enter must match the id seen
2765-
// on the immediately following Exit, and once a configure starts
2766-
// (Enter then Exit pair), the same id must continue running until
2767-
// it has done all its work.
2768-
for chunk in events.chunks_exact(2) {
2769-
match (chunk[0], chunk[1]) {
2770-
(Event::Enter(a), Event::Exit(b)) => assert_eq!(a, b),
2771-
other => panic!("unexpected event pair: {other:?}"),
2772-
}
2773-
}
2774-
// The first four events belong to one configure, the next four to
2775-
// the other — never interleaved.
2776-
let first_id = match events[0] {
2777-
Event::Enter(id) => id,
2778-
_ => unreachable!(),
2779-
};
2780-
for ev in &events[..4] {
2781-
match ev {
2782-
Event::Enter(id) | Event::Exit(id) => assert_eq!(*id, first_id),
2783-
}
2784-
}
2785-
let second_id = match events[4] {
2786-
Event::Enter(id) => id,
2787-
_ => unreachable!(),
2788-
};
2789-
assert_ne!(first_id, second_id);
2790-
for ev in &events[4..] {
2791-
match ev {
2792-
Event::Enter(id) | Event::Exit(id) => assert_eq!(*id, second_id),
2793-
}
2794-
}
27952808

27962809
// Both publishes occurred.
27972810
let state = configuration.read().unwrap();
27982811
assert_eq!(state.generation, 2);
27992812
}
28002813

2801-
/// Test for #461: rollback after a panicking locator must not hold the
2802-
/// `configuration` write lock.
2814+
/// Test for #461: rollback after a panicking locator must leave the
2815+
/// published config unchanged and release configure serialization.
28032816
#[test]
2804-
fn test_configure_panic_rolls_back_without_holding_lock() {
2817+
fn test_configure_panic_leaves_state_unchanged_and_releases_mutex() {
28052818
// The panic locator panics when `workspace_directories` is set.
28062819
// Place a recording locator first so it is configured before the
28072820
// second locator panics; rollback must re-configure the
2808-
// first locator with the previous (empty) config, all without
2809-
// holding `configuration.write()`.
2821+
// first locator with the previous (empty) config.
28102822
let configuration = Arc::new(RwLock::new(ConfigurationState::default()));
28112823
let configure_in_progress = Arc::new(Mutex::new(()));
28122824
let recording = Arc::new(RecordingConfigureLocator {
@@ -2820,9 +2832,6 @@ mod tests {
28202832
panic_locator.clone() as Arc<dyn Locator>,
28212833
]);
28222834

2823-
// Concurrent reader: continuously confirms it can acquire the read
2824-
// lock without blocking. We just check the result here in the same
2825-
// thread (after the call returns) because the panic path is fast.
28262835
let result = apply_configure_options(
28272836
configuration.as_ref(),
28282837
&configure_in_progress,

0 commit comments

Comments
 (0)