Skip to content

Commit 1d9828f

Browse files
authored
[Backport release-mainnet-1.1.0-rc #4327] chore(libp2p): reduce log noise via debug demotion and periodic summary (#4371)
chore(libp2p): reduce log noise via debug demotion and periodic summary (#4327) * chore(libp2p): reduce log noise via debug demotion and periodic summary - demote per-event WARNs (dial failures, Kademlia NotFound, auth handshake failures, slow gossipsub peers, DHT internal disagreement, direct-message failures, gossip publish failures, DHT save concurrency, bootstrap duplicate-start) from WARN/ERROR to DEBUG; production WARN volume drops from ~5800/hr to <=1/min per node - add network::log_summary with 16 atomic counters and an idempotent spawn_summary_task that emits one aggregated WARN line per minute when any counter is non-zero, format is alphabetical and emits only non-zero values for stable parsing - counters increment at every demoted site before the debug call so totals survive even when DEBUG is filtered out - wire counters across libp2p-networking, hotshot, and hotshot-task-impls; add hotshot-libp2p-networking workspace dep to task-impls - remove libp2p=off from .env now that crate-level log gating is unnecessary * refactor(libp2p): emit log summary as structured tracing fields - replace free-form "name=value name=value" message with a warn! 
event carrying one named tracing field per counter (auth_failures, dial_failures, ...), plus interval_seconds, so structured backends like Datadog index and filter on each counter without regex - drain counters atomically into a Snapshot struct; emit only when any counter is non-zero - always emit all counters, including zero values, so Datadog field schemas stay stable across intervals - add tracing-test dev-dep and a traced_test that asserts named fields appear in the emitted event * chore(libp2p): demote log summary to info, omit zero counters - emit the libp2p periodic summary at info level; a heartbeat of normal p2p churn is informational, not warn-worthy - format only non-zero counters; zero values were padding the line without helping operators or Datadog filters - replace Snapshot struct with a flat COUNTERS table; drain_and_format iterates once, returns None when everything is zero - richer per-counter metrics belong in the existing Prometheus Libp2pMetricsValue path, not in this log line * refactor(libp2p): shrink log_summary module - collapse 16 static counters + COUNTERS table into a single counters! macro - drop plan-traceability test comments and the per-counter unit tests; keep one smoke test for emit behavior and one for concurrent correctness - trim docstring to a single paragraph - use thread::scope in the concurrency test, no manual join * chore(libp2p): drop unused return value of spawn_summary_task Also drop obvious comments restating function names. 
* refactor(libp2p): replace static counters with LogEvent::record() API - log_summary: 16 pub statics become one LogEvent enum + parallel NAMES/COUNTERS arrays indexed by `event as usize`; record is a method on LogEvent - call sites change from log_summary::X.fetch_add(1, Ordering::Relaxed); to LogEvent::X.record(); one import per file, one reach per call * chore(libp2p): drop log_summary header * fix(libp2p): only record DhtDisagreementGivenUp on terminal failure The counter (and the misleading "Giving up because out of retries" log) were firing unconditionally, including for retries that succeeded. Move both into the else branch where retry_count == 0. Also switch the SPAWNED CAS from Relaxed/Relaxed to AcqRel/Acquire, the idiomatic ordering for a "do once" guard. (cherry picked from commit be6ff77)
1 parent d21671b commit 1d9828f

16 files changed

Lines changed: 241 additions & 56 deletions

File tree

.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# sequencer service.
1616
#
1717

18-
RUST_LOG=info,libp2p=off,cliquenet=error
18+
RUST_LOG=info,cliquenet=error
1919
RUST_LOG_FORMAT=full
2020

2121
# Parallelism config

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/hotshot/hotshot/src/traits/networking/libp2p_network.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use hotshot_libp2p_networking::{
3939
record::{Namespace, RecordKey, RecordValue},
4040
store::persistent::DhtPersistentStorage,
4141
},
42+
log_summary::LogEvent,
4243
spawn_network_node,
4344
transport::construct_auth_message,
4445
},
@@ -74,7 +75,7 @@ use tokio::{
7475
},
7576
time::sleep,
7677
};
77-
use tracing::{error, info, instrument, trace, warn};
78+
use tracing::{debug, error, info, instrument, trace, warn};
7879

7980
use crate::{BroadcastDelay, EpochMembershipCoordinator};
8081

@@ -609,7 +610,8 @@ impl<T: NodeType> Libp2pNetwork<T> {
609610
if latest_seen_view.load(Ordering::Relaxed) + THRESHOLD <= *view_number {
610611
// look up
611612
if let Err(err) = handle.lookup_node(&pk, dht_timeout).await {
612-
warn!("Failed to perform lookup for key {pk}: {err}");
613+
LogEvent::DhtLookupFailure.record();
614+
debug!("Failed to perform lookup for key {pk}: {err}");
613615
};
614616
}
615617
}

