
Commit bda8a95

feat: track and persist session peer outcomes (#106)
* feat: add session outcome fields to `PeerReputation`

  Adds `last_success`, `last_tried`, and `consecutive_failures` to `PeerReputation`, plus a `record_connection_failure` method on `PeerReputationManager`. All new fields use `#[serde(default)]` so existing `reputations.json` files load without migration. `record_connection_attempt` now sets `last_tried`, and `record_successful_connection` sets `last_success` and resets `consecutive_failures`.

* test: cover `PeerReputation` session outcome transitions

  Adds unit tests for default values, `last_tried` on attempt, `last_success` plus `consecutive_failures` reset on success, failure streak increment preserving `last_success`, and legacy `reputations.json` decoding with missing fields.

* feat: bump `AddrV2.time` on successful handshake

  Adds `AddrV2Handler::mark_seen` to refresh the stored timestamp for a directly observed peer, preserving existing services for known entries and inserting a fresh entry otherwise. `connect_to_peer` now calls `mark_seen` after a successful handshake so the `peers.dat` time reflects first-hand observation instead of gossip.

* feat: track peer connection outcomes in network manager

  Calls `record_connection_failure` on both the TCP connect failure and the handshake failure paths in `PeerNetworkManager::connect_to_peer`, so the `consecutive_failures` streak reflects every unsuccessful attempt.

* refactor: drop backward-compat shims from `PeerReputation`

  Removes `#[serde(default)]` on the new session outcome fields and the legacy-JSON load test. Backward compatibility of `reputations.json` across versions is no longer a requirement, so the shims and test are dead weight.
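The field transitions described above can be sketched as follows. This is a minimal, hypothetical reduction of `PeerReputation` (the real struct carries a score, ban state, and serde attributes not shown here); only the session-outcome fields and their update rules are modeled.

```rust
use std::time::SystemTime;

/// Simplified sketch: only the session-outcome fields from the commit.
#[derive(Default)]
struct PeerReputation {
    last_success: Option<SystemTime>,
    last_tried: Option<SystemTime>,
    consecutive_failures: u32,
}

impl PeerReputation {
    /// Every attempt stamps `last_tried`, whether or not it later succeeds.
    fn record_connection_attempt(&mut self) {
        self.last_tried = Some(SystemTime::now());
    }

    /// Success stamps `last_success` and resets the failure streak.
    fn record_successful_connection(&mut self) {
        self.last_success = Some(SystemTime::now());
        self.consecutive_failures = 0;
    }

    /// Failure bumps the streak; `last_success` is deliberately untouched.
    fn record_connection_failure(&mut self) {
        self.last_tried = Some(SystemTime::now());
        self.consecutive_failures += 1;
    }
}

fn main() {
    let mut rep = PeerReputation::default();
    rep.record_connection_attempt();
    rep.record_connection_failure();
    rep.record_connection_failure();
    assert_eq!(rep.consecutive_failures, 2);
    rep.record_successful_connection();
    assert_eq!(rep.consecutive_failures, 0);
    assert!(rep.last_success.is_some());
}
```

Keeping `last_success` separate from `last_tried` is what lets a failure streak be measured without losing the record of the last known-good contact.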
* refactor: thread peer-advertised services into `mark_seen`
* docs: clarify `last_connection` vs `last_tried` on `PeerReputation`
* refactor: extract `make_addr_message` helper in `addrv2`
* refactor: defensively set `last_tried` in `record_connection_failure`
* refactor: add atomic `record_failure_with_penalty` and use at failure sites
* chore: apply `cargo fmt`
* refactor: clamp `consecutive_failures` on deserialization
* refactor: extract `apply_score_change` and always update `last_tried` on failure
* test: cover `record_failure_with_penalty` directly
* refactor: clamp `consecutive_failures` at runtime and consolidate failure-field updates

  Extract private `record_failure_fields` that applies `last_tried = now` and `consecutive_failures.saturating_add(1).min(MAX_CONSECUTIVE_FAILURES)`. Both `record_connection_failure` and `record_failure_with_penalty` now delegate to it, eliminating duplicated mutations and capping the in-memory streak at the same 1000 limit enforced by the deserializer.
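The consolidation in the last bullet above can be sketched like this. The constant value (1000) and the `saturating_add(1).min(...)` expression come from the commit text; the struct shape, the score field, and the method bodies are illustrative assumptions, not the crate's actual code.

```rust
// The cap mirrors the limit the commit says the deserializer enforces.
const MAX_CONSECUTIVE_FAILURES: u32 = 1000;

struct PeerReputation {
    last_tried: Option<std::time::SystemTime>,
    consecutive_failures: u32,
    score: i32,
}

impl PeerReputation {
    /// Single place that mutates the failure fields, so every failure
    /// entry point applies the same timestamp and the same clamp.
    fn record_failure_fields(&mut self) {
        self.last_tried = Some(std::time::SystemTime::now());
        self.consecutive_failures =
            self.consecutive_failures.saturating_add(1).min(MAX_CONSECUTIVE_FAILURES);
    }

    /// Atomic "record failure + apply score penalty". A `score_change` of 0
    /// still records the counter and timestamp without touching the score;
    /// negative values are clamped defensively.
    fn record_failure_with_penalty(&mut self, score_change: i32) {
        self.record_failure_fields();
        self.score -= score_change.max(0);
    }
}

fn main() {
    let mut rep =
        PeerReputation { last_tried: None, consecutive_failures: 999, score: 50 };
    rep.record_failure_with_penalty(20);
    assert_eq!(rep.consecutive_failures, 1000);
    assert_eq!(rep.score, 30);
    rep.record_failure_with_penalty(20); // streak saturates at the cap
    assert_eq!(rep.consecutive_failures, 1000);
}
```

The point of the private helper is that `record_connection_failure` and `record_failure_with_penalty` cannot drift apart: there is exactly one clamp and one timestamp write.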
* test: cover `consecutive_failures` clamp, `last_success` preservation, `mark_seen` eviction

  - Assert `last_success` is unchanged after `record_failure_with_penalty`
  - Deserialize a `PeerReputation` with `consecutive_failures: 99999` and assert it clamps to `MAX_CONSECUTIVE_FAILURES`
  - Fill `AddrV2Handler` to capacity and assert `mark_seen` stays bounded and includes the new entry

* refactor: remove unused `record_connection_failure`
* test: stabilize eviction test and cover runtime `consecutive_failures` saturation
* refactor: document and assert non-negative contract on `record_failure_with_penalty`
* test: cover `last_tried` preservation on success and update on failure
* fix: clamp negative `score_change` in `record_failure_with_penalty`
* test: cover happy-path attempt to success lifecycle
* refactor: tighten `clamp_future_system_time` bounds and enforce load invariants

  Add a 30-day lower bound to `clamp_future_system_time` so stale or corrupted timestamps (including epoch 0) are discarded on load, in addition to future ones. Add `PeerReputation::normalize_after_load` and call it from the storage load path. It resets `consecutive_failures` to 0 whenever `last_tried` is `None`, preventing the inconsistent state where a non-zero failure streak has no temporal anchor.

* test: cover `clamp_future_system_time` edge cases

  Add three tests: future timestamp rejected, epoch-zero rejected (exercising the new lower bound), and recent-past timestamp preserved.

* docs: clarify zero-`score_change` contract on `record_failure_with_penalty`

  A value of 0 is a deliberate no-op for the reputation score but still records the failure counter and timestamp, which is useful for failures that should be tracked without contributing toward a ban.
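The `normalize_after_load` invariant described above is small enough to sketch in full. The struct is again a hypothetical reduction; only the rule "a streak with no temporal anchor is reset" is taken from the commit text.

```rust
use std::time::SystemTime;

struct PeerReputation {
    last_tried: Option<SystemTime>,
    consecutive_failures: u32,
}

impl PeerReputation {
    /// Enforce the load-time invariant: a non-zero failure streak must
    /// have a `last_tried` anchor, otherwise the streak is meaningless
    /// (e.g. a corrupted or hand-edited reputations file) and is reset.
    fn normalize_after_load(&mut self) {
        if self.last_tried.is_none() {
            self.consecutive_failures = 0;
        }
    }
}

fn main() {
    // Inconsistent on-disk state: streak without an anchor.
    let mut bad = PeerReputation { last_tried: None, consecutive_failures: 7 };
    bad.normalize_after_load();
    assert_eq!(bad.consecutive_failures, 0);

    // Consistent state passes through untouched.
    let mut ok =
        PeerReputation { last_tried: Some(SystemTime::now()), consecutive_failures: 7 };
    ok.normalize_after_load();
    assert_eq!(ok.consecutive_failures, 7);
}
```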
* fix: use `checked_sub` in `clamp_future_system_time` to avoid panic on broken clocks
* test: cover `normalize_after_load` via storage round-trip
* fix: use `checked_add` in `clamp_future_system_time` to avoid panic on far-future clocks
* test: also cover stale-timestamp path in `normalize_after_load` round-trip
* refactor: drop 30-day stale-timestamp floor from `clamp_future_system_time`

  Remove `TIMESTAMP_MAX_AGE` and the `floor` computation that rejected timestamps older than 30 days. The future-timestamp guard (10-second tolerance) is the only meaningful constraint. Update the `normalize_after_load` doc comment to drop the "stale" reference.

* test: remove obsolete stale-timestamp tests and consolidate `clamp_future_system_time` coverage

  Delete `test_normalize_after_load_via_storage_round_trip_stale` (tested the removed stale-floor path). Merge the three `test_clamp_future_system_time_*` tests into a single `test_clamp_future_system_time` covering future rejection and recent-past acceptance.

* fix: `mark_seen` now overwrites `services` on existing entries

  Since round 1 the caller passes the actual handshake-negotiated services, so preserving the gossip-sourced value was inverted. The handshake-observed value is authoritative and is now written on both new and existing entries. Rename `test_mark_seen_bumps_time_and_preserves_services` to `test_mark_seen_bumps_time_and_updates_services` and update its assertion to expect the handshake services, not the original gossip services.

* test: cover positive `normalize_after_load` branch

  Add `test_normalize_after_load_preserves_failures_when_last_tried_valid` to assert that a valid (non-future) `last_tried` and a non-zero `consecutive_failures` are both preserved through the load round-trip, complementing the existing reset-path test.

* refactor: move `record_failure_fields` to `impl PeerReputation`
* refactor: move `apply_score_change` to `impl PeerReputation`
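Putting the `clamp_future_system_time` commits together, the final shape might look like the sketch below: the 30-day stale floor is gone, the only guard is the 10-second future tolerance, and `checked_add` keeps a far-future system clock from panicking. The `Option`-returning signature and constant name are assumptions; only the tolerance value and the checked arithmetic come from the commit text.

```rust
use std::time::{Duration, SystemTime};

/// Tolerance for minor clock skew; timestamps further in the future are discarded.
const FUTURE_TOLERANCE: Duration = Duration::from_secs(10);

fn clamp_future_system_time(t: SystemTime) -> Option<SystemTime> {
    // checked_add: if the clock is so far in the future that adding the
    // tolerance would overflow, give up rather than panic.
    let ceiling = SystemTime::now().checked_add(FUTURE_TOLERANCE)?;
    if t > ceiling {
        None // more than 10 s ahead of our clock: reject
    } else {
        Some(t) // past and near-present timestamps pass through unchanged
    }
}

fn main() {
    let now = SystemTime::now();
    // One hour in the future: rejected.
    assert!(clamp_future_system_time(now + Duration::from_secs(3600)).is_none());
    // One hour in the past: preserved (no stale floor anymore).
    let past = now - Duration::from_secs(3600);
    assert_eq!(clamp_future_system_time(past), Some(past));
}
```

Dropping the stale floor matches the consolidated test coverage above: only future rejection and recent-past acceptance remain meaningful cases.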
1 parent 853d3e3 commit bda8a95

5 files changed

Lines changed: 613 additions & 62 deletions


dash-spv/src/network/addrv2.rs

Lines changed: 120 additions & 14 deletions
@@ -2,7 +2,7 @@
 use rand::prelude::*;
 use std::collections::{HashMap, HashSet};
-use std::net::SocketAddr;
+use std::net::{IpAddr, SocketAddr};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
@@ -26,6 +26,19 @@ fn evict_if_needed(peers: &mut HashMap<SocketAddr, AddrV2Message>) {
     }
 }
 
+fn make_addr_message(addr: SocketAddr, services: ServiceFlags, time: u32) -> AddrV2Message {
+    let addr_v2 = match addr.ip() {
+        IpAddr::V4(ipv4) => AddrV2::Ipv4(ipv4),
+        IpAddr::V6(ipv6) => AddrV2::Ipv6(ipv6),
+    };
+    AddrV2Message {
+        time,
+        services,
+        addr: addr_v2,
+        port: addr.port(),
+    }
+}
+
 /// Handler for AddrV2 peer exchange protocol
 pub struct AddrV2Handler {
     /// Known peer addresses from AddrV2 messages
@@ -134,21 +147,36 @@ impl AddrV2Handler {
             })
             .as_secs() as u32;
 
-        let addr_v2 = match addr.ip() {
-            std::net::IpAddr::V4(ipv4) => AddrV2::Ipv4(ipv4),
-            std::net::IpAddr::V6(ipv6) => AddrV2::Ipv6(ipv6),
-        };
+        let mut known_peers = self.known_peers.write().await;
+        known_peers.insert(addr, make_addr_message(addr, services, now));
+        evict_if_needed(&mut known_peers);
+    }
 
-        let addr_msg = AddrV2Message {
-            time: now,
-            services,
-            addr: addr_v2,
-            port: addr.port(),
-        };
+    /// Bump the stored `AddrV2.time` for `addr` to now after directly observing
+    /// the peer (e.g. a successful handshake) and record the handshake-verified
+    /// `services`. A first-hand observation is authoritative, so both `time` and
+    /// `services` are overwritten on existing entries. If the entry is missing it
+    /// is created with the provided values.
+    pub async fn mark_seen(&self, addr: SocketAddr, services: ServiceFlags) {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_else(|e| {
+                tracing::error!("System time error in mark_seen: {}", e);
+                Duration::from_secs(0)
+            })
+            .as_secs() as u32;
 
         let mut known_peers = self.known_peers.write().await;
-        known_peers.insert(addr, addr_msg);
-        evict_if_needed(&mut known_peers);
+        match known_peers.get_mut(&addr) {
+            Some(existing) => {
+                existing.time = now;
+                existing.services = services;
+            }
+            None => {
+                known_peers.insert(addr, make_addr_message(addr, services, now));
+                evict_if_needed(&mut known_peers);
+            }
+        }
     }
 
     /// Build a GetAddr response message
@@ -187,6 +215,84 @@ mod tests {
         assert_eq!(known[0].socket_addr().unwrap(), addr);
     }
 
+    #[tokio::test]
+    async fn test_mark_seen_bumps_time_and_updates_services() {
+        let handler = AddrV2Handler::new();
+        let addr: SocketAddr = "10.0.0.5:9999".parse().unwrap();
+
+        // Seed via AddrV2 gossip with a stale-but-valid timestamp and richer services.
+        let gossip_services = ServiceFlags::NETWORK | ServiceFlags::COMPACT_FILTERS;
+        let ipv4_addr = match addr.ip() {
+            IpAddr::V4(v4) => v4,
+            _ => panic!("test expects IPv4"),
+        };
+        let now =
+            SystemTime::now().duration_since(UNIX_EPOCH).expect("system time").as_secs() as u32;
+        let stale_time = now.saturating_sub(3600);
+        handler
+            .handle_addrv2(vec![AddrV2Message {
+                time: stale_time,
+                services: gossip_services,
+                addr: AddrV2::Ipv4(ipv4_addr),
+                port: addr.port(),
+            }])
+            .await;
+
+        // Observe the peer directly — handshake-verified services are authoritative.
+        let handshake_services = ServiceFlags::NETWORK;
+        handler.mark_seen(addr, handshake_services).await;
+
+        let known = handler.get_known_addresses().await;
+        let entry = known.iter().find(|m| m.socket_addr().ok() == Some(addr)).expect("entry");
+        assert!(entry.time >= now);
+        assert!(entry.time > stale_time);
+        assert_eq!(entry.services, handshake_services);
+    }
+
+    #[tokio::test]
+    async fn test_mark_seen_inserts_new_entry() {
+        let handler = AddrV2Handler::new();
+        let addr: SocketAddr = "10.0.0.6:9999".parse().unwrap();
+
+        assert!(handler.get_known_addresses().await.is_empty());
+
+        handler.mark_seen(addr, ServiceFlags::NETWORK).await;
+
+        let known = handler.get_known_addresses().await;
+        assert_eq!(known.len(), 1);
+        assert_eq!(known[0].socket_addr().unwrap(), addr);
+        assert_eq!(known[0].services, ServiceFlags::NETWORK);
+    }
+
+    #[tokio::test]
+    async fn test_mark_seen_evicts_when_at_capacity() {
+        let handler = AddrV2Handler::new();
+
+        // Use staggered timestamps strictly older than the mark_seen call below so
+        // the new entry is definitively the freshest and survives eviction.
+        let base_time =
+            (SystemTime::now().duration_since(UNIX_EPOCH).expect("system time").as_secs() as u32)
+                .saturating_sub(ONE_WEEK / 2);
+
+        let msgs: Vec<AddrV2Message> = (0..MAX_ADDR_TO_STORE)
+            .map(|i| {
+                let addr: SocketAddr =
+                    format!("10.{}.{}.1:9999", i / 256, i % 256).parse().unwrap();
+                make_addr_message(addr, ServiceFlags::NETWORK, base_time - i as u32)
+            })
+            .collect();
+        handler.handle_addrv2(msgs).await;
+
+        assert_eq!(handler.get_known_addresses().await.len(), MAX_ADDR_TO_STORE);
+
+        let new_addr: SocketAddr = "192.168.99.99:9999".parse().unwrap();
+        handler.mark_seen(new_addr, ServiceFlags::NETWORK).await;
+
+        let known = handler.get_known_addresses().await;
+        assert_eq!(known.len(), MAX_ADDR_TO_STORE);
+        assert!(known.iter().any(|m| m.socket_addr().ok() == Some(new_addr)));
+    }
+
     #[tokio::test]
     async fn test_addrv2_timestamp_validation() {
         let handler = AddrV2Handler::new();
@@ -199,7 +305,7 @@ mod tests {
         let addr: SocketAddr =
             "127.0.0.1:9999".parse().expect("Failed to parse test socket address");
         let ipv4_addr = match addr.ip() {
-            std::net::IpAddr::V4(v4) => v4,
+            IpAddr::V4(v4) => v4,
             _ => panic!("Test expects IPv4 address but got IPv6"),
         };

dash-spv/src/network/handshake.rs

Lines changed: 5 additions & 0 deletions
@@ -301,6 +301,11 @@ impl HandshakeManager {
         self.peer_version
     }
 
+    /// Get the service flags advertised by the peer in its version message.
+    pub fn peer_services(&self) -> Option<ServiceFlags> {
+        self.peer_services
+    }
+
     /// Check if peer supports headers2 compression.
     pub fn peer_supports_headers2(&self) -> bool {
         self.peer_services.map(|services| services.has(NODE_HEADERS_COMPRESSED)).unwrap_or(false)

dash-spv/src/network/manager.rs

Lines changed: 9 additions & 6 deletions
@@ -265,6 +265,10 @@ impl PeerNetworkManager {
             tracing::warn!("Failed to send GetAddr to {}: {}", addr, e);
         }
 
+        // Capture peer-advertised services before the peer is moved into the pool.
+        let peer_services =
+            handshake_manager.peer_services().unwrap_or(ServiceFlags::NETWORK);
+
         // Record successful connection
         reputation_manager.record_successful_connection(addr).await;
 
@@ -290,8 +294,9 @@ impl PeerNetworkManager {
             best_height,
         });
 
-        // Add to known addresses
-        addrv2_handler.add_known_address(addr, ServiceFlags::NETWORK).await;
+        // Bump the AddrV2 time on direct observation, using the peer's
+        // actual advertised services from the version message.
+        addrv2_handler.mark_seen(addr, peer_services).await;
 
         // // Start message reader for this peer
         Self::start_peer_reader(
@@ -311,9 +316,8 @@ impl PeerNetworkManager {
         tracing::warn!("Handshake failed with {}: {}", addr, e);
         // Only clears connecting set. Peer was never added, so no count/event needed.
         pool.remove_peer(&addr).await;
-        // Update reputation for handshake failure
         reputation_manager
-            .update_reputation(
+            .record_failure_with_penalty(
                 addr,
                 misbehavior_scores::INVALID_MESSAGE,
                 "Handshake failed",
@@ -328,9 +332,8 @@ impl PeerNetworkManager {
         tracing::debug!("Failed to connect to {}: {}", addr, e);
         // Only clears connecting set. Peer was never added, so no count/event needed.
         pool.remove_peer(&addr).await;
-        // Minor reputation penalty for connection failure
         reputation_manager
-            .update_reputation(
+            .record_failure_with_penalty(
                 addr,
                 misbehavior_scores::TIMEOUT / 2,
                 "Connection failed",
