Skip to content

Commit edf04e1

Browse files
authored
Merge pull request #4950 from WhySoBad/fix-blocking-io-manager
Fix panic when multiple threads block on same fd
2 parents 6428be1 + 6d4d7a6 commit edf04e1

7 files changed

Lines changed: 187 additions & 35 deletions

File tree

src/tools/miri/src/concurrency/blocking_io.rs

Lines changed: 110 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use std::collections::BTreeMap;
12
use std::io;
23
use std::time::Duration;
34

45
use mio::event::Source;
56
use mio::{Events, Interest, Poll, Token};
67
use rustc_data_structures::fx::FxHashMap;
78

9+
use crate::shims::{FdId, FileDescriptionRef};
810
use crate::*;
911

1012
/// Capacity of the event queue which can be polled at a time.
@@ -18,6 +20,14 @@ pub trait WithSource {
1820
fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>;
1921
}
2022

23+
/// An interest receiver defines the action that should be taken when
24+
/// the associated [`Interest`] is fulfilled.
25+
#[derive(Debug, Hash, PartialEq, Clone, Copy, Eq, PartialOrd, Ord)]
26+
pub enum InterestReceiver {
27+
/// The specified thread should be unblocked.
28+
UnblockThread(ThreadId),
29+
}
30+
2131
/// Manager for managing blocking host I/O in a non-blocking manner.
2232
/// We use [`Poll`] to poll for new I/O events from the OS for sources
2333
/// registered using this manager.
@@ -34,9 +44,10 @@ pub struct BlockingIoManager {
3444
/// This is not part of the state and only stored to avoid allocating a
3545
/// new buffer for every poll.
3646
events: Events,
37-
/// Map between threads which are currently blocked and the
38-
/// underlying I/O source.
39-
sources: FxHashMap<ThreadId, Box<dyn WithSource>>,
47+
/// Map from source ids to the actual sources and their registered receivers
48+
/// together with their associated interests.
49+
sources:
50+
BTreeMap<FdId, (FileDescriptionRef<dyn WithSource>, FxHashMap<InterestReceiver, Interest>)>,
4051
}
4152

