1+ use std:: collections:: BTreeMap ;
12use std:: io;
23use std:: time:: Duration ;
34
45use mio:: event:: Source ;
56use mio:: { Events , Interest , Poll , Token } ;
67use rustc_data_structures:: fx:: FxHashMap ;
78
9+ use crate :: shims:: { FdId , FileDescriptionRef } ;
810use crate :: * ;
911
1012/// Capacity of the event queue which can be polled at a time.
@@ -18,6 +20,14 @@ pub trait WithSource {
1820 fn with_source ( & self , f : & mut dyn FnMut ( & mut dyn Source ) -> io:: Result < ( ) > ) -> io:: Result < ( ) > ;
1921}
2022
23+ /// An interest receiver defines the action that should be taken when
24+ /// the associated [`Interest`] is fulfilled.
25+ #[ derive( Debug , Hash , PartialEq , Clone , Copy , Eq , PartialOrd , Ord ) ]
26+ pub enum InterestReceiver {
27+ /// The specified thread should be unblocked.
28+ UnblockThread ( ThreadId ) ,
29+ }
30+
2131/// Manager for managing blocking host I/O in a non-blocking manner.
2232/// We use [`Poll`] to poll for new I/O events from the OS for sources
2333/// registered using this manager.
@@ -34,9 +44,10 @@ pub struct BlockingIoManager {
3444 /// This is not part of the state and only stored to avoid allocating a
3545 /// new buffer for every poll.
3646 events : Events ,
37- /// Map between threads which are currently blocked and the
38- /// underlying I/O source.
39- sources : FxHashMap < ThreadId , Box < dyn WithSource > > ,
47+ /// Map from source ids to the actual sources and their registered receivers
48+ /// together with their associated interests.
49+ sources :
50+ BTreeMap < FdId , ( FileDescriptionRef < dyn WithSource > , FxHashMap < InterestReceiver , Interest > ) > ,
4051}
4152
4253impl BlockingIoManager {
@@ -46,7 +57,7 @@ impl BlockingIoManager {
4657 let manager = Self {
4758 poll : communicate. then_some ( Poll :: new ( ) ?) ,
4859 events : Events :: with_capacity ( IO_EVENT_CAPACITY ) ,
49- sources : FxHashMap :: default ( ) ,
60+ sources : BTreeMap :: default ( ) ,
5061 } ;
5162 Ok ( manager)
5263 }
@@ -59,8 +70,12 @@ impl BlockingIoManager {
5970 /// specified duration.
6071 /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs.
6172 ///
62- /// Returns all threads that are ready because they received an I/O event.
63- pub fn poll ( & mut self , timeout : Option < Duration > ) -> Result < Vec < ThreadId > , io:: Error > {
73+ /// Returns the interest receivers for all file descriptions which received an I/O event together
74+ /// with the file description they were registered for.
75+ pub fn poll (
76+ & mut self ,
77+ timeout : Option < Duration > ,
78+ ) -> Result < Vec < ( InterestReceiver , FileDescriptionRef < dyn WithSource > ) > , io:: Error > {
6479 let poll =
6580 self . poll . as_mut ( ) . expect ( "Blocking I/O should not be called with isolation enabled" ) ;
6681
@@ -70,56 +85,120 @@ impl BlockingIoManager {
7085 let ready = self
7186 . events
7287 . iter ( )
73- . map ( |event| {
88+ . flat_map ( |event| {
7489 let token = event. token ( ) ;
75- ThreadId :: new_unchecked ( token. 0 . try_into ( ) . unwrap ( ) )
90+ // We know all tokens are valid `FdId`.
91+ let fd_id = FdId :: new_unchecked ( token. 0 ) ;
92+ let ( source, interests) =
93+ self . sources . get ( & fd_id) . expect ( "Source should be registered" ) ;
94+ assert_eq ! ( source. id( ) , fd_id) ;
95+ // Because we allow spurious wake-ups, we mark all interests as ready even
96+ // though some may not have been fulfilled.
97+ interests. keys ( ) . map ( move |receiver| ( * receiver, source. clone ( ) ) )
7698 } )
7799 . collect :: < Vec < _ > > ( ) ;
78100
79- // Deregister all ready sources as we only want to receive one event per thread .
80- ready. iter ( ) . for_each ( |thread_id | self . deregister ( * thread_id ) ) ;
101+ // Deregister all ready sources as we only want to receive one event per receiver .
102+ ready. iter ( ) . for_each ( |( receiver , source ) | self . deregister ( source . id ( ) , * receiver ) ) ;
81103
82104 Ok ( ready)
83105 }
84106
85- /// Register a blocking I/O source for a thread together with it's poll interests.
86- ///
87- /// The source will be deregistered automatically once an event for it is received.
107+ /// Register an interest for a blocking I/O source.
88108 ///
89109 /// As the OS can always produce spurious wake-ups, it's the callers responsibility to
90110 /// verify the requested I/O interests are really ready and to register again if they're not.
91- pub fn register ( & mut self , source : Box < dyn WithSource > , thread : ThreadId , interests : Interest ) {
111+ ///
112+ /// It's assumed that no interest is already registered for this source with the same reason!
113+ pub fn register (
114+ & mut self ,
115+ source_fd : FileDescriptionRef < dyn WithSource > ,
116+ receiver : InterestReceiver ,
117+ interest : Interest ,
118+ ) {
92119 let poll =
93120 self . poll . as_ref ( ) . expect ( "Blocking I/O should not be called with isolation enabled" ) ;
94121
95- let token = Token ( thread. to_u32 ( ) . to_usize ( ) ) ;
122+ let id = source_fd. id ( ) ;
123+ let token = Token ( id. to_usize ( ) ) ;
124+
125+ let Some ( ( _, current_interests) ) = self . sources . get_mut ( & id) else {
126+ // The source is not yet registered.
127+
128+ // Treat errors from registering as fatal. On UNIX hosts this can only
129+ // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
130+ source_fd
131+ . with_source ( & mut |source| poll. registry ( ) . register ( source, token, interest) )
132+ . unwrap ( ) ;
133+
134+ self . sources . insert ( id, ( source_fd, FxHashMap :: from_iter ( [ ( receiver, interest) ] ) ) ) ;
135+ return ;
136+ } ;
137+
138+ // The source is already registered. We need to check whether we need to
139+ // reregister because the provided interest contains new interests for the source.
96140
97- // Treat errors from registering as fatal. On UNIX hosts this can only
141+ let old_interest =
142+ interest_union ( current_interests) . expect ( "Source should contain at least one interest" ) ;
143+
144+ current_interests
145+ . try_insert ( receiver, interest)
146+ . unwrap_or_else ( |_| panic ! ( "Receiver should be unique" ) ) ;
147+
148+ let new_interest = old_interest. add ( interest) ;
149+
150+ // Reregister the source since the overall interests might have changed.
151+
152+ // Treat errors from reregistering as fatal. On UNIX hosts this can only
98153 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
99- source
100- . with_source ( & mut |source| source . register ( poll. registry ( ) , token, interests ) )
154+ source_fd
155+ . with_source ( & mut |source| poll. registry ( ) . reregister ( source , token, new_interest ) )
101156 . unwrap ( ) ;
102- self . sources
103- . try_insert ( thread, source)
104- . unwrap_or_else ( |_| panic ! ( "A thread cannot be registered twice at the same time" ) ) ;
105157 }
106158
107- /// Deregister the event source for a thread. Returns the kind of I/O the thread was
108- /// blocked on.
109- fn deregister ( & mut self , thread : ThreadId ) {
159+ /// Deregister an interest from a blocking I/O source.
160+ ///
161+ /// The receiver is assumed to be registered for the provided source!
162+ pub fn deregister ( & mut self , source_id : FdId , receiver : InterestReceiver ) {
110163 let poll =
111164 self . poll . as_ref ( ) . expect ( "Blocking I/O should not be called with isolation enabled" ) ;
112165
113- let Some ( source) = self . sources . remove ( & thread) else {
114- panic ! ( "Attempt to deregister a token which isn't registered" )
166+ let token = Token ( source_id. to_usize ( ) ) ;
167+ let ( fd, current_interests) =
168+ self . sources . get_mut ( & source_id) . expect ( "Source should be registered" ) ;
169+
170+ current_interests
171+ . remove ( & receiver)
172+ . unwrap_or_else ( || panic ! ( "Receiver should be registered for source" ) ) ;
173+
174+ let Some ( new_interest) = interest_union ( current_interests) else {
175+ // There are no longer any interests in this source.
176+ // We can thus deregister the source from the poll.
177+
178+ // Treat errors from deregistering as fatal. On UNIX hosts this can only
179+ // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
180+ fd. with_source ( & mut |source| poll. registry ( ) . deregister ( source) ) . unwrap ( ) ;
181+ self . sources . remove ( & source_id) ;
182+ return ;
115183 } ;
116184
117- // Treat errors from deregistering as fatal. On UNIX hosts this can only
185+ // Reregister the source since the overall interests might have changed.
186+
187+ // Treat errors from reregistering as fatal. On UNIX hosts this can only
118188 // fail due to system resource errors (e.g. ENOMEM or ENOSPC).
119- source. with_source ( & mut |source| source. deregister ( poll. registry ( ) ) ) . unwrap ( ) ;
189+ fd. with_source ( & mut |source| poll. registry ( ) . reregister ( source, token, new_interest) )
190+ . unwrap ( ) ;
120191 }
121192}
122193
194+ /// Get the union of all interests for a source. Returns `None` if the map is empty.
195+ fn interest_union ( interests : & FxHashMap < InterestReceiver , Interest > ) -> Option < Interest > {
196+ interests
197+ . values ( )
198+ . copied ( )
199+ . fold ( None , |acc, interest| acc. map ( |acc : Interest | acc. add ( interest) ) . or ( Some ( interest) ) )
200+ }
201+
123202impl < ' tcx > EvalContextExt < ' tcx > for MiriInterpCx < ' tcx > { }
124203pub trait EvalContextExt < ' tcx > : MiriInterpCxExt < ' tcx > {
125204 /// Block the current thread until some interests on an I/O source
@@ -132,15 +211,15 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> {
132211 #[ inline]
133212 fn block_thread_for_io (
134213 & mut self ,
135- source : impl WithSource + ' static ,
214+ source_fd : FileDescriptionRef < dyn WithSource > ,
136215 interests : Interest ,
137216 timeout : Option < ( TimeoutClock , TimeoutAnchor , Duration ) > ,
138217 callback : DynUnblockCallback < ' tcx > ,
139218 ) {
140219 let this = self . eval_context_mut ( ) ;
141220 this. machine . blocking_io . register (
142- Box :: new ( source ) ,
143- this. machine . threads . active_thread ( ) ,
221+ source_fd ,
222+ InterestReceiver :: UnblockThread ( this. machine . threads . active_thread ( ) ) ,
144223 interests,
145224 ) ;
146225 this. block_thread ( BlockReason :: IO , timeout, callback) ;
0 commit comments