Skip to content

Commit fc30c22

Browse files
committed
ethereum: Encapsulate adapter field and use HashMap for health lookups
- Make `adapter` field private on EthereumNetworkAdapter, add getter - Replace Vec-based health checker lookup with HashMap<String, Arc<Health>> for O(1) lookups instead of O(n*m) - Remove redundant empty check in select_weighted_adapter; WeightedIndex already returns Err for empty input, falling through to random selection - Replace struct literal construction in tests with ::new() calls - Add explicit assertions that health scores start at 1.0
1 parent 71a6f8c commit fc30c22

1 file changed

Lines changed: 75 additions & 68 deletions

File tree

chain/ethereum/src/network.rs

Lines changed: 75 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use graph::prelude::rand::{
1414
Rng,
1515
};
1616
use itertools::Itertools;
17+
use std::collections::HashMap;
1718
use std::sync::Arc;
1819

1920
pub use graph::impl_slog_value;
@@ -29,7 +30,7 @@ pub const DEFAULT_ADAPTER_ERROR_RETEST_PERCENT: f64 = 0.2;
2930
pub struct EthereumNetworkAdapter {
3031
endpoint_metrics: Arc<EndpointMetrics>,
3132
pub capabilities: NodeCapabilities,
32-
pub adapter: Arc<EthereumAdapter>,
33+
adapter: Arc<EthereumAdapter>,
3334
/// The maximum number of times this adapter can be used. We use the
3435
/// strong_count on `adapter` to determine whether the adapter is above
3536
/// that limit. That's a somewhat imprecise but convenient way to
@@ -70,6 +71,10 @@ impl EthereumNetworkAdapter {
7071
}
7172
}
7273

74+
pub fn adapter(&self) -> &Arc<EthereumAdapter> {
75+
&self.adapter
76+
}
77+
7378
#[cfg(debug_assertions)]
7479
fn is_call_only(&self) -> bool {
7580
self.adapter.is_call_only()
@@ -97,7 +102,7 @@ pub struct EthereumNetworkAdapters {
97102
// Percentage of request that should be used to retest errored adapters.
98103
retest_percent: f64,
99104
weighted: bool,
100-
health_checkers: Vec<Arc<Health>>,
105+
health_checkers: HashMap<String, Arc<Health>>,
101106
}
102107

103108
impl EthereumNetworkAdapters {
@@ -108,7 +113,7 @@ impl EthereumNetworkAdapters {
108113
call_only_adapters: vec![],
109114
retest_percent: DEFAULT_ADAPTER_ERROR_RETEST_PERCENT,
110115
weighted: false,
111-
health_checkers: vec![],
116+
health_checkers: HashMap::new(),
112117
}
113118
}
114119

@@ -135,7 +140,7 @@ impl EthereumNetworkAdapters {
135140
ProviderCheckStrategy::MarkAsValid,
136141
);
137142

138-
Self::new(chain_id, provider, call_only, None, false, vec![])
143+
Self::new(chain_id, provider, call_only, None, false, HashMap::new())
139144
}
140145

