diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 7173c069c65..e0714c24f02 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -147,6 +147,7 @@ impl EthereumAdapter { let retry_log_message = format!("trace_filter RPC call for block range: [{}..{}]", from, to); retry(retry_log_message, &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -295,6 +296,7 @@ impl EthereumAdapter { let eth_adapter = self.clone(); let retry_log_message = format!("eth_getLogs RPC call for block range: [{}..{}]", from, to); retry(retry_log_message, &logger) + .redact_log_urls(true) .when(move |res: &Result<_, web3::error::Error>| match res { Ok(_) => false, Err(e) => !too_many_logs_fingerprints @@ -511,6 +513,7 @@ impl EthereumAdapter { let retry_log_message = format!("eth_getCode RPC call for block {}", block_ptr); retry(retry_log_message, &logger) + .redact_log_urls(true) .when(|result| match result { Ok(_) => false, Err(_) => true, @@ -546,6 +549,7 @@ impl EthereumAdapter { let retry_log_message = format!("eth_getBalance RPC call for block {}", block_ptr); retry(retry_log_message, &logger) + .redact_log_urls(true) .when(|result| match result { Ok(_) => false, Err(_) => true, @@ -586,6 +590,7 @@ impl EthereumAdapter { let block_id = self.block_ptr_to_id(&block_ptr); let retry_log_message = format!("eth_call RPC call for block {}", block_ptr); retry(retry_log_message, &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -765,6 +770,7 @@ impl EthereumAdapter { stream::iter_ok::<_, Error>(ids.into_iter().map(move |hash| { let web3 = web3.clone(); retry(format!("load block {}", hash), &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -799,6 +805,7 @@ impl EthereumAdapter { async move { retry(format!("load block {}", number), &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -856,6 +863,7 @@ impl EthereumAdapter { stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| { let web3 = web3.clone(); retry(format!("load block ptr {}", block_num), &logger) + .redact_log_urls(true) .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) @@ -1140,6 +1148,7 @@ impl EthereumAdapter { let web3 = self.web3.clone(); u64::try_from( retry("chain_id RPC call", &logger) + .redact_log_urls(true) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -1175,6 +1184,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let metrics = self.metrics.clone(); let provider = self.provider().to_string(); let net_version_future = retry("net_version RPC call", &logger) + .redact_log_urls(true) .no_limit() .timeout_secs(20) .run(move || { @@ -1203,6 +1213,7 @@ impl EthereumAdapterTrait for EthereumAdapter { ENV_VARS.genesis_block_number ); let gen_block_hash_future = retry(retry_log_message, &logger) + .redact_log_urls(true) .no_limit() .timeout_secs(30) .run(move || { @@ -1254,6 +1265,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let web3 = self.web3.clone(); Box::new( retry("eth_getBlockByNumber(latest) no txs RPC call", logger) + .redact_log_urls(true) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -1288,6 +1300,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let web3 = self.web3.clone(); Box::new( retry("eth_getBlockByNumber(latest) with txs RPC call", logger) + .redact_log_urls(true) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -1345,6 +1358,7 @@ impl EthereumAdapterTrait for EthereumAdapter { ); Box::new( retry(retry_log_message, &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -1376,6 +1390,7 @@ impl EthereumAdapterTrait for EthereumAdapter { ); Box::new( retry(retry_log_message, &logger) + .redact_log_urls(true) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -1458,6 +1473,7 @@ impl EthereumAdapterTrait for EthereumAdapter { ); Box::new( retry(retry_log_message, logger) + .redact_log_urls(true) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -1525,6 +1541,7 @@ impl EthereumAdapterTrait for EthereumAdapter { let web3 = self.web3.clone(); let logger = logger.clone(); let res = retry(retry_log_message, &logger) + .redact_log_urls(true) .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) @@ -2279,6 +2296,7 @@ async fn fetch_transaction_receipts_in_batch_with_retry( block_hash ); retry(retry_log_message, &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .no_logging() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) @@ -2406,6 +2424,7 @@ async fn fetch_block_receipts_with_retry( // Perform the retry operation let receipts_option = retry(retry_log_message, &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || web3.eth().block_receipts(BlockId::Hash(block_hash)).boxed()) @@ -2450,6 +2469,7 @@ async fn fetch_transaction_receipt_with_retry( transaction_hash ); retry(retry_log_message, &logger) + .redact_log_urls(true) .limit(ENV_VARS.request_retries) .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || web3.eth().transaction_receipt(transaction_hash).boxed()) diff --git a/graph/src/util/futures.rs b/graph/src/util/futures.rs index d742457dcd1..7c49806c53a 100644 --- a/graph/src/util/futures.rs +++ b/graph/src/util/futures.rs @@ -1,5 +1,7 @@ use crate::ext::futures::FutureExtension; use futures03::{Future, FutureExt, TryFutureExt}; +use lazy_static::lazy_static; +use regex::Regex; use slog::{debug, trace, warn, Logger}; use std::fmt::Debug; use std::marker::PhantomData; @@ -61,6 +63,7 @@ pub fn retry(operation_name: impl ToString, logger: &Logger) -> RetryConfi log_after: 1, warn_after: 10, limit: RetryConfigProperty::Unknown, + redact_log_urls: false, phantom_item: PhantomData, phantom_error: PhantomData, } @@ -75,6 +78,7 @@ pub struct RetryConfig { limit: RetryConfigProperty, phantom_item: PhantomData, phantom_error: PhantomData, + redact_log_urls: bool, } impl RetryConfig @@ -125,6 +129,12 @@ where self } + /// Redact alphanumeric URLs from log messages. + pub fn redact_log_urls(mut self, redact_log_urls: bool) -> Self { + self.redact_log_urls = redact_log_urls; + self + } + /// Set how long (in seconds) to wait for an attempt to complete before giving up on that /// attempt. pub fn timeout_secs(self, timeout_secs: u64) -> RetryConfigWithTimeout { @@ -173,6 +183,7 @@ where let log_after = self.inner.log_after; let warn_after = self.inner.warn_after; let limit_opt = self.inner.limit.unwrap(&operation_name, "limit"); + let redact_log_urls = self.inner.redact_log_urls; let timeout = self.timeout; trace!(logger, "Run with retry: {}", operation_name); @@ -184,6 +195,7 @@ where log_after, warn_after, limit_opt, + redact_log_urls, move || { try_it() .timeout(timeout) @@ -214,6 +226,7 @@ impl RetryConfigNoTimeout { let log_after = self.inner.log_after; let warn_after = self.inner.warn_after; let limit_opt = self.inner.limit.unwrap(&operation_name, "limit"); + let redact_log_urls = self.inner.redact_log_urls; trace!(logger, "Run with retry: {}", operation_name); @@ -224,6 +237,7 @@ impl RetryConfigNoTimeout { log_after, warn_after, limit_opt, + redact_log_urls, // No timeout, so all errors are inner errors move || try_it().map_err(TimeoutError::Inner), ) @@ -265,6 +279,7 @@ fn run_retry( log_after: u64, warn_after: u64, limit_opt: Option, + redact_log_urls: bool, mut try_it_with_timeout: F, ) -> impl Future>> + Send where @@ -311,25 +326,38 @@ where // If needs retry if condition.check(&result) { + let result_str = || { + if redact_log_urls { + lazy_static! { + static ref RE: Regex = + Regex::new(r#"https?://[a-zA-Z0-9\-\._:/\?#&=]+"#).unwrap(); + } + let e = format!("{result:?}"); + RE.replace_all(&e, "[REDACTED]").into_owned() + } else { + format!("{result:?}") + } + }; + if attempt_count >= warn_after { // This looks like it would be nice to de-duplicate, but if we try // to use log! slog complains about requiring a const for the log level // See also b05e1594-e408-4047-aefb-71fc60d70e8f warn!( logger, - "Trying again after {} failed (attempt #{}) with result {:?}", + "Trying again after {} failed (attempt #{}) with result {}", &operation_name, attempt_count, - result + result_str(), ); } else if attempt_count >= log_after { // See also b05e1594-e408-4047-aefb-71fc60d70e8f debug!( logger, - "Trying again after {} failed (attempt #{}) with result {:?}", + "Trying again after {} failed (attempt #{}) with result {}", &operation_name, attempt_count, - result + result_str(), ); }