@@ -156,16 +156,13 @@ enum OutboundReqOutput {
156156#[ timeout = u64:: MAX ]
157157async fn outbound_req ( ctx : & ActivityCtx , input : & OutboundReqInput ) -> Result < OutboundReqOutput > {
158158 let mut term_signal = TermSignal :: get ( ) ;
159- let mut drain_sub = ctx
160- . subscribe :: < Drain > ( ( "workflow_id" , ctx. workflow_id ( ) ) )
161- . await ?;
162159
163160 loop {
164161 metrics:: SERVERLESS_OUTBOUND_REQ_ACTIVE
165162 . with_label_values ( & [ & input. namespace_id . to_string ( ) , & input. runner_name ] )
166163 . inc ( ) ;
167164
168- let res = outbound_req_inner ( ctx, input, & mut term_signal, & mut drain_sub ) . await ;
165+ let res = outbound_req_inner ( ctx, input, & mut term_signal) . await ;
169166
170167 metrics:: SERVERLESS_OUTBOUND_REQ_ACTIVE
171168 . with_label_values ( & [ & input. namespace_id . to_string ( ) , & input. runner_name ] )
@@ -203,7 +200,6 @@ async fn outbound_req_inner(
203200 ctx : & ActivityCtx ,
204201 input : & OutboundReqInput ,
205202 term_signal : & mut TermSignal ,
206- drain_sub : & mut message:: SubscriptionHandle < Drain > ,
207203) -> Result < OutboundReqOutput > {
208204 if is_runner_draining ( ctx, input. receiver_wf_id ) . await ? {
209205 return Ok ( OutboundReqOutput :: Draining { drain_sent : false } ) ;
@@ -438,7 +434,6 @@ async fn outbound_req_inner(
438434 }
439435 } ,
440436 _ = tokio:: time:: sleep( sleep_until_drain) => { }
441- _ = drain_sub. next( ) => { }
442437 _ = term_signal. recv( ) => { }
443438 } ;
444439
0 commit comments