2 changes: 2 additions & 0 deletions driver/src/client/options.rs
@@ -730,6 +730,8 @@ pub(crate) struct TestOptions {
     pub(crate) hello_cb: Option<EventHandler<crate::cmap::Command>>,
 
     pub(crate) jitter: Option<f64>,
+
+    pub(crate) topology_worker_shutdown_delay: Option<Duration>,
 }
 
 pub(crate) type TestEventSender = tokio::sync::mpsc::Sender<
42 changes: 42 additions & 0 deletions driver/src/sdam/test.rs
@@ -349,3 +349,45 @@ fn ipv6_invalid_me() {
     };
     assert!(!desc.invalid_me().unwrap());
 }
+
+/// Regression test: Client::shutdown() must not deadlock.
+///
+/// The topology worker's shutdown sequence closes monitors and waits for them to exit.
+/// Meanwhile, a monitor in streaming mode may complete a hello and call
+/// topology_updater.update(), which waits for acknowledgment from the topology worker.
+/// Since the worker has already exited its processing loop, neither side makes progress.
+///
+/// The fix is to drop the topology worker's update_receiver before closing monitors,
+/// so that the monitor's update() call fails immediately.
+#[tokio::test(flavor = "multi_thread")]
+async fn shutdown_does_not_deadlock() {

Contributor:

Is there a way to reproduce this situation deterministically?
Something like:

  1. Wait for update to be called, and block it
  2. Call shutdown during a blocked update
  3. Unblock the update

Contributor (Author):

Not simply or cleanly, I think. The best way I can think of to implement a version that doesn't rely on sleep timing would be to thread a testing Barrier into both the topology monitor and worker, but it would need to be conditionally enabled by the test right before the shutdown call to avoid blocking on pre-test heartbeats, and that sounds iffy to me.

In practice, the timing test here is effectively deterministic; without the fix, it fails on the first iteration of the loop.
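
For illustration, a standalone sketch of that shape — toy tasks and channels standing in for the monitor and worker, every name here hypothetical — following the three steps above:

```rust
use std::sync::Arc;

use tokio::sync::{mpsc, oneshot, Barrier, Notify};

#[tokio::test]
async fn deterministic_repro_sketch() {
    // Rendezvous points standing in for test-only hooks in the driver.
    let entered = Arc::new(Barrier::new(2)); // "monitor" signals it is inside update()
    let release = Arc::new(Notify::new()); // test unblocks the held update()
    let (update_tx, update_rx) = mpsc::channel::<oneshot::Sender<()>>(1);

    let (entered2, release2) = (Arc::clone(&entered), Arc::clone(&release));
    let monitor = tokio::spawn(async move {
        entered2.wait().await; // step 1: update() has been called...
        release2.notified().await; // ...and is held blocked here
        let (ack_tx, ack_rx) = oneshot::channel();
        if update_tx.send(ack_tx).await.is_ok() {
            // Without the receiver-drop fix, this await would never resolve.
            let _ = ack_rx.await;
        }
    });

    entered.wait().await; // wait until the update is definitely in flight
    drop(update_rx); // step 2: "shutdown" stops listening for updates
    release.notify_one(); // step 3: unblock the update; it must fail fast
    monitor.await.unwrap();
}
```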

Contributor:

Thanks for explaining that update can't be blocked by the event listener (driver user).

+    let mut options = get_client_options().await.clone();
+    options.heartbeat_freq = Some(Duration::from_millis(500));
+    options.test_options_mut().topology_worker_shutdown_delay = Some(Duration::from_millis(600));
+
+    // Add a command event handler to introduce callback overhead on every command
+    // (including the monitor's streaming hello). This widens the race window between
+    // the monitor's topology_updater.update() and the topology worker's shutdown.
+    use crate::event::{command::CommandEvent, EventHandler};
+    options.command_event_handler = Some(EventHandler::callback(|_event: CommandEvent| {
+        // Simulate cross-language callback overhead.
+        std::thread::yield_now();
+    }));
+
+    for i in 0..5 {
+        let client = Client::with_options(options.clone()).unwrap();
+
+        let _ = client
+            .database("admin")
+            .run_command(doc! { "ping": 1 })
+            .await;
+        tokio::time::sleep(Duration::from_millis(20)).await;
+
+        let result = tokio::time::timeout(Duration::from_secs(5), async {
+            client.shutdown().immediate(true).await;
+        })
+        .await;
+
+        assert!(result.is_ok(), "shutdown() deadlocked on iteration {}", i);
+    }
+}
22 changes: 22 additions & 0 deletions driver/src/sdam/topology.rs
@@ -339,6 +339,28 @@ impl TopologyWorker {
         // indicate to the topology watchers that the topology is no longer alive
         drop(self.publisher);
 
+        // Drop the update receiver so monitors' topology_updater.update() calls fail
+        // immediately instead of waiting for acknowledgment from this (now-exited) loop.
+        // Without this, close_monitor().await deadlocks: we wait for the monitor to exit, but
+        // the monitor waits for us to acknowledge its update.
+        drop(self.update_receiver);
+
+        #[cfg(test)]
+        if let Some(dur) = self
+            .options
+            .test_options
+            .as_ref()
+            .and_then(|to| to.topology_worker_shutdown_delay)
+        {
+            // In tests, sleep briefly to give any in-flight monitor hello responses a chance to
+            // complete and call topology_updater.update(). This widens the race window to
+            // verify the deadlock fix (dropping update_receiver before closing monitors). Sleep
+            // longer than heartbeat_freq (maxAwaitTimeMS) to ensure the monitor completes a
+            // hello and calls topology_updater.update() during this window. Without the fix
+            // above, this would deadlock.
+            tokio::time::sleep(dur).await;
+        }
+
         // Close all the monitors.
         let mut close_futures = FuturesUnordered::new();
         for (address, server) in self.servers.into_iter() {
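
A note on why dropping `update_receiver` unblocks the monitor: once a tokio `mpsc::Receiver` is dropped, any pending or future `Sender::send` resolves immediately with an error rather than waiting for the message to be taken, so the updater's handshake fails fast instead of waiting forever for an acknowledgment that will never come. A minimal standalone illustration (toy channel, not driver code):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<&str>(1);

    // Analogous to the worker dropping self.update_receiver on shutdown.
    drop(rx);

    // The "monitor" side: send() fails immediately rather than blocking
    // until a worker that will never run again picks the message up.
    assert!(tx.send("server update").await.is_err());
    println!("send failed fast; no deadlock");
}
```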