Skip to content

Commit 0737908

Browse files
authored
Add support for Web3Call adapter (#4351)
- Always prefer web3call adapter for runtime adapter on ethereum if available - Prevent non call functions on call_only adapters - Add web3call type configuration - Require Web3Call endpoint to be archive
1 parent e4cb769 commit 0737908

10 files changed

Lines changed: 258 additions & 48 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chain/ethereum/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ graph-runtime-derive = { path = "../../runtime/derive" }
2727
[dev-dependencies]
2828
test-store = { path = "../../store/test-store" }
2929
base64 = "0.20.0"
30+
graph-mock = { path = "../../mock" }
3031

3132
[build-dependencies]
3233
tonic-build = { workspace = true }

chain/ethereum/src/chain.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ impl TriggersAdapterSelector<Chain> for EthereumAdapterSelector {
176176
traces: false,
177177
};
178178

179-
self.adapters.cheapest_with(&adjusted_capabilities)?
179+
self.adapters
180+
.call_or_cheapest(Some(&adjusted_capabilities))?
180181
} else {
181182
self.adapters.cheapest_with(capabilities)?
182183
};

chain/ethereum/src/ethereum_adapter.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub struct EthereumAdapter {
6363
web3: Arc<Web3<Transport>>,
6464
metrics: Arc<ProviderEthRpcMetrics>,
6565
supports_eip_1898: bool,
66+
call_only: bool,
6667
}
6768

6869
/// Gas limit for `eth_call`. The value of 50_000_000 is a protocol-wide parameter so this
@@ -84,18 +85,24 @@ impl CheapClone for EthereumAdapter {
8485
web3: self.web3.cheap_clone(),
8586
metrics: self.metrics.cheap_clone(),
8687
supports_eip_1898: self.supports_eip_1898,
88+
call_only: self.call_only,
8789
}
8890
}
8991
}
9092

