@@ -9,6 +9,7 @@ use crate::{
99 substreams,
1010} ;
1111
12+ use anyhow:: bail;
1213use futures03:: StreamExt ;
1314use http:: uri:: { Scheme , Uri } ;
1415use slog:: Logger ;
@@ -22,17 +23,28 @@ use tonic::{
2223
2324use super :: codec as firehose;
2425
25- const SUBGRAPHS_PER_CONN : usize = 100 ;
26+ /// This is constant because we found this magic number of connections after
27+ /// which the grpc connections start to hang.
28+ /// For more details see: https://github.com/graphprotocol/graph-node/issues/3879
29+ pub const SUBGRAPHS_PER_CONN : usize = 100 ;
2630
2731#[ derive( Clone , Debug ) ]
2832pub struct FirehoseEndpoint {
2933 pub provider : String ,
3034 pub token : Option < String > ,
3135 pub filters_enabled : bool ,
3236 pub compression_enabled : bool ,
37+ pub subgraph_limit : usize ,
3338 channel : Channel ,
3439}
3540
41+ #[ derive( Clone , Debug ) ]
42+ pub enum SubgraphLimit {
43+ Unlimited ,
44+ Limit ( usize ) ,
45+ NoTraffic ,
46+ }
47+
3648impl Display for FirehoseEndpoint {
3749 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
3850 Display :: fmt ( self . provider . as_str ( ) , f)
@@ -46,6 +58,7 @@ impl FirehoseEndpoint {
4658 token : Option < String > ,
4759 filters_enabled : bool ,
4860 compression_enabled : bool ,
61+ subgraph_limit : SubgraphLimit ,
4962 ) -> Self {
5063 let uri = url
5164 . as_ref ( )
@@ -78,15 +91,31 @@ impl FirehoseEndpoint {
7891 // Timeout on each request, so the timeout to estabilish each 'Blocks' stream.
7992 . timeout ( Duration :: from_secs ( 120 ) ) ;
8093
94+ let subgraph_limit = match subgraph_limit {
95+ // See the comment on the constant
96+ SubgraphLimit :: Unlimited => SUBGRAPHS_PER_CONN ,
97+ // This is checked when parsing from config but doesn't hurt to be defensive.
98+ SubgraphLimit :: Limit ( limit) => limit. min ( SUBGRAPHS_PER_CONN ) ,
99+ SubgraphLimit :: NoTraffic => 0 ,
100+ } ;
101+
81102 FirehoseEndpoint {
82103 provider : provider. as_ref ( ) . to_string ( ) ,
83104 channel : endpoint. connect_lazy ( ) ,
84105 token,
85106 filters_enabled,
86107 compression_enabled,
108+ subgraph_limit,
87109 }
88110 }
89111
112+ // The SUBGRAPHS_PER_CONN upper bound was already limited so we leave it the same
113+ // we need to use inclusive limits (<=) because there will always be a reference
114+ // inside FirehoseEndpoints that is not used (is always cloned).
115+ pub fn has_subgraph_capacity ( self : & Arc < Self > ) -> bool {
116+ Arc :: strong_count ( & self ) <= self . subgraph_limit
117+ }
118+
90119 pub async fn get_block < M > (
91120 & self ,
92121 cursor : FirehoseCursor ,
@@ -327,8 +356,8 @@ impl FirehoseEndpoints {
327356 . iter ( )
328357 . min_by_key ( |x| Arc :: strong_count ( x) )
329358 . ok_or ( anyhow ! ( "no available firehose endpoints" ) ) ?;
330- if Arc :: strong_count ( endpoint) > SUBGRAPHS_PER_CONN {
331- return Err ( anyhow ! ( "all connections saturated with {} connections, increase the firehose conn_pool_size" , SUBGRAPHS_PER_CONN ) ) ;
359+ if ! endpoint. has_subgraph_capacity ( ) {
360+ bail ! ( "all connections saturated with {} connections, increase the firehose conn_pool_size or limit for the node " , SUBGRAPHS_PER_CONN ) ;
332361 }
333362
334363 // Cloning here ensure we have the correct count at any given time, if we return a reference it can be cloned later
@@ -396,22 +425,22 @@ impl FirehoseNetworks {
396425
397426#[ cfg( test) ]
398427mod test {
399- use std:: { mem, str :: FromStr , sync:: Arc } ;
428+ use std:: { mem, sync:: Arc } ;
400429
401- use http:: Uri ;
402- use tonic:: transport:: Channel ;
430+ use crate :: firehose:: SubgraphLimit ;
403431
404432 use super :: { FirehoseEndpoint , FirehoseEndpoints , SUBGRAPHS_PER_CONN } ;
405433
406434 #[ tokio:: test]
407435 async fn firehose_endpoint_errors ( ) {
408- let endpoint = vec ! [ Arc :: new( FirehoseEndpoint {
409- provider: String :: new( ) ,
410- token: None ,
411- filters_enabled: true ,
412- compression_enabled: true ,
413- channel: Channel :: builder( Uri :: from_str( "http://127.0.0.1" ) . unwrap( ) ) . connect_lazy( ) ,
414- } ) ] ;
436+ let endpoint = vec ! [ Arc :: new( FirehoseEndpoint :: new(
437+ String :: new( ) ,
438+ "http://127.0.0.1" . to_string( ) ,
439+ None ,
440+ false ,
441+ false ,
442+ SubgraphLimit :: Unlimited ,
443+ ) ) ] ;
415444
416445 let mut endpoints = FirehoseEndpoints :: from ( endpoint) ;
417446
@@ -432,4 +461,58 @@ mod test {
432461 let err = endpoints. random ( ) . unwrap_err ( ) ;
433462 assert ! ( err. to_string( ) . contains( "no available firehose endpoints" ) ) ;
434463 }
464+
465+ #[ tokio:: test]
466+ async fn firehose_endpoint_with_limit ( ) {
467+ let endpoint = vec ! [ Arc :: new( FirehoseEndpoint :: new(
468+ String :: new( ) ,
469+ "http://127.0.0.1" . to_string( ) ,
470+ None ,
471+ false ,
472+ false ,
473+ SubgraphLimit :: Limit ( 2 ) ,
474+ ) ) ] ;
475+
476+ let mut endpoints = FirehoseEndpoints :: from ( endpoint) ;
477+
478+ let mut keep = vec ! [ ] ;
479+ for _ in 0 ..2 {
480+ keep. push ( endpoints. random ( ) . unwrap ( ) ) ;
481+ }
482+
483+ let err = endpoints. random ( ) . unwrap_err ( ) ;
484+ assert ! ( err. to_string( ) . contains( "conn_pool_size" ) ) ;
485+
486+ mem:: drop ( keep) ;
487+ endpoints. random ( ) . unwrap ( ) ;
488+
489+ // Fails when empty too
490+ endpoints. remove ( "" ) ;
491+
492+ let err = endpoints. random ( ) . unwrap_err ( ) ;
493+ assert ! ( err. to_string( ) . contains( "no available firehose endpoints" ) ) ;
494+ }
495+
496+ #[ tokio:: test]
497+ async fn firehose_endpoint_no_traffic ( ) {
498+ let endpoint = vec ! [ Arc :: new( FirehoseEndpoint :: new(
499+ String :: new( ) ,
500+ "http://127.0.0.1" . to_string( ) ,
501+ None ,
502+ false ,
503+ false ,
504+ SubgraphLimit :: NoTraffic ,
505+ ) ) ] ;
506+
507+ let mut endpoints = FirehoseEndpoints :: from ( endpoint) ;
508+
509+ let err = endpoints. random ( ) . unwrap_err ( ) ;
510+ assert ! ( err. to_string( ) . contains( "conn_pool_size" ) ) ;
511+
512+ // Fails when empty too
513+ endpoints. remove ( "" ) ;
514+
515+ let err = endpoints. random ( ) . unwrap_err ( ) ;
516+ assert ! ( err. to_string( ) . contains( "no available firehose endpoints" ) ) ;
517+ }
435518}
0 commit comments