Skip to content

Commit 5e884e7

Browse files
authored
Merge pull request #4270 from ProvableHQ/fix/sync-avoid-peer-ban
[Fix] Avoid banning peers that are still responding to block requests
2 parents 996e92d + 06e27ac commit 5e884e7

3 files changed

Lines changed: 37 additions & 10 deletions

File tree

.github/workflows/benchmarks.yml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,11 @@ jobs:
5656
unzip ledger.zip
5757
5858
- name: Install snarkOS (test_network)
59-
run: |
60-
# use `test_network` flag without snarkVM `dev_println`
61-
cargo install --path=. --locked --features=test_consensus_heights,test_targets
59+
run: cargo install --path=. --locked --features=test_network
6260

6361
# Download previous benchmark result from cache (if exists)
6462
- name: Download previous benchmark data
65-
uses: actions/cache@v4
63+
uses: actions/cache@v5
6664
with:
6765
path: ./cache
6866
key: ${{ runner.os }}-benchmark

node/rest/src/routes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1269,7 +1269,7 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
12691269
.map_err(|e| {
12701270
let msg = e.to_string();
12711271
if let Some(stripped) = msg.strip_prefix("404: ") {
1272-
RestError::not_found(anyhow!("{}", stripped))
1272+
RestError::not_found(anyhow!("{stripped}"))
12731273
} else {
12741274
RestError::internal_server_error(e)
12751275
}

node/sync/src/block_sync.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,12 @@ pub struct BlockSync<N: Network> {
235235
/// Tracks failed requests that need to be re-issued.
236236
failed_requests: Mutex<FailedRequests<N::BlockHash>>,
237237

238+
/// Tracks the last time each peer delivered a successful block response.
239+
///
240+
/// Used in `handle_block_request_timeouts` to avoid banning peers that are actively
241+
/// responding but cannot keep up with the request rate.
242+
last_response_at: Mutex<HashMap<SocketAddr, Instant>>,
243+
238244
/// Condition variable that wakes up waiting tasks when the node is synced.
239245
synced_notify: Notify,
240246
}
@@ -258,6 +264,7 @@ impl<N: Network> BlockSync<N> {
258264
metrics: Default::default(),
259265
prepare_requests_lock: Default::default(),
260266
failed_requests: Default::default(),
267+
last_response_at: Default::default(),
261268
synced_notify: Default::default(),
262269
}
263270
}
@@ -971,6 +978,8 @@ impl<N: Network> BlockSync<N> {
971978
self.locators.write().remove(peer_ip);
972979
// Remove all common ancestor entries for this peer.
973980
self.common_ancestors.write().retain(|pair, _| !pair.contains(peer_ip));
981+
// Drop the last-response timestamp so a reconnecting peer starts fresh.
982+
self.last_response_at.lock().remove(peer_ip);
974983
// Remove all block requests to the peer.
975984
self.remove_block_requests_to_peer(peer_ip);
976985

@@ -1298,6 +1307,10 @@ impl<N: Network> BlockSync<N> {
12981307

12991308
trace!("Received a new and valid block response for height {height}");
13001309

1310+
// Record that this peer is actively responding. Used by `handle_block_request_timeouts`
1311+
// to avoid banning peers that are slow but making progress.
1312+
self.last_response_at.lock().insert(peer_ip, Instant::now());
1313+
13011314
// Notify the sync loop that something changed.
13021315
self.response_notify.notify_one();
13031316

@@ -1395,6 +1408,19 @@ impl<N: Network> BlockSync<N> {
13951408
///
13961409
/// Timed-out requests will be marked as "failed" and re-issued on the next call to `prepare_block_requests`.
13971410
pub fn handle_block_request_timeouts(&self) {
1411+
// Snapshot last-response times before locking `requests`. A request whose assigned peer
1412+
// has responded within `BLOCK_REQUEST_TIMEOUT` is not timed out, even if its own timer
1413+
// has elapsed — the peer is keeping up with a backlog and timing this request out would
1414+
// just churn it through `failed_requests` and lose its place in the queue.
1415+
let responsive_peers: HashSet<SocketAddr> = {
1416+
let last_response_at = self.last_response_at.lock();
1417+
let now = Instant::now();
1418+
last_response_at
1419+
.iter()
1420+
.filter_map(|(peer, t)| (now.duration_since(*t) <= BLOCK_REQUEST_TIMEOUT).then_some(*peer))
1421+
.collect()
1422+
};
1423+
13981424
// Avoid locking `locators` and `requests` at the same time.
13991425
let (timed_out_requests, peers_to_ban) = {
14001426
// Acquire the write lock on the requests map.
@@ -1419,9 +1445,11 @@ impl<N: Network> BlockSync<N> {
14191445
let timer_elapsed = now.duration_since(e.timestamp) > BLOCK_REQUEST_TIMEOUT;
14201446
// Determine if the request is complete.
14211447
let is_complete = e.sync_ips().is_empty() && e.response.is_some();
1448+
// If any assigned peer is still actively responding, the request is not stuck.
1449+
let has_responsive_peer = e.sync_ips().iter().any(|ip| responsive_peers.contains(ip));
14221450

14231451
// Determine if the request has timed out.
1424-
let is_timeout = timer_elapsed && !is_complete;
1452+
let is_timeout = timer_elapsed && !is_complete && !has_responsive_peer;
14251453

14261454
// Retain if this is not a timeout and is not obsolete.
14271455
let retain = !is_timeout && !is_obsolete;
@@ -1470,7 +1498,9 @@ impl<N: Network> BlockSync<N> {
14701498
}
14711499
}
14721500

1473-
// Now remove and ban any unresponsive peers
1501+
// Remove and ban the unresponsive peers. The `has_responsive_peer` check inside `retain`
1502+
// above already guarantees that a request only counts as timed out when none of its
1503+
// assigned peers have responded recently, so every peer in this set is unresponsive.
14741504
for peer_ip in peers_to_ban {
14751505
self.remove_peer(&peer_ip);
14761506
// TODO: Uncomment this when we have a more rigorous analysis and testing of peer banning.
@@ -1603,9 +1633,7 @@ impl<N: Network> BlockSync<N> {
16031633

16041634
for height in start_height..end_height {
16051635
// Ensure the current height is not in the ledger or already requested.
1606-
if let Err(err) = self.check_block_request(height) {
1607-
trace!("{err}");
1608-
1636+
if self.check_block_request(height).is_err() {
16091637
// If the sequence of block requests is interrupted, then return early.
16101638
// Otherwise, continue until the first start height that is new.
16111639
match request_hashes.is_empty() {
@@ -1793,6 +1821,7 @@ mod tests {
17931821
advance_with_sync_blocks_lock: Default::default(),
17941822
metrics: Default::default(),
17951823
prepare_requests_lock: Default::default(),
1824+
last_response_at: Default::default(),
17961825
}
17971826
}
17981827

0 commit comments

Comments
 (0)