4253
impl BlockingIoManager {
@@ -46,7 +57,7 @@ impl BlockingIoManager {
4657
let manager = Self {
4758
poll: communicate.then_some(Poll::new()?),
4859
events: Events::with_capacity(IO_EVENT_CAPACITY),
49-
sources: FxHashMap::default(),
60+
sources: BTreeMap::default(),
5061
};
5162
Ok(manager)
5263
}
@@ -59,8 +70,12 @@ impl BlockingIoManager {
5970
/// specified duration.
6071
/// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
6172
///
62-
/// Returns all threads that are ready because they received an I/O event.
63-
pub fn poll(&mut self, timeout: Option<Duration>) -> Result<Vec<ThreadId>, io::Error> {
73+
/// Returns the interest receivers for all file descriptions which received an I/O event together
74+
/// with the file description they were registered for.
75+
pub fn poll(
76+
&mut self,
77+
timeout: Option<Duration>,
78+
) -> Result<Vec<(InterestReceiver, FileDescriptionRef<dyn WithSource>)>, io::Error> {
6479
let poll =
6580
self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled");
6681

@@ -70,56 +85,120 @@ impl BlockingIoManager {
7085
let ready = self
7186
.events
7287
.iter()
73-
.map(|event| {
88+
.flat_map(|event| {
7489
let token = event.token();
75-
ThreadId::new_unchecked(token.0.try_into().unwrap())
90+
// We know all tokens are valid `FdId`.
91+
let fd_id = FdId::new_unchecked(token.0);
92+
let (source, interests) =
93+
self.sources.get(&fd_id).expect("Source should be registered");
94+
assert_eq!(source.id(), fd_id);
95+
// Because we allow spurious wake-ups, we mark all interests as ready even
96+
// though some may not have been fulfilled.
97+
interests.keys().map(move |receiver| (*receiver, source.clone()))
7698
})
7799
.collect::<Vec<_>>();
78100

79-
// Deregister all ready sources as we only want to receive one event per thread.
80-
ready.iter().for_each(|thread_id| self.deregister(*thread_id));
101+
// Deregister all ready sources as we only want to receive one event per receiver.
102+
ready.iter().for_each(|(receiver, source)| self.deregister(source.id(), *receiver));
81103

82104
Ok(ready)
83105
}
84106

85-
/// Register a blocking I/O source for a thread together with it's poll interests.
86-
///
87-
/// The source will be deregistered automatically once an event for it is received.
107+
/// Register an interest for a blocking I/O source.
88108
///
89109
/// As the OS can always produce spurious wake-ups, it's the callers responsibility to
90110
/// verify the requested I/O interests are really ready and to register again if they're not.
91-
pub fn register(&mut self, source: Box<dyn WithSource>, thread: ThreadId, interests: Interest) {
111+
///
112+
/// It's assumed that no interest is already registered for this source with the same reason!
113+
pub fn register(
114+
&mut self,
115+
source_fd: FileDescriptionRef<dyn WithSource>,
116+
receiver: InterestReceiver,
117+
interest: Interest,
118+
) {
92119
let poll =
93120
self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
94121

95-
let token = Token(thread.to_u32().to_usize());
122+
let id = source_fd.id();
123+
let token = Token(id.to_usize());
124+
125+
let Some((_, current_interests)) = self.sources.get_mut(&id) else {
126+
// The source is not yet registered.
127+
128+
// Treat errors from registering as fatal. On UNIX hosts this can only
129+
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
130+
source_fd
131+
.with_source(&mut |source| poll.registry().register(source, token, interest))
132+
.unwrap();
133+
134+
self.sources.insert(id, (source_fd, FxHashMap::from_iter([(receiver, interest)])));
135+
return;
136+
};
137+
138+
// The source is already registered. We need to check whether we need to
139+
// reregister because the provided interest contains new interests for the source.
96140

97-
// Treat errors from registering as fatal. On UNIX hosts this can only
141+
let old_interest =
142+
interest_union(current_interests).expect("Source should contain at least one interest");
143+
144+
current_interests
145+
.try_insert(receiver, interest)
146+
.unwrap_or_else(|_| panic!("Receiver should be unique"));
147+
148+
let new_interest = old_interest.add(interest);
149+
150+
// Reregister the source since the overall interests might have changed.
151+
152+
// Treat errors from reregistering as fatal. On UNIX hosts this can only
98153
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
99-
source
100-
.with_source(&mut |source| source.register(poll.registry(), token, interests))
154+
source_fd
155+
.with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
101156
.unwrap();
102-
self.sources
103-
.try_insert(thread, source)
104-
.unwrap_or_else(|_| panic!("A thread cannot be registered twice at the same time"));
105157
}
106158

107-
/// Deregister the event source for a thread. Returns the kind of I/O the thread was
108-
/// blocked on.
109-
fn deregister(&mut self, thread: ThreadId) {
159+
/// Deregister an interest from a blocking I/O source.
160+
///
161+
/// The receiver is assumed to be registered for the provided source!
162+
pub fn deregister(&mut self, source_id: FdId, receiver: InterestReceiver) {
110163
let poll =
111164
self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled");
112165

113-
let Some(source) = self.sources.remove(&thread) else {
114-
panic!("Attempt to deregister a token which isn't registered")
166+
let token = Token(source_id.to_usize());
167+
let (fd, current_interests) =
168+
self.sources.get_mut(&source_id).expect("Source should be registered");
169+
170+
current_interests
171+
.remove(&receiver)
172+
.unwrap_or_else(|| panic!("Receiver should be registered for source"));
173+
174+
let Some(new_interest) = interest_union(current_interests) else {
175+
// There are no longer any interests in this source.
176+
// We can thus deregister the source from the poll.
177+
178+
// Treat errors from deregistering as fatal. On UNIX hosts this can only
179+
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
180+
fd.with_source(&mut |source| poll.registry().deregister(source)).unwrap();
181+
self.sources.remove(&source_id);
182+
return;
115183
};
116184

117-
// Treat errors from deregistering as fatal. On UNIX hosts this can only
185+
// Reregister the source since the overall interests might have changed.
186+
187+
// Treat errors from reregistering as fatal. On UNIX hosts this can only
118188
// fail due to system resource errors (e.g. ENOMEM or ENOSPC).
119-
source.with_source(&mut |source| source.deregister(poll.registry())).unwrap();
189+
fd.with_source(&mut |source| poll.registry().reregister(source, token, new_interest))
190+
.unwrap();
120191
}
121192
}
122193

194+
/// Get the union of all interests for a source. Returns `None` if the map is empty.
195+
fn interest_union(interests: &FxHashMap<InterestReceiver, Interest>) -> Option<Interest> {
196+
interests
197+
.values()
198+
.copied()
199+
.fold(None, |acc, interest| acc.map(|acc: Interest| acc.add(interest)).or(Some(interest)))
200+
}
201+
123202
impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {}
124203
pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
125204
/// Block the current thread until some interests on an I/O source
@@ -132,15 +211,15 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
132211
#[inline]
133212
fn block_thread_for_io(
134213
&mut self,
135-
source: impl WithSource + 'static,
214+
source_fd: FileDescriptionRef<dyn WithSource>,
136215
interests: Interest,
137216
timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>,
138217
callback: DynUnblockCallback<'tcx>,
139218
) {
140219
let this = self.eval_context_mut();
141220
this.machine.blocking_io.register(
142-
Box::new(source),
143-
this.machine.threads.active_thread(),
221+
source_fd,
222+
InterestReceiver::UnblockThread(this.machine.threads.active_thread()),
144223
interests,
145224
);
146225
this.block_thread(BlockReason::IO, timeout, callback);

src/tools/miri/src/concurrency/thread.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use rustc_span::{DUMMY_SP, Span};
1818
use rustc_target::spec::Os;
1919

2020
use crate::concurrency::GlobalDataRaceHandler;
21+
use crate::concurrency::blocking_io::InterestReceiver;
2122
use crate::shims::tls;
2223
use crate::*;
2324

@@ -822,7 +823,12 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
822823
Err(e) => panic!("unexpected error while polling: {e}"),
823824
};
824825

825-
ready.into_iter().try_for_each(|thread_id| this.unblock_thread(thread_id, BlockReason::IO))
826+
ready.into_iter().try_for_each(|(receiver, _source)| {
827+
match receiver {
828+
InterestReceiver::UnblockThread(thread_id) =>
829+
this.unblock_thread(thread_id, BlockReason::IO),
830+
}
831+
})
826832
}
827833

828834
/// Find all threads with expired timeouts, unblock them and execute their timeout callbacks.

src/tools/miri/src/shims/files.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ use crate::*;
1919
#[derive(Debug, Copy, Clone, Default, Eq, PartialEq, Ord, PartialOrd)]
2020
pub struct FdId(usize);
2121

22+
impl FdId {
23+
pub fn to_usize(self) -> usize {
24+
self.0
25+
}
26+
27+
/// Create a new fd id from a `usize` without checking if this fd exists.
28+
pub fn new_unchecked(id: usize) -> Self {
29+
Self(id)
30+
}
31+
}
32+
2233
#[derive(Debug, Clone)]
2334
struct FdIdWith<T: ?Sized> {
2435
id: FdId,

src/tools/miri/src/shims/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub mod time;
2323
pub mod tls;
2424
pub mod unwind;
2525

26-
pub use self::files::FdTable;
26+
pub use self::files::{FdId, FdTable, FileDescriptionRef};
2727
#[cfg(all(feature = "native-lib", unix))]
2828
pub use self::native_lib::trace::{init_sv, register_retcode_sv};
2929
pub use self::unix::{DirTable, EpollInterestTable};

src/tools/miri/src/shims/unix/fd.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::shims::unix::*;
1515
use crate::*;
1616

1717
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
18-
pub(crate) enum FlockOp {
18+
pub enum FlockOp {
1919
SharedLock { nonblocking: bool },
2020
ExclusiveLock { nonblocking: bool },
2121
Unlock,

src/tools/miri/src/shims/unix/socket.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,7 @@ impl VisitProvenance for FileDescriptionRef<Socket> {
13881388
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {}
13891389
}
13901390

1391-
impl WithSource for FileDescriptionRef<Socket> {
1391+
impl WithSource for Socket {
13921392
fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()> {
13931393
let mut state = self.state.borrow_mut();
13941394
match &mut *state {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
//@ignore-target: windows # No libc socket on Windows
2+
//@compile-flags: -Zmiri-disable-isolation -Zmiri-fixed-schedule
3+
4+
#[path = "../../utils/libc.rs"]
5+
mod libc_utils;
6+
use std::thread;
7+
8+
use libc_utils::*;
9+
10+
// This tests that the blocking I/O implementation works when multiple threads block on the
11+
// same fd at the same time.
12+
13+
fn main() {
14+
let (server_sockfd, addr) = net::make_listener_ipv4(0).unwrap();
15+
let client_sockfd =
16+
unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() };
17+
18+
// Spawn the server thread.
19+
let server_thread = thread::spawn(move || {
20+
let (peerfd, _) = net::accept_ipv4(server_sockfd).unwrap();
21+
22+
// Yield back to reader threads to ensure that we have
23+
// two threads being blocked on the same fd at the same time.
24+
thread::yield_now();
25+
26+
let mut buffer = [22u8; 128];
27+
let bytes_written = unsafe {
28+
errno_result(net::send_all(peerfd, buffer.as_mut_ptr().cast(), buffer.len(), 0))
29+
.unwrap()
30+
};
31+
assert_eq!(bytes_written as usize, 128);
32+
});
33+
34+
net::connect_ipv4(client_sockfd, addr);
35+
36+
let reader_thread = thread::spawn(move || {
37+
let mut buffer = [0u8; 8];
38+
let bytes_read = unsafe {
39+
errno_result(net::recv_all(client_sockfd, buffer.as_mut_ptr().cast(), buffer.len(), 0))
40+
.unwrap()
41+
};
42+
assert_eq!(bytes_read, 8);
43+
assert_eq!(&buffer, &[22u8; 8]);
44+
});
45+
46+
let mut buffer = [0u8; 8];
47+
let bytes_read = unsafe {
48+
errno_result(net::recv_all(client_sockfd, buffer.as_mut_ptr().cast(), buffer.len(), 0))
49+
.unwrap()
50+
};
51+
assert_eq!(bytes_read, 8);
52+
assert_eq!(&buffer, &[22u8; 8]);
53+
54+
reader_thread.join().unwrap();
55+
server_thread.join().unwrap();
56+
}

0 commit comments

Comments
 (0)