Skip to content

Commit 8efaa0e

Browse files
committed
Add /proc/net/dev sampler + configurable tick period
Folds network byte counters into the existing CPU sampler task (one timer, one shutdown handle). Each tick the sampler reads `/proc/net/dev`, drops loopback, and emits one row per non-lo interface with byte deltas vs the previous tick: net_node{N}.csv : t_ns, iface, rx_bytes, tx_bytes `rx_bytes`/`tx_bytes` are the raw deltas; rate is derived downstream as `bytes / (t_ns - prev_t_ns_for_same_iface) * 1e9`, so the tick period can be tuned without breaking the schema. * `--sampler-tick-ms` CLI flag on `new-protocol-bench-node` (default 50 ms) lets the operator change the sampling cadence per run. * `collect-results.py` fetches `net_{name}.csv` alongside the other optional CSVs (silent skip when absent). * `TraceData.net_samples()` / `net_window()` mirror the CPU/core accessors with the same lazy-load + cache pattern. Verifies the network-saturation hypothesis directly: on instances with constrained bandwidth (c8g.xlarge: 1.875 Gbps) tx_bytes is expected to flat-top during proposal-broadcast windows; on c8gn instances the same workload should leave significant headroom.
1 parent ed73db8 commit 8efaa0e

4 files changed

Lines changed: 95 additions & 1 deletion

File tree

crates/hotshot/new-protocol/bench/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ pub struct NodeConfig {
3737
/// Block payload size in bytes.
3838
#[arg(long, default_value_t = 0)]
3939
pub block_size: usize,
40+
41+
/// Period between CPU + network sampler ticks (milliseconds). 50ms is the
42+
/// default; lower values give finer resolution at the cost of more
43+
/// /proc reads per second.
44+
#[arg(long, default_value_t = 50)]
45+
pub sampler_tick_ms: u64,
4046
}
4147

4248
impl NodeConfig {

crates/hotshot/new-protocol/bench/src/cpu_sampler.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct Inner {
3333
out_dir: PathBuf,
3434
cpu_rows: Mutex<Vec<CpuRow>>,
3535
core_rows: Mutex<Vec<CoreRow>>,
36+
net_rows: Mutex<Vec<NetRow>>,
3637
}
3738

3839
#[derive(Serialize)]
@@ -54,6 +55,18 @@ struct CoreRow {
5455
idle_pct: f64,
5556
}
5657

58+
/// Per-interface byte deltas since the previous tick. Loopback (`lo`) is
59+
/// dropped. Rate is derived downstream as
60+
/// `rx_bytes / (t_ns_curr - t_ns_prev_for_same_iface) * 1e9`, so the tick
61+
/// period can be changed without breaking the schema.
62+
#[derive(Serialize)]
63+
struct NetRow {
64+
t_ns: i128,
65+
iface: String,
66+
rx_bytes: u64,
67+
tx_bytes: u64,
68+
}
69+
5770
impl CpuSampler {
5871
/// Start sampling. `out_dir` is the directory next to `leader_trace_node*.csv`.
5972
pub fn start(node_id: u64, out_dir: PathBuf, tick: Duration) -> Self {
@@ -62,6 +75,7 @@ impl CpuSampler {
6275
out_dir,
6376
cpu_rows: Mutex::new(Vec::with_capacity(4096)),
6477
core_rows: Mutex::new(Vec::with_capacity(4096)),
78+
net_rows: Mutex::new(Vec::with_capacity(4096)),
6579
});
6680

6781
let join = {
@@ -96,6 +110,7 @@ async fn run_sampler(inner: Arc<Inner>, tick: Duration) {
96110
let mut prev_proc: Option<(u64, u64)> = None;
97111
let mut prev_threads: HashMap<i64, (u64, u64)> = HashMap::new();
98112
let mut prev_cores: HashMap<u32, CoreTicks> = HashMap::new();
113+
let mut prev_net: HashMap<String, (u64, u64)> = HashMap::new();
99114
let clk_tck = clk_tck();
100115
let mut ticker = tokio::time::interval(tick);
101116
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -175,6 +190,25 @@ async fn run_sampler(inner: Arc<Inner>, tick: Duration) {
175190
prev_cores.insert(cpu_id, ticks);
176191
}
177192
}
193+
194+
// ---- per-interface /proc/net/dev ----
195+
if let Some(ifaces) = read_proc_net_dev() {
196+
for (iface, rx, tx) in ifaces {
197+
if let Some(&(prev_rx, prev_tx)) = prev_net.get(&iface) {
198+
let drx = rx.saturating_sub(prev_rx);
199+
let dtx = tx.saturating_sub(prev_tx);
200+
if drx > 0 || dtx > 0 {
201+
inner.net_rows.lock().push(NetRow {
202+
t_ns,
203+
iface: iface.clone(),
204+
rx_bytes: drx,
205+
tx_bytes: dtx,
206+
});
207+
}
208+
}
209+
prev_net.insert(iface, (rx, tx));
210+
}
211+
}
178212
}
179213
}
180214

@@ -206,6 +240,16 @@ fn flush(inner: &Inner) -> std::io::Result<()> {
206240
}
207241
wtr.flush()?;
208242
}
243+
244+
let net_rows = std::mem::take(&mut *inner.net_rows.lock());
245+
if !net_rows.is_empty() {
246+
let path = inner.out_dir.join(format!("net_node{}.csv", inner.node_id));
247+
let mut wtr = csv::Writer::from_path(&path)?;
248+
for r in net_rows {
249+
wtr.serialize(r).map_err(|e| std::io::Error::other(e.to_string()))?;
250+
}
251+
wtr.flush()?;
252+
}
209253
Ok(())
210254
}
211255

