@@ -83,6 +83,18 @@ impl Worker {
8383 loop {
8484 tokio:: select! {
8585 _ = tick_interval. tick( ) => { } ,
86+ res = wake_sub. next( ) => {
87+ if res. is_none( ) {
88+ // Cancel background tasks
89+ gc_handle. abort( ) ;
90+ metrics_handle. abort( ) ;
91+
92+ return Err ( WorkflowError :: SubscriptionUnsubscribed . into( ) ) ;
93+ }
94+
95+ tick_interval. reset( ) ;
96+ } ,
97+
8698 res = & mut gc_handle => {
8799 tracing:: error!( ?res, "metrics task unexpectedly stopped" ) ;
88100 break ;
@@ -91,18 +103,17 @@ impl Worker {
91103 tracing:: error!( ?res, "metrics task unexpectedly stopped" ) ;
92104 break ;
93105 } ,
94- res = wake_sub. next( ) => {
95- if res. is_none( ) {
96- return Err ( WorkflowError :: SubscriptionUnsubscribed . into( ) ) ;
97- }
98-
99- tick_interval. reset( ) ;
100- } ,
101106 _ = ctrl_c( ) => break ,
102107 _ = sigterm. recv( ) => break ,
103108 }
104109
105- self . tick ( & shared_client, & config, & pools, & cache) . await ?;
110+ if let Err ( err) = self . tick ( & shared_client, & config, & pools, & cache) . await {
111+ // Cancel background tasks
112+ gc_handle. abort ( ) ;
113+ metrics_handle. abort ( ) ;
114+
115+ return Err ( err) ;
116+ }
106117 }
107118
108119 self . shutdown ( sigterm) . await ;
@@ -181,6 +192,7 @@ impl Worker {
181192
182193 tracing:: info!( "shutdown complete" ) ;
183194
195+ // This will stop all background tasks because the tokio runtime will stop
184196 rivet_runtime:: shutdown ( ) . await ;
185197 }
186198
0 commit comments