141146
pub fn new(
@@ -144,7 +149,7 @@ impl EthereumNetworkAdapters {
144149
call_only_adapters: Vec<EthereumNetworkAdapter>,
145150
retest_percent: Option<f64>,
146151
weighted: bool,
147-
health_checkers: Vec<Arc<Health>>,
152+
health_checkers: HashMap<String, Arc<Health>>,
148153
) -> Self {
149154
#[cfg(debug_assertions)]
150155
call_only_adapters.iter().for_each(|a| {
@@ -233,7 +238,7 @@ impl EthereumNetworkAdapters {
233238
.max_by_key(|a| a.current_error_count())
234239
.filter(|a| a.current_error_count() > 0)
235240
{
236-
return Ok(most_errored.adapter.clone());
241+
return Ok(most_errored.adapter().clone());
237242
}
238243
}
239244

@@ -267,29 +272,21 @@ impl EthereumNetworkAdapters {
267272
input: &[&EthereumNetworkAdapter],
268273
required_capabilities: &NodeCapabilities,
269274
) -> Result<Arc<EthereumAdapter>, Error> {
270-
if input.is_empty() {
271-
return Err(anyhow!(
272-
"A matching Ethereum network with {:?} was not found.",
273-
required_capabilities
274-
));
275-
}
276-
277275
let weights: Vec<_> = input
278276
.iter()
279277
.map(|a| {
280-
let health_checker = self
278+
let score = self
281279
.health_checkers
282-
.iter()
283-
.find(|h| h.provider() == a.provider());
284-
let score = health_checker.map_or(1.0, |h| h.score());
280+
.get(a.provider())
281+
.map_or(1.0, |h| h.score());
285282
a.weight * score
286283
})
287284
.collect();
288285
if let Ok(dist) = WeightedIndex::new(&weights) {
289286
let idx = dist.sample(&mut rand::rng());
290-
Ok(input[idx].adapter.clone())
287+
Ok(input[idx].adapter().clone())
291288
} else {
292-
// Fallback to random selection if weights are invalid
289+
// Fallback to random selection if weights are invalid (e.g., all zeros or empty)
293290
Self::select_random_adapter(input, required_capabilities)
294291
}
295292
}
@@ -303,12 +300,9 @@ impl EthereumNetworkAdapters {
303300
input: &[&EthereumNetworkAdapter],
304301
required_capabilities: &NodeCapabilities,
305302
) -> Result<Arc<EthereumAdapter>, Error> {
306-
let choices = input
307-
.iter()
308-
.copied()
309-
.choose_multiple(&mut rand::rng(), 3);
303+
let choices = input.iter().copied().choose_multiple(&mut rand::rng(), 3);
310304
if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) {
311-
Ok(adapter.adapter.clone())
305+
Ok(adapter.adapter().clone())
312306
} else {
313307
Err(anyhow!(
314308
"A matching Ethereum network with {:?} was not found.",
@@ -349,7 +343,7 @@ impl EthereumNetworkAdapters {
349343
.await
350344
.map(|mut adapters| adapters.next())
351345
.unwrap_or_default()
352-
.map(|ethereum_network_adapter| ethereum_network_adapter.adapter.clone())
346+
.map(|ethereum_network_adapter| ethereum_network_adapter.adapter().clone())
353347
}
354348

355349
/// call_or_cheapest will bypass ProviderManagers' validation in order to remain non async.
@@ -381,21 +375,21 @@ impl EthereumNetworkAdapters {
381375
let adapters = self
382376
.call_only_adapters
383377
.iter()
384-
.min_by_key(|x| Arc::strong_count(&x.adapter))
378+
.min_by_key(|x| Arc::strong_count(x.adapter()))
385379
.ok_or(anyhow!("no available call only endpoints"))?;
386380

387381
// TODO: This will probably blow up a lot sooner than [limit] amount of
388382
// subgraphs, since we probably use a few instances.
389383
if !adapters
390384
.limit
391-
.has_capacity(Arc::strong_count(&adapters.adapter))
385+
.has_capacity(Arc::strong_count(adapters.adapter()))
392386
{
393387
bail!("call only adapter has reached the concurrency limit");
394388
}
395389

396390
// Cloning here ensure we have the correct count at any given time, if we return a reference it can be cloned later
397391
// which could cause a high number of endpoints to be given away before accounting for them.
398-
Ok(Some(adapters.adapter.clone()))
392+
Ok(Some(adapters.adapter().clone()))
399393
}
400394
}
401395

@@ -412,6 +406,7 @@ mod tests {
412406
use graph::{
413407
endpoint::EndpointMetrics, firehose::SubgraphLimit, prelude::MetricsRegistry, url::Url,
414408
};
409+
use std::collections::HashMap;
415410
use std::sync::Arc;
416411

417412
use crate::{EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport};
@@ -812,26 +807,26 @@ mod tests {
812807
SubgraphLimit::Unlimited
813808
};
814809

815-
no_retest_adapters.push(EthereumNetworkAdapter {
816-
endpoint_metrics: metrics.clone(),
817-
capabilities: NodeCapabilities {
810+
no_retest_adapters.push(EthereumNetworkAdapter::new(
811+
metrics.clone(),
812+
NodeCapabilities {
818813
archive: true,
819814
traces: false,
820815
},
821-
adapter: adapter.clone(),
822-
limit: limit.clone(),
823-
weight: 1.0,
824-
});
825-
always_retest_adapters.push(EthereumNetworkAdapter {
826-
endpoint_metrics: metrics.clone(),
827-
capabilities: NodeCapabilities {
816+
adapter.clone(),
817+
limit.clone(),
818+
1.0,
819+
));
820+
always_retest_adapters.push(EthereumNetworkAdapter::new(
821+
metrics.clone(),
822+
NodeCapabilities {
828823
archive: true,
829824
traces: false,
830825
},
831826
adapter,
832827
limit,
833-
weight: 1.0,
834-
});
828+
1.0,
829+
));
835830
});
836831
let manager = ProviderManager::<EthereumNetworkAdapter>::new(
837832
logger,
@@ -853,7 +848,7 @@ mod tests {
853848
vec![],
854849
Some(0f64),
855850
false,
856-
vec![],
851+
HashMap::new(),
857852
);
858853

859854
let always_retest_adapters = EthereumNetworkAdapters::new(
@@ -862,7 +857,7 @@ mod tests {
862857
vec![],
863858
Some(1f64),
864859
false,
865-
vec![],
860+
HashMap::new(),
866861
);
867862

868863
assert_eq!(
@@ -910,36 +905,35 @@ mod tests {
910905
metrics.report_for_test(&ProviderName::from(error_provider), false);
911906

912907
let mut no_retest_adapters = vec![];
913-
no_retest_adapters.push(EthereumNetworkAdapter {
914-
endpoint_metrics: metrics.clone(),
915-
capabilities: NodeCapabilities {
908+
no_retest_adapters.push(EthereumNetworkAdapter::new(
909+
metrics.clone(),
910+
NodeCapabilities {
916911
archive: true,
917912
traces: false,
918913
},
919-
adapter: fake_adapter(&logger, error_provider, &provider_metrics, &metrics, false)
920-
.await,
921-
limit: SubgraphLimit::Unlimited,
922-
weight: 1.0,
923-
});
914+
fake_adapter(&logger, error_provider, &provider_metrics, &metrics, false).await,
915+
SubgraphLimit::Unlimited,
916+
1.0,
917+
));
924918

925919
let mut always_retest_adapters = vec![];
926-
always_retest_adapters.push(EthereumNetworkAdapter {
927-
endpoint_metrics: metrics.clone(),
928-
capabilities: NodeCapabilities {
920+
always_retest_adapters.push(EthereumNetworkAdapter::new(
921+
metrics.clone(),
922+
NodeCapabilities {
929923
archive: true,
930924
traces: false,
931925
},
932-
adapter: fake_adapter(
926+
fake_adapter(
933927
&logger,
934928
no_error_provider,
935929
&provider_metrics,
936930
&metrics,
937931
false,
938932
)
939933
.await,
940-
limit: SubgraphLimit::Unlimited,
941-
weight: 1.0,
942-
});
934+
SubgraphLimit::Unlimited,
935+
1.0,
936+
));
943937
let manager = ProviderManager::<EthereumNetworkAdapter>::new(
944938
logger.clone(),
945939
always_retest_adapters
@@ -955,7 +949,7 @@ mod tests {
955949
vec![],
956950
Some(1f64),
957951
false,
958-
vec![],
952+
HashMap::new(),
959953
);
960954

961955
assert_eq!(
@@ -985,7 +979,7 @@ mod tests {
985979
vec![],
986980
Some(0f64),
987981
false,
988-
vec![],
982+
HashMap::new(),
989983
);
990984
assert_eq!(
991985
no_retest_adapters
@@ -1000,31 +994,31 @@ mod tests {
1000994
);
1001995

1002996
let mut no_available_adapter = vec![];
1003-
no_available_adapter.push(EthereumNetworkAdapter {
1004-
endpoint_metrics: metrics.clone(),
1005-
capabilities: NodeCapabilities {
997+
no_available_adapter.push(EthereumNetworkAdapter::new(
998+
metrics.clone(),
999+
NodeCapabilities {
10061000
archive: true,
10071001
traces: false,
10081002
},
1009-
adapter: fake_adapter(
1003+
fake_adapter(
10101004
&logger,
10111005
no_error_provider,
10121006
&provider_metrics,
10131007
&metrics,
10141008
false,
10151009
)
10161010
.await,
1017-
limit: SubgraphLimit::Disabled,
1018-
weight: 1.0,
1019-
});
1011+
SubgraphLimit::Disabled,
1012+
1.0,
1013+
));
10201014
let manager = ProviderManager::new(
10211015
logger,
10221016
vec![(chain_id.clone(), no_available_adapter.to_vec())].into_iter(),
10231017
ProviderCheckStrategy::MarkAsValid,
10241018
);
10251019

10261020
let no_available_adapter =
1027-
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false, vec![]);
1021+
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false, HashMap::new());
10281022
let res = no_available_adapter
10291023
.cheapest_with(&NodeCapabilities {
10301024
archive: true,
@@ -1128,7 +1122,20 @@ mod tests {
11281122
let health_checker1 = Arc::new(Health::new(adapter1.clone()));
11291123
let health_checker2 = Arc::new(Health::new(adapter2.clone()));
11301124

1131-
adapters.health_checkers = vec![health_checker1.clone(), health_checker2.clone()];
1125+
// Verify health checkers start with a perfect score of 1.0
1126+
assert_eq!(health_checker1.score(), 1.0);
1127+
assert_eq!(health_checker2.score(), 1.0);
1128+
1129+
let mut health_map = HashMap::new();
1130+
health_map.insert(
1131+
health_checker1.provider().to_string(),
1132+
health_checker1.clone(),
1133+
);
1134+
health_map.insert(
1135+
health_checker2.provider().to_string(),
1136+
health_checker2.clone(),
1137+
);
1138+
adapters.health_checkers = health_map;
11321139
adapters.weighted = true;
11331140

11341141
let mut adapter1_count = 0;

0 commit comments

Comments
 (0)