@@ -2008,6 +2008,9 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
20082008 match self . message_tx . unbounded_send ( message) {
20092009 Ok ( ( ) ) => ( ) ,
20102010 Err ( error) => {
2011+ #[ cfg( feature = "unstable_cancel_request" ) ]
2012+ cancellation. disarm ( ) ;
2013+
20112014 let OutgoingMessage :: Request {
20122015 method,
20132016 response_tx,
@@ -2030,6 +2033,9 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
20302033 }
20312034
20322035 Err ( err) => {
2036+ #[ cfg( feature = "unstable_cancel_request" ) ]
2037+ cancellation. disarm ( ) ;
2038+
20332039 response_tx
20342040 . send ( ResponsePayload {
20352041 result : Err ( crate :: util:: internal_error ( format ! (
@@ -2442,26 +2448,35 @@ impl ResponseRouter<serde_json::Value> {
24422448 /// Create a new response context for routing a response to a local awaiter.
24432449 ///
24442450 /// When `respond_with_result` is called, the response is sent through the oneshot
2445- /// channel to the code that originally sent the request.
2451+ /// channel to the code that originally sent the request. If that receiver was
2452+ /// dropped, the response is discarded because there is no local awaiter left.
24462453 pub ( crate ) fn new (
24472454 method : String ,
24482455 id : jsonrpcmsg:: Id ,
24492456 role_id : RoleId ,
24502457 sender : oneshot:: Sender < ResponsePayload > ,
24512458 ) -> Self {
2459+ let response_method = method. clone ( ) ;
2460+ let response_id = id. clone ( ) ;
24522461 Self {
24532462 method,
24542463 id,
24552464 role_id,
24562465 send_fn : Box :: new ( move |response : Result < serde_json:: Value , crate :: Error > | {
2457- sender
2466+ if sender
24582467 . send ( ResponsePayload {
24592468 result : response,
24602469 ack_tx : None ,
24612470 } )
2462- . map_err ( |_| {
2463- crate :: util:: internal_error ( "failed to send response, receiver dropped" )
2464- } )
2471+ . is_err ( )
2472+ {
2473+ tracing:: debug!(
2474+ method = %response_method,
2475+ id = ?response_id,
2476+ "dropped response because local receiver was gone"
2477+ ) ;
2478+ }
2479+ Ok ( ( ) )
24652480 } ) ,
24662481 }
24672482 }
@@ -3174,9 +3189,11 @@ enum SentRequestCancellation {
31743189 Send {
31753190 message_tx : OutgoingMessageTx ,
31763191 notification : UntypedMessage ,
3192+ armed : Arc < AtomicBool > ,
31773193 } ,
31783194 Failed {
31793195 error : String ,
3196+ armed : Arc < AtomicBool > ,
31803197 } ,
31813198}
31823199
@@ -3199,8 +3216,20 @@ impl SentRequestCancellation {
31993216 Ok ( notification) => Self :: Send {
32003217 message_tx,
32013218 notification,
3219+ armed : Arc :: new ( AtomicBool :: new ( true ) ) ,
32023220 } ,
3203- Err ( error) => Self :: Failed { error } ,
3221+ Err ( error) => Self :: Failed {
3222+ error,
3223+ armed : Arc :: new ( AtomicBool :: new ( true ) ) ,
3224+ } ,
3225+ }
3226+ }
3227+
3228+ fn disarm ( & self ) {
3229+ match self {
3230+ Self :: Send { armed, .. } | Self :: Failed { armed, .. } => {
3231+ armed. store ( false , Ordering :: Release ) ;
3232+ }
32043233 }
32053234 }
32063235
@@ -3209,15 +3238,37 @@ impl SentRequestCancellation {
32093238 Self :: Send {
32103239 message_tx,
32113240 notification,
3212- } => send_raw_message (
3213- message_tx,
3214- OutgoingMessage :: Notification {
3215- untyped : notification. clone ( ) ,
3216- } ,
3217- ) ,
3218- Self :: Failed { error } => Err ( crate :: util:: internal_error ( format ! (
3219- "failed to create cancel request notification: {error}"
3220- ) ) ) ,
3241+ armed,
3242+ } => {
3243+ if !armed. swap ( false , Ordering :: AcqRel ) {
3244+ return Ok ( ( ) ) ;
3245+ }
3246+
3247+ send_raw_message (
3248+ message_tx,
3249+ OutgoingMessage :: Notification {
3250+ untyped : notification. clone ( ) ,
3251+ } ,
3252+ )
3253+ }
3254+ Self :: Failed { error, armed } => {
3255+ if !armed. swap ( false , Ordering :: AcqRel ) {
3256+ return Ok ( ( ) ) ;
3257+ }
3258+
3259+ Err ( crate :: util:: internal_error ( format ! (
3260+ "failed to create cancel request notification: {error}"
3261+ ) ) )
3262+ }
3263+ }
3264+ }
3265+ }
3266+
3267+ #[ cfg( feature = "unstable_cancel_request" ) ]
3268+ impl Drop for SentRequestCancellation {
3269+ fn drop ( & mut self ) {
3270+ if let Err ( error) = self . send ( ) {
3271+ tracing:: debug!( ?error, "failed to auto-cancel dropped request" ) ;
32213272 }
32223273 }
32233274}
@@ -3226,13 +3277,19 @@ impl SentRequestCancellation {
32263277impl Debug for SentRequestCancellation {
32273278 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
32283279 match self {
3229- Self :: Send { notification, .. } => f
3280+ Self :: Send {
3281+ notification,
3282+ armed,
3283+ ..
3284+ } => f
32303285 . debug_struct ( "SentRequestCancellation" )
32313286 . field ( "notification" , notification)
3287+ . field ( "armed" , & armed. load ( Ordering :: Acquire ) )
32323288 . finish ( ) ,
3233- Self :: Failed { error } => f
3289+ Self :: Failed { error, armed } => f
32343290 . debug_struct ( "SentRequestCancellation" )
32353291 . field ( "error" , error)
3292+ . field ( "armed" , & armed. load ( Ordering :: Acquire ) )
32363293 . finish ( ) ,
32373294 }
32383295 }
@@ -3410,6 +3467,8 @@ impl<T: JsonRpcResponse> SentRequest<T> {
34103467 }
34113468 } ;
34123469
3470+ downstream_cancellation. disarm ( ) ;
3471+
34133472 let ResponsePayload { result, ack_tx } = response. map_err ( |err| {
34143473 crate :: util:: internal_error ( format ! ( "response to `{method}` never received: {err}" ) )
34153474 } ) ?;
@@ -3501,6 +3560,9 @@ impl<T: JsonRpcResponse> SentRequest<T> {
35013560 result : Ok ( json_value) ,
35023561 ack_tx,
35033562 } ) => {
3563+ #[ cfg( feature = "unstable_cancel_request" ) ]
3564+ self . cancellation . disarm ( ) ;
3565+
35043566 // Ack immediately - we're in a spawned task, so the dispatch loop
35053567 // can continue while we process the value.
35063568 if let Some ( tx) = ack_tx {
@@ -3515,15 +3577,23 @@ impl<T: JsonRpcResponse> SentRequest<T> {
35153577 result : Err ( err) ,
35163578 ack_tx,
35173579 } ) => {
3580+ #[ cfg( feature = "unstable_cancel_request" ) ]
3581+ self . cancellation . disarm ( ) ;
3582+
35183583 if let Some ( tx) = ack_tx {
35193584 let _ = tx. send ( ( ) ) ;
35203585 }
35213586 Err ( err)
35223587 }
3523- Err ( err) => Err ( crate :: util:: internal_error ( format ! (
3524- "response to `{}` never received: {}" ,
3525- self . method, err
3526- ) ) ) ,
3588+ Err ( err) => {
3589+ #[ cfg( feature = "unstable_cancel_request" ) ]
3590+ self . cancellation . disarm ( ) ;
3591+
3592+ Err ( crate :: util:: internal_error ( format ! (
3593+ "response to `{}` never received: {}" ,
3594+ self . method, err
3595+ ) ) )
3596+ }
35273597 }
35283598 }
35293599
@@ -3673,11 +3743,16 @@ impl<T: JsonRpcResponse> SentRequest<T> {
36733743 let method = self . method ;
36743744 let response_rx = self . response_rx ;
36753745 let to_result = self . to_result ;
3746+ #[ cfg( feature = "unstable_cancel_request" ) ]
3747+ let cancellation = self . cancellation ;
36763748 let location = Location :: caller ( ) ;
36773749
36783750 Task :: new ( location, async move {
36793751 match response_rx. await {
36803752 Ok ( ResponsePayload { result, ack_tx } ) => {
3753+ #[ cfg( feature = "unstable_cancel_request" ) ]
3754+ cancellation. disarm ( ) ;
3755+
36813756 // Convert the result using to_result for Ok values
36823757 let typed_result = match result {
36833758 Ok ( json_value) => to_result ( json_value) ,
@@ -3695,9 +3770,14 @@ impl<T: JsonRpcResponse> SentRequest<T> {
36953770
36963771 outcome
36973772 }
3698- Err ( err) => Err ( crate :: util:: internal_error ( format ! (
3699- "response to `{method}` never received: {err}"
3700- ) ) ) ,
3773+ Err ( err) => {
3774+ #[ cfg( feature = "unstable_cancel_request" ) ]
3775+ cancellation. disarm ( ) ;
3776+
3777+ Err ( crate :: util:: internal_error ( format ! (
3778+ "response to `{method}` never received: {err}"
3779+ ) ) )
3780+ }
37013781 }
37023782 } )
37033783 . spawn ( & task_tx)
0 commit comments