9193
impl EthereumAdapter {
94+
pub fn is_call_only(&self) -> bool {
95+
self.call_only
96+
}
97+
9298
pub async fn new(
9399
logger: Logger,
94100
provider: String,
95101
url: &str,
96102
transport: Transport,
97103
provider_metrics: Arc<ProviderEthRpcMetrics>,
98104
supports_eip_1898: bool,
105+
call_only: bool,
99106
) -> Self {
100107
// Unwrap: The transport was constructed with this url, so it is valid and has a host.
101108
let hostname = graph::url::Url::parse(url)
@@ -122,6 +129,7 @@ impl EthereumAdapter {
122129
web3,
123130
metrics: provider_metrics,
124131
supports_eip_1898: supports_eip_1898 && !is_ganache,
132+
call_only,
125133
}
126134
}
127135

@@ -133,6 +141,8 @@ impl EthereumAdapter {
133141
to: BlockNumber,
134142
addresses: Vec<H160>,
135143
) -> Result<Vec<Trace>, Error> {
144+
assert!(!self.call_only);
145+
136146
let eth = self.clone();
137147
let retry_log_message =
138148
format!("trace_filter RPC call for block range: [{}..{}]", from, to);
@@ -225,6 +235,8 @@ impl EthereumAdapter {
225235
filter: Arc<EthGetLogsFilter>,
226236
too_many_logs_fingerprints: &'static [&'static str],
227237
) -> Result<Vec<Log>, TimeoutError<web3::error::Error>> {
238+
assert!(!self.call_only);
239+
228240
let eth_adapter = self.clone();
229241
let retry_log_message = format!("eth_getLogs RPC call for block range: [{}..{}]", from, to);
230242
retry(retry_log_message, &logger)

chain/ethereum/src/network.rs

Lines changed: 163 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{anyhow, Context};
1+
use anyhow::{anyhow, bail, Context};
22
use graph::cheap_clone::CheapClone;
33
use graph::prelude::rand::{self, seq::IteratorRandom};
44
use std::cmp::Ordering;
@@ -23,12 +23,26 @@ pub struct EthereumNetworkAdapter {
2323
limit: usize,
2424
}
2525

26-
#[derive(Clone)]
26+
impl EthereumNetworkAdapter {
27+
fn is_call_only(&self) -> bool {
28+
self.adapter.is_call_only()
29+
}
30+
}
31+
32+
#[derive(Clone, Default)]
2733
pub struct EthereumNetworkAdapters {
2834
pub adapters: Vec<EthereumNetworkAdapter>,
35+
pub call_only_adapters: Vec<EthereumNetworkAdapter>,
2936
}
3037

3138
impl EthereumNetworkAdapters {
39+
pub fn push_adapter(&mut self, adapter: EthereumNetworkAdapter) {
40+
if adapter.is_call_only() {
41+
self.call_only_adapters.push(adapter);
42+
} else {
43+
self.adapters.push(adapter);
44+
}
45+
}
3246
pub fn all_cheapest_with(
3347
&self,
3448
required_capabilities: &NodeCapabilities,
@@ -73,6 +87,42 @@ impl EthereumNetworkAdapters {
7387
self.adapters
7488
.retain(|adapter| adapter.adapter.provider() != provider);
7589
}
90+
91+
pub fn call_or_cheapest(
92+
&self,
93+
capabilities: Option<&NodeCapabilities>,
94+
) -> anyhow::Result<Arc<EthereumAdapter>> {
95+
match self.call_only_adapter()? {
96+
Some(adapter) => Ok(adapter),
97+
None => self.cheapest_with(capabilities.unwrap_or(&NodeCapabilities {
98+
// Archive is required for call_only
99+
archive: true,
100+
traces: false,
101+
})),
102+
}
103+
}
104+
105+
pub fn call_only_adapter(&self) -> anyhow::Result<Option<Arc<EthereumAdapter>>> {
106+
if self.call_only_adapters.is_empty() {
107+
return Ok(None);
108+
}
109+
110+
let adapters = self
111+
.call_only_adapters
112+
.iter()
113+
.min_by_key(|x| Arc::strong_count(&x.adapter))
114+
.ok_or(anyhow!("no available call only endpoints"))?;
115+
116+
// TODO: This will probably blow up a lot sooner than [limit] amount of
117+
// subgraphs, since we probably use a few instances.
118+
if Arc::strong_count(&adapters.adapter) >= adapters.limit {
119+
bail!("call only adapter has reached the concurrency limit");
120+
}
121+
122+
// Cloning here ensure we have the correct count at any given time, if we return a reference it can be cloned later
123+
// which could cause a high number of endpoints to be given away before accounting for them.
124+
Ok(Some(adapters.adapter.clone()))
125+
}
76126
}
77127

78128
#[derive(Clone)]
@@ -97,8 +147,9 @@ impl EthereumNetworks {
97147
let network_adapters = self
98148
.networks
99149
.entry(name)
100-
.or_insert(EthereumNetworkAdapters { adapters: vec![] });
101-
network_adapters.adapters.push(EthereumNetworkAdapter {
150+
.or_insert(EthereumNetworkAdapters::default());
151+
152+
network_adapters.push_adapter(EthereumNetworkAdapter {
102153
capabilities,
103154
adapter,
104155
limit,
@@ -160,6 +211,14 @@ impl EthereumNetworks {
160211

161212
#[cfg(test)]
162213
mod tests {
214+
use std::sync::Arc;
215+
216+
use graph::{prelude::MetricsRegistry, tokio, url::Url};
217+
use graph_mock::MockMetricsRegistry;
218+
use http::HeaderMap;
219+
220+
use crate::{EthereumAdapter, EthereumNetworks, ProviderEthRpcMetrics, Transport};
221+
163222
use super::NodeCapabilities;
164223

165224
#[test]
@@ -216,4 +275,104 @@ mod tests {
216275
assert_eq!(true, &full_traces >= &full);
217276
assert_eq!(true, &full_traces >= &full_traces);
218277
}
278+
279+
#[tokio::test]
280+
async fn adapter_selector_selects_eth_call() {
281+
let chain = "mainnet".to_string();
282+
let logger = graph::log::logger(true);
283+
let mock_registry: Arc<dyn MetricsRegistry> = Arc::new(MockMetricsRegistry::new());
284+
let transport =
285+
Transport::new_rpc(Url::parse("http://127.0.0.1").unwrap(), HeaderMap::new());
286+
let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone()));
287+
288+
let eth_call_adapter = Arc::new(
289+
EthereumAdapter::new(
290+
logger.clone(),
291+
String::new(),
292+
"http://127.0.0.1",
293+
transport.clone(),
294+
provider_metrics.clone(),
295+
true,
296+
true,
297+
)
298+
.await,
299+
);
300+
301+
let eth_adapter = Arc::new(
302+
EthereumAdapter::new(
303+
logger.clone(),
304+
String::new(),
305+
"http://127.0.0.1",
306+
transport.clone(),
307+
provider_metrics.clone(),
308+
true,
309+
false,
310+
)
311+
.await,
312+
);
313+
314+
let mut adapters = {
315+
let mut ethereum_networks = EthereumNetworks::new();
316+
ethereum_networks.insert(
317+
chain.clone(),
318+
NodeCapabilities {
319+
archive: true,
320+
traces: false,
321+
},
322+
eth_call_adapter.clone(),
323+
3,
324+
);
325+
ethereum_networks.insert(
326+
chain.clone(),
327+
NodeCapabilities {
328+
archive: true,
329+
traces: false,
330+
},
331+
eth_adapter.clone(),
332+
3,
333+
);
334+
ethereum_networks.networks.get(&chain).unwrap().clone()
335+
};
336+
// one reference above and one inside adapters struct
337+
assert_eq!(Arc::strong_count(&eth_call_adapter), 2);
338+
assert_eq!(Arc::strong_count(&eth_adapter), 2);
339+
340+
{
341+
// Not Found
342+
assert!(adapters
343+
.cheapest_with(&NodeCapabilities {
344+
archive: false,
345+
traces: true,
346+
})
347+
.is_err());
348+
349+
// Check cheapest is not call only
350+
let adapter = adapters
351+
.cheapest_with(&NodeCapabilities {
352+
archive: true,
353+
traces: false,
354+
})
355+
.unwrap();
356+
assert_eq!(adapter.is_call_only(), false);
357+
}
358+
359+
// Check limits
360+
{
361+
let adapter = adapters.call_or_cheapest(None).unwrap();
362+
assert!(adapter.is_call_only());
363+
assert!(adapters.call_or_cheapest(None).is_err());
364+
}
365+
366+
// Check empty falls back to call only
367+
{
368+
adapters.call_only_adapters = vec![];
369+
let adapter = adapters
370+
.call_or_cheapest(Some(&NodeCapabilities {
371+
archive: true,
372+
traces: false,
373+
}))
374+
.unwrap();
375+
assert_eq!(adapter.is_call_only(), false);
376+
}
377+
}
219378
}

chain/ethereum/src/runtime/runtime_adapter.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,10 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
4343
fn host_fns(&self, ds: &DataSource) -> Result<Vec<HostFn>, Error> {
4444
let abis = ds.mapping.abis.clone();
4545
let call_cache = self.call_cache.cheap_clone();
46-
let eth_adapter = self
47-
.eth_adapters
48-
.cheapest_with(&NodeCapabilities {
49-
archive: ds.mapping.requires_archive()?,
50-
traces: false,
51-
})?
52-
.cheap_clone();
46+
let eth_adapter = self.eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
47+
archive: ds.mapping.requires_archive()?,
48+
traces: false,
49+
}))?;
5350

5451
let ethereum_call = HostFn {
5552
name: "ethereum.call",

node/resources/tests/full_config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ ingestor = "index_0"
4747
shard = "primary"
4848
provider = [
4949
{ label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] },
50+
{ label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }},
5051
{ label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }},
5152
{ label = "substreams", details = { type = "substreams", url = "http://localhost:9000", features = [] }},
5253
]

0 commit comments

Comments
 (0)