Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions crates/evm/traces/src/identifier/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ impl TraceIdentifier for ExternalIdentifier {
type FetchFuture =
Pin<Box<dyn Future<Output = (Address, Result<Option<Metadata>, EtherscanError>)>>>;

/// Maximum number of times a single address is retried through a transient Cloudflare
/// block before we give up on it. Bounded so a persistent block can't loop forever.
const MAX_CLOUDFLARE_RETRIES: u32 = 5;

/// A rate limit aware fetcher.
///
/// Fetches information about multiple addresses concurrently, while respecting rate limits.
Expand All @@ -243,6 +247,8 @@ struct ExternalFetcher {
queue: Vec<Address>,
/// The in progress requests
in_progress: FuturesUnordered<FetchFuture>,
/// Per-address retry counter for transient Cloudflare blocks.
attempts: HashMap<Address, u32>,
}

impl ExternalFetcher {
Expand All @@ -254,6 +260,7 @@ impl ExternalFetcher {
fetcher,
queue: to_fetch.to_vec(),
in_progress: FuturesUnordered::new(),
attempts: HashMap::default(),
}
}

Expand Down Expand Up @@ -319,10 +326,24 @@ impl Stream for ExternalFetcher {
return Poll::Ready(None);
}
Err(EtherscanError::BlockedByCloudflare) => {
warn!(target: "evm::traces::external", "blocked by cloudflare");
// mark key as invalid
pin.fetcher.invalid_api_key().store(true, Ordering::Relaxed);
return Poll::Ready(None);
// A Cloudflare block is transient rate limiting (often triggered
// by request bursts), not a permanent failure like an invalid key.
// Back off and retry the address a bounded number of times instead
// of aborting the whole stream, which would abandon every still-
// queued address and leave traces only partially decoded (#9880).
let attempts = {
let entry = pin.attempts.entry(addr).or_default();
*entry += 1;
*entry
};
if attempts <= MAX_CLOUDFLARE_RETRIES {
warn!(target: "evm::traces::external", attempts, "blocked by cloudflare, backing off");
pin.backoff = Some(tokio::time::interval(pin.timeout));
pin.queue.push(addr);
} else {
warn!(target: "evm::traces::external", "blocked by cloudflare, giving up on address");
return Poll::Ready(Some((addr, (pin.fetcher.kind(), None))));
}
}
Err(err) => {
warn!(target: "evm::traces::external", ?err, "could not get info");
Expand Down Expand Up @@ -510,3 +531,54 @@ impl From<SourcifyMetadata> for Metadata {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::{collections::HashSet as StdHashSet, sync::Mutex};

/// Fetcher that returns a transient Cloudflare block the first time it sees an address, then
/// succeeds. Mirrors Etherscan/Cloudflare throttling a burst of concurrent requests.
struct FlakyCloudflareFetcher {
seen: Mutex<StdHashSet<Address>>,
invalid: AtomicBool,
}

#[async_trait::async_trait]
impl ExternalFetcherT for FlakyCloudflareFetcher {
fn kind(&self) -> FetcherKind {
FetcherKind::Etherscan
}
fn timeout(&self) -> Duration {
Duration::from_millis(1)
}
fn concurrency(&self) -> usize {
1
}
fn invalid_api_key(&self) -> &AtomicBool {
&self.invalid
}
async fn fetch(&self, address: Address) -> Result<Option<Metadata>, EtherscanError> {
let first_time = self.seen.lock().unwrap().insert(address);
if first_time { Err(EtherscanError::BlockedByCloudflare) } else { Ok(None) }
}
}

/// Regression test for #9880: a transient Cloudflare block on one address must not abandon the
/// rest of the queue. Before the fix the fetcher returned `Poll::Ready(None)` on the first
/// block, ending the stream and leaving later addresses unidentified (partial trace decoding).
#[tokio::test]
async fn cloudflare_block_retries_instead_of_abandoning_queue() {
let addrs: Vec<Address> = (1u8..=4).map(Address::with_last_byte).collect();
let fetcher: Arc<dyn ExternalFetcherT> = Arc::new(FlakyCloudflareFetcher {
seen: Mutex::new(StdHashSet::new()),
invalid: AtomicBool::new(false),
});

let collected: Vec<_> = ExternalFetcher::new(fetcher, &addrs).collect().await;

let got: StdHashSet<Address> = collected.into_iter().map(|(addr, _)| addr).collect();
let want: StdHashSet<Address> = addrs.into_iter().collect();
assert_eq!(got, want, "every address must be yielded despite a transient cloudflare block");
}
}