@@ -279,6 +323,34 @@ fn read_proc_stat_cores() -> Option<Vec<(u32, CoreTicks)>> {
279323
Some(out)
280324
}
281325

326+
/// Read `/proc/net/dev` and return cumulative `(iface, rx_bytes, tx_bytes)`
327+
/// for every non-loopback interface. Each line looks like
328+
///
329+
/// ```text
330+
/// ens5: 1234567890 9876543 0 0 ... 87654321098 1234567 ...
331+
/// ```
332+
///
333+
/// where the first field after the colon is `rx_bytes` and the 9th is
334+
/// `tx_bytes` (Linux man-page order).
335+
#[cfg(target_os = "linux")]
336+
fn read_proc_net_dev() -> Option<Vec<(String, u64, u64)>> {
337+
let s = std::fs::read_to_string("/proc/net/dev").ok()?;
338+
let mut out = Vec::new();
339+
for line in s.lines().skip(2) {
340+
let (name, rest) = line.split_once(':')?;
341+
let iface = name.trim();
342+
if iface == "lo" || iface.is_empty() {
343+
continue;
344+
}
345+
let nums: Vec<u64> = rest.split_whitespace().filter_map(|f| f.parse().ok()).collect();
346+
if nums.len() < 9 {
347+
continue;
348+
}
349+
out.push((iface.to_string(), nums[0], nums[8]));
350+
}
351+
Some(out)
352+
}
353+
282354
#[cfg(target_os = "linux")]
283355
fn clk_tck() -> u64 {
284356
// SAFETY: sysconf with a valid name is always safe.
@@ -324,4 +396,15 @@ mod tests {
324396
let s = std::fs::read_to_string("/proc/self/stat").expect("read");
325397
let _ = parse_stat(&s).expect("parse self");
326398
}
399+
400+
#[test]
401+
fn reads_real_proc_net_dev() {
402+
let ifs = read_proc_net_dev().expect("proc/net/dev present");
403+
// At least one non-loopback interface should be present on any
404+
// realistic Linux build host. We don't assert *which* one.
405+
assert!(!ifs.is_empty(), "expected at least one non-lo interface");
406+
for (iface, _rx, _tx) in &ifs {
407+
assert_ne!(iface, "lo");
408+
}
409+
}
327410
}

crates/hotshot/new-protocol/bench/src/node.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ pub async fn run(cfg: NodeConfig) -> Result<()> {
6363
.parent()
6464
.map(|p| p.to_path_buf())
6565
.unwrap_or_default();
66-
let cpu_sampler = CpuSampler::start(cfg.node_id, cpu_out_dir, Duration::from_millis(50));
66+
let cpu_sampler = CpuSampler::start(
67+
cfg.node_id,
68+
cpu_out_dir,
69+
Duration::from_millis(cfg.sampler_tick_ms),
70+
);
6771

6872
let coordinator = build_coordinator(
6973
public_key,

crates/hotshot/new-protocol/bench/tests/smoke.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ fn node_config(
3939
.to_string_lossy()
4040
.into_owned(),
4141
block_size,
42+
sampler_tick_ms: 50,
4243
}
4344
}
4445

0 commit comments

Comments
 (0)