11// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
22// SPDX-License-Identifier: Apache-2.0
33
4- use futures:: { ready , Future , Stream } ;
4+ use futures:: { Future , Stream } ;
55use std:: fmt:: Debug ;
66use std:: {
77 pin:: Pin ,
88 task:: { Context , Poll } ,
99} ;
1010use tarpc:: server:: { Channel , InFlightRequest , Requests , Serve } ;
11+ use tokio:: sync:: mpsc:: error:: SendError ;
12+ use tokio:: sync:: mpsc:: OwnedPermit ;
1113
1214#[ allow( type_alias_bounds) ]
1315type Request < S , C : Channel > = ( S , InFlightRequest < C :: Req , C :: Resp > ) ;
1416
17+ type PendingPermit < S , C > = Pin <
18+ Box < dyn Future < Output = Result < OwnedPermit < Request < S , C > > , SendError < ( ) > > > + Send + ' static > ,
19+ > ;
20+
1521/// Replaces tarpc::server::Channel::execute which spawns one task per message with an executor
1622/// that spawns a single worker and queues requests for this task.
1723///
18- /// If the queue is full, the requests is dropped and will be cancelled by tarpc.
24+ /// If the queue is full, the request is dropped and will be cancelled by tarpc unless
25+ /// `with_backpressure` is configured for that request type.
1926pub fn execute_sequential < C , S > (
2027 reqs : Requests < C > ,
2128 serve : S ,
4350 inner : reqs,
4451 serve,
4552 tx,
53+ backpressure : |_| false ,
54+ pending : None ,
4655 }
4756}
4857
5564 inner : Requests < C > ,
5665 serve : S ,
5766 tx : tokio:: sync:: mpsc:: Sender < Request < S , C > > ,
67+ /// Returns true for requests that must not be dropped when the queue is full.
68+ /// The executor will pause reading new requests and wait for channel space instead.
69+ backpressure : fn ( & C :: Req ) -> bool ,
70+ /// Pending channel-space reservation for a backpressure request.
71+ pending : Option < ( PendingPermit < S , C > , Request < S , C > ) > ,
5872}
5973
6074impl < C , S > Future for SequentialExecutor < C , S >
@@ -68,21 +82,50 @@ where
6882 type Output = anyhow:: Result < ( ) > ;
6983
7084 fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
71- while let Some ( response_handler) = ready ! ( self . as_mut( ) . project( ) . inner. poll_next( cx) ) {
72- match response_handler {
73- Ok ( resp) => {
74- let server = self . serve . clone ( ) ;
75- if let Err ( _err) = self . as_ref ( ) . tx . try_send ( ( server, resp) ) {
76- // TODO: should we log something in case we drop the request on the floor?
85+ loop {
86+ // First flush any pending backpressure send before reading new requests.
87+ {
88+ let this = self . as_mut ( ) . project ( ) ;
89+ if let Some ( ( fut, _) ) = this. pending . as_mut ( ) {
90+ match fut. as_mut ( ) . poll ( cx) {
91+ Poll :: Pending => return Poll :: Pending ,
92+ Poll :: Ready ( Err ( _) ) => return Poll :: Ready ( Ok ( ( ) ) ) , // worker dropped
93+ Poll :: Ready ( Ok ( permit) ) => {
94+ #[ allow( clippy:: unwrap_used) ] // we've just checked this
95+ let ( _, item) = this. pending . take ( ) . unwrap ( ) ;
96+ permit. send ( item) ;
97+ // fall through to poll next request
98+ }
7799 }
78100 }
79- Err ( e) => {
80- // TODO: should we log something in case we drop the request on the floor?
81- return Poll :: Ready ( Err ( e. into ( ) ) ) ;
101+ }
102+
103+ // Read the next request off the transport.
104+ match self . as_mut ( ) . project ( ) . inner . poll_next ( cx) {
105+ Poll :: Ready ( Some ( Ok ( resp) ) ) => {
106+ let backpressured = ( self . backpressure ) ( & resp. get ( ) . message ) ;
107+ match self . as_ref ( ) . tx . try_send ( ( self . serve . clone ( ) , resp) ) {
108+ Ok ( ( ) ) => { } // loop to pick up the next request
109+ Err ( err) => {
110+ let ( _, resp) = err. into_inner ( ) ;
111+ if backpressured {
112+ let fut = Box :: pin ( self . as_ref ( ) . tx . clone ( ) . reserve_owned ( ) ) ;
113+ * self . as_mut ( ) . project ( ) . pending =
114+ Some ( ( fut, ( self . serve . clone ( ) , resp) ) ) ;
115+ } else {
116+ tracing:: warn!(
117+ "Dropping {:?}: sequential executor queue is full" ,
118+ resp. get( ) . message
119+ ) ;
120+ }
121+ }
122+ }
82123 }
124+ Poll :: Ready ( Some ( Err ( e) ) ) => return Poll :: Ready ( Err ( e. into ( ) ) ) ,
125+ Poll :: Ready ( None ) => return Poll :: Ready ( Ok ( ( ) ) ) ,
126+ Poll :: Pending => return Poll :: Pending ,
83127 }
84128 }
85- Poll :: Ready ( Ok ( ( ) ) )
86129 }
87130}
88131
@@ -97,4 +140,11 @@ where
97140 std:: mem:: swap ( & mut self . tx , & mut sender) ;
98141 sender
99142 }
143+
144+ /// Configures a predicate that identifies requests which must not be dropped when the queue
145+ /// is full. For those requests the executor will pause reading and wait for channel space.
146+ pub fn with_backpressure ( mut self , backpressure : fn ( & C :: Req ) -> bool ) -> Self {
147+ self . backpressure = backpressure;
148+ self
149+ }
100150}
0 commit comments