crates/hotshot/libp2p-networking/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ tracing-subscriber = { workspace = true }
3838

3939
[dev-dependencies]
4040
hotshot-example-types = { workspace = true }
41+
tracing-test = { workspace = true }
4142

4243
[lints]
4344
workspace = true

crates/hotshot/libp2p-networking/src/network/behaviours/dht/bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl DHTBootstrapTask {
5656
break;
5757
},
5858
Some(InputEvent::StartBootstrap) => {
59-
tracing::warn!("Trying to start bootstrap that's already in progress");
59+
tracing::debug!("Trying to start bootstrap that's already in progress");
6060
continue;
6161
},
6262
None => {

crates/hotshot/libp2p-networking/src/network/behaviours/dht/mod.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ lazy_static! {
4242
}
4343

4444
use super::exponential_backoff::ExponentialBackoff;
45-
use crate::network::{ClientRequest, NetworkEvent};
45+
use crate::network::{ClientRequest, NetworkEvent, log_summary::LogEvent};
4646

4747
/// Behaviour wrapping libp2p's kademlia
4848
/// included:
@@ -184,7 +184,9 @@ impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
184184
// The key already exists in the cache, send the value to all channels
185185
for chan in chans {
186186
if chan.send(entry.value.clone()).is_err() {
187-
warn!("Get DHT: channel closed before get record request result could be sent");
187+
debug!(
188+
"Get DHT: channel closed before get record request result could be sent"
189+
);
188190
}
189191
}
190192
} else {
@@ -280,7 +282,8 @@ impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
280282
},
281283
},
282284
Err(err) => {
283-
warn!("Error in Kademlia query: {err:?}");
285+
LogEvent::DhtKadQueryError.record();
286+
debug!("Error in Kademlia query: {err:?}");
284287
false
285288
},
286289
},
@@ -323,7 +326,7 @@ impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
323326
// Send the record to all channels that are still open
324327
for n in notify {
325328
if n.send(record.value.clone()).is_err() {
326-
warn!(
329+
debug!(
327330
"Get DHT: channel closed before get record request result could \
328331
be sent"
329332
);
@@ -334,25 +337,23 @@ impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
334337
}
335338
}
336339
// disagreement => query more nodes
337-
else {
338-
// there is some internal disagreement or not enough nodes returned
339-
// Initiate new query that hits more replicas
340-
if retry_count > 0 {
341-
let new_retry_count = retry_count - 1;
342-
warn!(
343-
"Get DHT: Internal disagreement for get dht request {progress:?}! \
344-
requerying with more nodes. {new_retry_count:?} retries left"
345-
);
346-
self.retry_get(KadGetQuery {
347-
backoff,
348-
progress: DHTProgress::NotStarted,
349-
notify,
350-
key,
351-
retry_count: new_retry_count,
352-
records: Vec::new(),
353-
});
354-
}
355-
warn!(
340+
else if retry_count > 0 {
341+
let new_retry_count = retry_count - 1;
342+
debug!(
343+
"Get DHT: Internal disagreement for get dht request {progress:?}! requerying \
344+
with more nodes. {new_retry_count:?} retries left"
345+
);
346+
self.retry_get(KadGetQuery {
347+
backoff,
348+
progress: DHTProgress::NotStarted,
349+
notify,
350+
key,
351+
retry_count: new_retry_count,
352+
records: Vec::new(),
353+
});
354+
} else {
355+
LogEvent::DhtDisagreementGivenUp.record();
356+
debug!(
356357
"Get DHT: Internal disagreement for get dht request {progress:?}! Giving up \
357358
because out of retries. "
358359
);
@@ -438,7 +439,8 @@ impl<K: SignatureKey + 'static, D: DhtPersistentStorage> DHTBehaviour<K, D> {
438439
if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
439440
let _: Result<_, _> = chan.send(());
440441
};
441-
warn!("Failed to get closest peers: {e:?}");
442+
LogEvent::DhtClosestPeersFailure.record();
443+
debug!("Failed to get closest peers: {e:?}");
442444
},
443445
},
444446
KademliaEvent::OutboundQueryProgressed {

crates/hotshot/libp2p-networking/src/network/behaviours/dht/store/persistent.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,9 @@ impl<R: RecordStore, D: DhtPersistentStorage> PersistentStore<R, D> {
243243
semaphore: Arc::new(Semaphore::new(1)),
244244
};
245245

246-
// Try to restore the DHT from the persistent store. If it fails, warn and start with an empty store
246+
// Try to restore the DHT from the persistent store. If it fails, log and start with an empty store
247247
if let Err(err) = store.restore_from_persistent_storage().await {
248-
warn!(
248+
debug!(
249249
"Failed to restore DHT from persistent storage: {err}. Starting with empty store",
250250
);
251251
}
@@ -258,9 +258,9 @@ impl<R: RecordStore, D: DhtPersistentStorage> PersistentStore<R, D> {
258258
///
259259
/// Returns `true` if the DHT was saved, `false` otherwise.
260260
fn try_save_to_persistent_storage(&mut self) -> bool {
261-
// Try to acquire the semaphore, warning if another save operation is already in progress
261+
// Try to acquire the semaphore; if another save operation is already in progress, skip.
262262
let Ok(permit) = Arc::clone(&self.semaphore).try_acquire_owned() else {
263-
warn!(
263+
debug!(
264264
"Skipping DHT save to persistent storage - another save operation is already in \
265265
progress"
266266
);

crates/hotshot/libp2p-networking/src/network/behaviours/direct_message.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ use std::collections::HashMap;
99
use libp2p::request_response::{Event, Message, OutboundRequestId, ResponseChannel};
1010
use libp2p_identity::PeerId;
1111
use tokio::{spawn, sync::mpsc::UnboundedSender, time::sleep};
12-
use tracing::{debug, error, warn};
12+
use tracing::debug;
1313

1414
use super::exponential_backoff::ExponentialBackoff;
15-
use crate::network::{ClientRequest, NetworkEvent};
15+
use crate::network::{ClientRequest, NetworkEvent, log_summary::LogEvent};
1616

1717
/// Request to direct message a peer
1818
#[derive(Debug)]
@@ -58,7 +58,8 @@ impl DMBehaviour {
5858
error,
5959
connection_id: _,
6060
} => {
61-
error!("Inbound message failure from {:?}: {:?}", peer, error);
61+
LogEvent::DirectMessageInboundFailure.record();
62+
debug!("Inbound message failure from {:?}: {:?}", peer, error);
6263
None
6364
},
6465
Event::OutboundFailure {
@@ -67,7 +68,8 @@ impl DMBehaviour {
6768
error,
6869
connection_id: _,
6970
} => {
70-
warn!("Outbound message failure to {:?}: {:?}", peer, error);
71+
LogEvent::DirectMessageOutboundFailure.record();
72+
debug!("Outbound message failure to {:?}: {:?}", peer, error);
7173
if let Some(mut req) = self.in_progress_rr.remove(&request_id) {
7274
if req.retry_count == 0 {
7375
return None;
@@ -106,7 +108,7 @@ impl DMBehaviour {
106108
debug!("Received direct response {:?}", msg);
107109
Some(NetworkEvent::DirectResponse(msg, req.peer_id))
108110
} else {
109-
warn!("Received response for unknown request id {:?}", request_id);
111+
debug!("Received response for unknown request id {:?}", request_id);
110112
None
111113
}
112114
},

crates/hotshot/libp2p-networking/src/network/def.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use super::{
2323
validated::ValidatedStore,
2424
},
2525
cbor,
26+
log_summary::LogEvent,
2627
};
2728

2829
/// Overarching network behaviour performing:
@@ -98,7 +99,8 @@ impl<K: SignatureKey + 'static, D: DhtPersistentStorage> NetworkDef<K, D> {
9899
/// Publish a given gossip
99100
pub fn publish_gossip(&mut self, topic: IdentTopic, contents: Vec<u8>) {
100101
if let Err(e) = self.gossipsub.publish(topic, contents) {
101-
tracing::warn!("Failed to publish gossip message. Error: {:?}", e);
102+
LogEvent::GossipPublishFailure.record();
103+
tracing::debug!("Failed to publish gossip message. Error: {:?}", e);
102104
}
103105
}
104106
/// Subscribe to a given topic
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use std::{
2+
sync::atomic::{AtomicBool, AtomicU64, Ordering},
3+
time::Duration,
4+
};
5+
6+
use tokio::time::interval;
7+
use tracing::info;
8+
9+
/// Period between two summary lines; also the number of seconds printed in the
/// `libp2p {}s summary` message.
pub const SUMMARY_INTERVAL: Duration = Duration::from_secs(60);
10+
11+
/// Declares [`LogEvent`] together with its parallel `NAMES` and `COUNTERS` tables.
///
/// Every `Variant => "label"` entry yields one enum variant, one label and one
/// zero-initialised counter. All three share the index `Variant as usize`, so the
/// tables cannot drift apart when an entry is added or removed.
macro_rules! events {
    ($($variant:ident => $name:literal),* $(,)?) => {
        /// Kind of event counted by `record` and reported in the periodic summary.
        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
        #[repr(usize)]
        pub enum LogEvent {
            $(
                #[doc = concat!("Reported in the summary line as `", $name, "`.")]
                $variant
            ),*
        }

        /// Summary labels, indexed by `LogEvent as usize`.
        const NAMES: &[&str] = &[$($name),*];
        /// One counter per label, using the same indexing as `NAMES`.
        static COUNTERS: [AtomicU64; NAMES.len()] =
            [const { AtomicU64::new(0) }; NAMES.len()];
    };
}

// Declaration order is the order in which counters appear in the summary line;
// the entries are kept alphabetical by label.
events! {
    AuthFailure => "auth_failures",
    AuthHandshakeTimeout => "auth_handshake_timeouts",
    DhtClosestPeersFailure => "dht_closest_peers_failures",
    DhtDisagreementGivenUp => "dht_disagreements_given_up",
    DhtKadQueryError => "dht_kad_query_errors",
    DhtLookupFailure => "dht_lookup_failures",
    DialFailure => "dial_failures",
    DirectMessageInboundFailure => "direct_message_inbound_failures",
    DirectMessageOutboundFailure => "direct_message_outbound_failures",
    GossipPublishFailure => "gossip_publish_failures",
    GossipsubNotSupported => "gossipsub_not_supported",
    GossipsubSlowPeer => "gossipsub_slow_peer",
    IncomingConnError => "incoming_conn_errors",
    ListenerError => "listener_errors",
    NetworkSendFailure => "network_send_failures",
    VerifyFailure => "verify_failures",
}
43+
44+
impl LogEvent {
45+
pub fn record(self) {
46+
COUNTERS[self as usize].fetch_add(1, Ordering::Relaxed);
47+
}
48+
}
49+
50+
fn drain_and_format() -> Option<String> {
51+
let parts: Vec<String> = COUNTERS
52+
.iter()
53+
.zip(NAMES.iter())
54+
.filter_map(|(counter, name)| {
55+
let value = counter.swap(0, Ordering::Relaxed);
56+
(value != 0).then(|| format!("{name}={value}"))
57+
})
58+
.collect();
59+
(!parts.is_empty()).then(|| parts.join(" "))
60+
}
61+
62+
fn emit_summary() {
63+
if let Some(body) = drain_and_format() {
64+
info!("libp2p {}s summary: {body}", SUMMARY_INTERVAL.as_secs());
65+
}
66+
}
67+
68+
static SPAWNED: AtomicBool = AtomicBool::new(false);
69+
70+
pub fn spawn_summary_task() {
71+
if SPAWNED
72+
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
73+
.is_err()
74+
{
75+
return;
76+
}
77+
tokio::spawn(async move {
78+
let mut ticker = interval(SUMMARY_INTERVAL);
79+
ticker.tick().await; // skip immediate first tick
80+
loop {
81+
ticker.tick().await;
82+
emit_summary();
83+
}
84+
});
85+
}
86+
87+
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard, OnceLock, atomic::Ordering};

    use tracing_test::traced_test;

    use super::{COUNTERS, LogEvent, drain_and_format, emit_summary};

    /// Serializes the tests in this module, which all mutate the shared `COUNTERS`.
    fn test_lock() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        let mutex = LOCK.get_or_init(|| Mutex::new(()));
        // A panicking test poisons the mutex; the guarded `()` holds no state, so
        // recovering the guard is safe.
        match mutex.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Zeroes every counter so a test starts from a known state.
    fn reset() {
        COUNTERS
            .iter()
            .for_each(|slot| slot.store(0, Ordering::Relaxed));
    }

    #[test]
    #[traced_test]
    fn emits_only_nonzero_and_skips_when_idle() {
        let _guard = test_lock();
        reset();

        // With all counters at zero, nothing may be logged.
        emit_summary();
        assert!(!logs_contain("libp2p"));

        (0..3).for_each(|_| LogEvent::DialFailure.record());
        (0..5).for_each(|_| LogEvent::AuthFailure.record());

        emit_summary();
        assert!(logs_contain("libp2p 60s summary:"));
        assert!(logs_contain("auth_failures=5"));
        assert!(logs_contain("dial_failures=3"));
        assert!(!logs_contain("verify_failures"));

        // Emitting must have drained the counters.
        assert!(drain_and_format().is_none());
    }

    #[test]
    fn concurrent_increments_are_not_lost() {
        const THREADS: usize = 8;
        const PER_THREAD: u64 = 1_000;

        let _guard = test_lock();
        reset();

        // Scoped threads are joined when the scope ends, before the drain below.
        std::thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    (0..PER_THREAD).for_each(|_| LogEvent::DialFailure.record());
                });
            }
        });

        let expected_total = THREADS as u64 * PER_THREAD;
        let line = drain_and_format().expect("expected a summary");
        assert!(line.contains(&format!("dial_failures={expected_total}")));
    }
}

0 commit comments

Comments
 (0)