@@ -24,6 +24,7 @@ use fuel_core_services::{
2424 TaskNextAction ,
2525 stream:: IntoBoxStream ,
2626} ;
27+ use futures:: stream;
2728use fuel_core_types:: {
2829 fuel_tx:: AssetId ,
2930 fuel_types:: BlockHeight ,
@@ -218,7 +219,7 @@ impl RunnableTask for Task {
218219 next_height,
219220 event. header. height( )
220221 ) ;
221- match self . connect_block_stream ( ) . await {
222+ match self . reconnect ( ) . await {
222223 Ok ( _) => return TaskNextAction :: Continue ,
223224 Err ( e) => {
224225 tracing:: error!( "Failed to reconnect block stream: {e}" ) ;
@@ -238,7 +239,7 @@ impl RunnableTask for Task {
238239 }
239240 Some ( Err ( e) ) => {
240241 tracing:: error!( "Error receiving block event: {e}; reconnecting stream" ) ;
241- match self . connect_block_stream ( ) . await {
242+ match self . reconnect ( ) . await {
242243 Ok ( _) => TaskNextAction :: Continue ,
243244 Err ( e) => {
244245 tracing:: error!( "Failed to reconnect block stream: {e}" ) ;
@@ -248,7 +249,7 @@ impl RunnableTask for Task {
248249 }
249250 None => {
250251 tracing:: warn!( "Block event stream ended unexpectedly" ) ;
251- match self . connect_block_stream ( ) . await {
252+ match self . reconnect ( ) . await {
252253 Ok ( _) => TaskNextAction :: Continue ,
253254 Err ( e) => {
254255 tracing:: error!( "Failed to reconnect block stream: {e}" ) ;
@@ -267,10 +268,22 @@ impl RunnableTask for Task {
267268}
268269
269270impl Task {
270- async fn connect_block_stream ( & mut self ) -> anyhow:: Result < ( ) > {
271- // Reset the buffer when reconnecting
272- self . buffer . reset ( ) ?;
271+ /// Disconnects the block stream to stop the external library's background fetching.
272+ ///
273+ /// The external library spawns background tasks that continue fetching blocks
274+ /// even when we're not calling .next() on the stream. By replacing the stream
275+ /// with a pending stream, we drop the old stream and cause those background
276+ /// tasks to terminate (they'll fail when trying to send to closed channels).
277+ fn disconnect_stream ( & mut self ) {
278+ // Replace with a stream that never yields, dropping the old stream.
279+ // This terminates the external library's background tasks.
280+ self . blocks_stream = TrackedStream :: new ( stream:: pending ( ) . into_boxed ( ) ) ;
281+ tracing:: debug!( "Disconnected block stream" ) ;
282+ }
273283
284+ /// Connects to the block stream starting from the current height + 1.
285+ /// Does NOT reset the buffer - call reset separately if needed.
286+ async fn connect_block_stream ( & mut self ) -> anyhow:: Result < ( ) > {
274287 let height = * self . height . borrow ( ) ;
275288 let next_height = height. succ ( ) . ok_or_else ( || {
276289 anyhow:: anyhow!( "Block height overflowed when connecting block stream" )
@@ -286,7 +299,7 @@ impl Task {
286299 // Drop actually fires at the end of this scope. If the counter
287300 // doesn't decrement, something is holding the fetcher alive.
288301 let fetcher = TrackedFetcher :: new ( ( self . fetcher_factory ) ( ) ) ;
289- tracing:: debug!( "Created new GraphqlFetcher for stream connection" ) ;
302+ tracing:: debug!( "Created new GraphqlFetcher for stream connection at height {}" , * next_height ) ;
290303
291304 let stream = fetcher
292305 . blocks_stream_starting_from ( next_height)
@@ -309,6 +322,13 @@ impl Task {
309322 Ok ( ( ) )
310323 }
311324
325+ /// Reconnects the block stream and resets the buffer.
326+ /// Use this when recovering from errors or after upload failures.
327+ async fn reconnect ( & mut self ) -> anyhow:: Result < ( ) > {
328+ self . buffer . reset ( ) ?;
329+ self . connect_block_stream ( ) . await
330+ }
331+
312332 /// Converts a block event to domain types and adds to the buffer
313333 fn append_event_to_buffer ( & mut self , event : & BlockEvent ) -> anyhow:: Result < ( ) > {
314334 let block = Block :: new (
@@ -343,6 +363,12 @@ impl Task {
343363 return Ok ( ( ) )
344364 }
345365
366+ // Disconnect stream BEFORE finalize/upload to stop the external library
367+ // from fetching more blocks. The library spawns background tasks that
368+ // continue buffering data even when we're not reading from the stream.
369+ // By disconnecting, we ensure no memory accumulates during upload.
370+ self . disconnect_stream ( ) ;
371+
346372 // Finalize the buffer and get the file paths for upload.
347373 // Note: finalize() consumes the writers and FinalizedBatchFiles owns the temp files.
348374 let finalized = self
@@ -364,7 +390,7 @@ impl Task {
364390 // the service reconnects and re-buffers blocks from the last saved height.
365391 self . buffer . reset ( ) ?;
366392
367- // Upload the Avro files to storage
393+ // Upload the Avro files to storage (no background fetching during this)
368394 process_finalized_batch ( & self . processor , finalized)
369395 . await
370396 . map_err ( |err| {
@@ -377,6 +403,9 @@ impl Task {
377403 self . processor . save_latest_height ( last_height) . await ?;
378404 self . height . send_replace ( last_height) ;
379405
406+ // Reconnect to resume fetching from the new height
407+ self . connect_block_stream ( ) . await ?;
408+
380409 Ok ( ( ) )
381410 }
382411}
0 commit comments