Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl EthereumAdapter {
let retry_log_message =
format!("trace_filter RPC call for block range: [{}..{}]", from, to);
retry(retry_log_message, &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -295,6 +296,7 @@ impl EthereumAdapter {
let eth_adapter = self.clone();
let retry_log_message = format!("eth_getLogs RPC call for block range: [{}..{}]", from, to);
retry(retry_log_message, &logger)
.redact_log_urls(true)
.when(move |res: &Result<_, web3::error::Error>| match res {
Ok(_) => false,
Err(e) => !too_many_logs_fingerprints
Expand Down Expand Up @@ -511,6 +513,7 @@ impl EthereumAdapter {
let retry_log_message = format!("eth_getCode RPC call for block {}", block_ptr);

retry(retry_log_message, &logger)
.redact_log_urls(true)
.when(|result| match result {
Ok(_) => false,
Err(_) => true,
Expand Down Expand Up @@ -546,6 +549,7 @@ impl EthereumAdapter {
let retry_log_message = format!("eth_getBalance RPC call for block {}", block_ptr);

retry(retry_log_message, &logger)
.redact_log_urls(true)
.when(|result| match result {
Ok(_) => false,
Err(_) => true,
Expand Down Expand Up @@ -586,6 +590,7 @@ impl EthereumAdapter {
let block_id = self.block_ptr_to_id(&block_ptr);
let retry_log_message = format!("eth_call RPC call for block {}", block_ptr);
retry(retry_log_message, &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -765,6 +770,7 @@ impl EthereumAdapter {
stream::iter_ok::<_, Error>(ids.into_iter().map(move |hash| {
let web3 = web3.clone();
retry(format!("load block {}", hash), &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -799,6 +805,7 @@ impl EthereumAdapter {

async move {
retry(format!("load block {}", number), &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -856,6 +863,7 @@ impl EthereumAdapter {
stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
let web3 = web3.clone();
retry(format!("load block ptr {}", block_num), &logger)
.redact_log_urls(true)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
Expand Down Expand Up @@ -1140,6 +1148,7 @@ impl EthereumAdapter {
let web3 = self.web3.clone();
u64::try_from(
retry("chain_id RPC call", &logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -1175,6 +1184,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
let metrics = self.metrics.clone();
let provider = self.provider().to_string();
let net_version_future = retry("net_version RPC call", &logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(20)
.run(move || {
Expand Down Expand Up @@ -1203,6 +1213,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
ENV_VARS.genesis_block_number
);
let gen_block_hash_future = retry(retry_log_message, &logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(30)
.run(move || {
Expand Down Expand Up @@ -1254,6 +1265,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
let web3 = self.web3.clone();
Box::new(
retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -1288,6 +1300,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
let web3 = self.web3.clone();
Box::new(
retry("eth_getBlockByNumber(latest) with txs RPC call", logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -1345,6 +1358,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
);
Box::new(
retry(retry_log_message, &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -1376,6 +1390,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
);
Box::new(
retry(retry_log_message, &logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -1458,6 +1473,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
);
Box::new(
retry(retry_log_message, logger)
.redact_log_urls(true)
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Expand Down Expand Up @@ -1525,6 +1541,7 @@ impl EthereumAdapterTrait for EthereumAdapter {
let web3 = self.web3.clone();
let logger = logger.clone();
let res = retry(retry_log_message, &logger)
.redact_log_urls(true)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
Expand Down Expand Up @@ -2279,6 +2296,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry(
block_hash
);
retry(retry_log_message, &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.no_logging()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
Expand Down Expand Up @@ -2406,6 +2424,7 @@ async fn fetch_block_receipts_with_retry(

// Perform the retry operation
let receipts_option = retry(retry_log_message, &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || web3.eth().block_receipts(BlockId::Hash(block_hash)).boxed())
Expand Down Expand Up @@ -2450,6 +2469,7 @@ async fn fetch_transaction_receipt_with_retry(
transaction_hash
);
retry(retry_log_message, &logger)
.redact_log_urls(true)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || web3.eth().transaction_receipt(transaction_hash).boxed())
Expand Down
36 changes: 32 additions & 4 deletions graph/src/util/futures.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::ext::futures::FutureExtension;
use futures03::{Future, FutureExt, TryFutureExt};
use lazy_static::lazy_static;
use regex::Regex;
use slog::{debug, trace, warn, Logger};
use std::fmt::Debug;
use std::marker::PhantomData;
Expand Down Expand Up @@ -61,6 +63,7 @@ pub fn retry<I, E>(operation_name: impl ToString, logger: &Logger) -> RetryConfi
log_after: 1,
warn_after: 10,
limit: RetryConfigProperty::Unknown,
redact_log_urls: false,
phantom_item: PhantomData,
phantom_error: PhantomData,
}
Expand All @@ -75,6 +78,7 @@ pub struct RetryConfig<I, E> {
limit: RetryConfigProperty<usize>,
phantom_item: PhantomData<I>,
phantom_error: PhantomData<E>,
redact_log_urls: bool,
}

impl<I, E> RetryConfig<I, E>
Expand Down Expand Up @@ -125,6 +129,12 @@ where
self
}

/// Redact alphanumeric URLs from log messages.
pub fn redact_log_urls(mut self, redact_log_urls: bool) -> Self {
self.redact_log_urls = redact_log_urls;
self
}

/// Set how long (in seconds) to wait for an attempt to complete before giving up on that
/// attempt.
pub fn timeout_secs(self, timeout_secs: u64) -> RetryConfigWithTimeout<I, E> {
Expand Down Expand Up @@ -173,6 +183,7 @@ where
let log_after = self.inner.log_after;
let warn_after = self.inner.warn_after;
let limit_opt = self.inner.limit.unwrap(&operation_name, "limit");
let redact_log_urls = self.inner.redact_log_urls;
let timeout = self.timeout;

trace!(logger, "Run with retry: {}", operation_name);
Expand All @@ -184,6 +195,7 @@ where
log_after,
warn_after,
limit_opt,
redact_log_urls,
move || {
try_it()
.timeout(timeout)
Expand Down Expand Up @@ -214,6 +226,7 @@ impl<I, E> RetryConfigNoTimeout<I, E> {
let log_after = self.inner.log_after;
let warn_after = self.inner.warn_after;
let limit_opt = self.inner.limit.unwrap(&operation_name, "limit");
let redact_log_urls = self.inner.redact_log_urls;

trace!(logger, "Run with retry: {}", operation_name);

Expand All @@ -224,6 +237,7 @@ impl<I, E> RetryConfigNoTimeout<I, E> {
log_after,
warn_after,
limit_opt,
redact_log_urls,
// No timeout, so all errors are inner errors
move || try_it().map_err(TimeoutError::Inner),
)
Expand Down Expand Up @@ -265,6 +279,7 @@ fn run_retry<O, E, F, R>(
log_after: u64,
warn_after: u64,
limit_opt: Option<usize>,
redact_log_urls: bool,
mut try_it_with_timeout: F,
) -> impl Future<Output = Result<O, TimeoutError<E>>> + Send
where
Expand Down Expand Up @@ -311,25 +326,38 @@ where

// If needs retry
if condition.check(&result) {
let result_str = || {
if redact_log_urls {
lazy_static! {
static ref RE: Regex =
Regex::new(r#"https?://[a-zA-Z0-9\-\._:/\?#&=]+"#).unwrap();
}
let e = format!("{result:?}");
RE.replace_all(&e, "[REDACTED]").into_owned()
} else {
format!("{result:?}")
}
};

if attempt_count >= warn_after {
// This looks like it would be nice to de-duplicate, but if we try
// to use log! slog complains about requiring a const for the log level
// See also b05e1594-e408-4047-aefb-71fc60d70e8f
warn!(
logger,
"Trying again after {} failed (attempt #{}) with result {:?}",
"Trying again after {} failed (attempt #{}) with result {}",
&operation_name,
attempt_count,
result
result_str(),
);
} else if attempt_count >= log_after {
// See also b05e1594-e408-4047-aefb-71fc60d70e8f
debug!(
logger,
"Trying again after {} failed (attempt #{}) with result {:?}",
"Trying again after {} failed (attempt #{}) with result {}",
&operation_name,
attempt_count,
result
result_str(),
);
}

Expand Down
Loading