@@ -117,17 +117,15 @@ impl core::ops::Deref for PktBuf {
117117pub struct PortTop {
118118 /// Forwarded packet queue.
119119 channel : FixedChannel < PortRawMutex , PktBuf , { config:: PORT_TXQUEUE } > ,
120- // Callers should hold send_mutex when using channel.sender().
121- // send_message() will wait on send_mutex being available using sender_waker.
122- send_mutex : BlockingMutex < ( ) > ,
120+ /// Only a single Sender can be created from a FixedChannel at a time.
121+ /// sender_waker wakes futures waiting for a Sender.
123122 sender_waker : AtomicWaker ,
124123}
125124
126125impl PortTop {
127126 pub fn new ( ) -> Self {
128127 Self {
129128 channel : FixedChannel :: new ( ) ,
130- send_mutex : BlockingMutex :: new ( ( ) ) ,
131129 sender_waker : AtomicWaker :: new ( ) ,
132130 }
133131 }
@@ -145,32 +143,34 @@ impl PortTop {
145143 /// Do not call with locks held.
146144 /// May block waiting for a port queue to flush.
147145 /// Packet must be a valid MCTP packet, may panic otherwise.
148- fn forward_packet ( & self , pkt : & [ u8 ] ) -> Result < ( ) > {
146+ async fn forward_packet ( & self , pkt : & [ u8 ] ) -> Result < ( ) > {
149147 debug_assert ! ( MctpHeader :: decode( pkt) . is_ok( ) ) ;
150148
149+ let mut sender = poll_fn ( |cx| match self . channel . sender ( ) {
150+ Some ( s) => Poll :: Ready ( WakeOnDrop :: new ( s, cx. waker ( ) ) ) ,
151+ None => {
152+ self . sender_waker . register ( cx. waker ( ) ) ;
153+ Poll :: Pending
154+ }
155+ } )
156+ . await ;
157+
158+ // Get a slot to send
151159 // With forwarded packets we don't want to block if
152160 // the queue is full (we drop packets instead).
153- let r = self . send_mutex . lock ( |_| {
154- // OK unwrap, we have the send_mutex
155- let mut sender = self . channel . sender ( ) . unwrap ( ) ;
156-
157- // Get a slot to send
158- let slot = sender. try_send ( ) . ok_or_else ( || {
159- debug ! ( "Dropped forward packet" ) ;
160- Error :: TxFailure
161- } ) ?;
162-
163- // Fill the buffer
164- if slot. set ( pkt) . is_ok ( ) {
165- sender. send_done ( ) ;
166- Ok ( ( ) )
167- } else {
168- debug ! ( "Oversized forward packet" ) ;
169- Err ( Error :: TxFailure )
170- }
171- } ) ;
172- self . sender_waker . wake ( ) ;
173- r
161+ let slot = sender. try_send ( ) . ok_or_else ( || {
162+ debug ! ( "Dropped forward packet" ) ;
163+ Error :: TxFailure
164+ } ) ?;
165+
166+ // Fill the buffer
167+ if slot. set ( pkt) . is_ok ( ) {
168+ sender. send_done ( ) ;
169+ Ok ( ( ) )
170+ } else {
171+ debug ! ( "Oversized forward packet" ) ;
172+ Err ( Error :: TxFailure )
173+ }
174174 }
175175
176176 /// Fragments and enqueues a message.
@@ -187,39 +187,41 @@ impl PortTop {
187187 // It shouldn't hold the send_mutex() across an await, since that would block
188188 // forward_packet().
189189 poll_fn ( |cx| {
190- self . send_mutex . lock ( |_| {
191- // OK to unwrap, protected by send_mutex.lock()
192- let mut sender = self . channel . sender ( ) . unwrap ( ) ;
193-
194- // Send as much as we can in a loop without blocking.
195- // If it blocks the next poll_fn iteration will continue
196- // where it left off.
197- loop {
198- let Poll :: Ready ( qpkt) = sender. poll_send ( cx) else {
199- self . sender_waker . register ( cx. waker ( ) ) ;
200- break Poll :: Pending ;
201- } ;
202-
203- qpkt. len = 0 ;
204- match fragmenter. fragment_vectored ( pkt, & mut qpkt. data ) {
205- SendOutput :: Packet ( p) => {
206- qpkt. len = p. len ( ) ;
207- sender. send_done ( ) ;
208- if fragmenter. is_done ( ) {
209- // Break here rather than using SendOutput::Complete,
210- // since we don't want to call channel.sender() an extra time.
211- break Poll :: Ready ( Ok ( fragmenter. tag ( ) ) ) ;
212- }
213- }
214- SendOutput :: Error { err, .. } => {
215- debug ! ( "Error packetising" ) ;
216- debug_assert ! ( false , "fragment () shouldn't fail" ) ;
217- break Poll :: Ready ( Err ( err) ) ;
190+ let mut sender = match self . channel . sender ( ) {
191+ Some ( s) => WakeOnDrop :: new ( s, cx. waker ( ) ) ,
192+ None => {
193+ self . sender_waker . register ( cx. waker ( ) ) ;
194+ return Poll :: Pending ;
195+ }
196+ } ;
197+
198+ // Send as much as we can in a loop without blocking.
199+ // If it blocks the next poll_fn iteration will continue
200+ // where it left off.
201+ loop {
202+ let Poll :: Ready ( qpkt) = sender. poll_send ( cx) else {
203+ break Poll :: Pending ;
204+ } ;
205+
206+ qpkt. len = 0 ;
207+ match fragmenter. fragment_vectored ( pkt, & mut qpkt. data ) {
208+ SendOutput :: Packet ( p) => {
209+ qpkt. len = p. len ( ) ;
210+ sender. send_done ( ) ;
211+ if fragmenter. is_done ( ) {
212+ // Break here rather than using SendOutput::Complete,
213+ // since we don't want to call channel.sender() an extra time.
214+ break Poll :: Ready ( Ok ( fragmenter. tag ( ) ) ) ;
218215 }
219- SendOutput :: Complete { .. } => unreachable ! ( ) ,
220216 }
217+ SendOutput :: Error { err, .. } => {
218+ debug ! ( "Error packetising" ) ;
219+ debug_assert ! ( false , "fragment () shouldn't fail" ) ;
220+ break Poll :: Ready ( Err ( err) ) ;
221+ }
222+ SendOutput :: Complete { .. } => unreachable ! ( ) ,
221223 }
222- } )
224+ }
223225 } )
224226 . await
225227 }
@@ -578,7 +580,7 @@ impl<'r> Router<'r> {
578580 return ret_src;
579581 } ;
580582
581- let _ = top. forward_packet ( pkt) ;
583+ let _ = top. forward_packet ( pkt) . await ;
582584 ret_src
583585 }
584586
0 commit comments