Skip to content

Commit 2eb009a

Browse files
duncanistaclaude
andauthored
fix(dogstatsd): decouple socket reading from metric processing (#76)
* perf(dogstatsd): optimize reader hot path Reduce per-packet overhead in the reader loop with three optimizations: - Buffer pool: cycle Vec<u8> buffers between reader and processor via an unbounded channel, eliminating heap allocation per read in steady state. Pool size is naturally bounded by the data channel capacity. - Batch drain: after each async read, call try_read_into() in a tight loop to drain all pending kernel packets without re-entering tokio's event loop. Most impactful after hypervisor scheduling pauses on Lambda. - Fast cancellation check: use is_cancelled() (AtomicBool load) at the top of each iteration, with tokio::select! on the async read to ensure the reader exits promptly even on a quiet socket. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * add some more integration tests --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 91cfc1c commit 2eb009a

File tree

2 files changed

+255
-24
lines changed

2 files changed

+255
-24
lines changed

crates/dogstatsd/src/dogstatsd.rs

Lines changed: 140 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,21 @@ impl std::fmt::Display for MessageSource {
7070
}
7171
}
7272

73+
/// A packet read from the transport, carrying a pooled buffer.
74+
/// The actual data is `buf[..len]` — the buffer itself is returned
75+
/// to the pool after processing so it can be reused without allocation.
76+
struct Packet {
77+
buf: Vec<u8>,
78+
len: usize,
79+
source: MessageSource,
80+
}
81+
82+
impl Packet {
83+
fn data(&self) -> &[u8] {
84+
&self.buf[..self.len]
85+
}
86+
}
87+
7388
// BufferReader abstracts transport methods for metric data.
7489
enum BufferReader {
7590
/// UDP socket reader (cross-platform, default transport)
@@ -119,6 +134,22 @@ impl BufferReader {
119134
},
120135
}
121136
}
137+
138+
/// Non-blocking read into the provided buffer. Returns `Ok(Some(...))`
139+
/// if a packet is immediately available, `Ok(None)` if the socket would
140+
/// block. Used after `read_into()` to drain all pending packets from the
141+
/// kernel buffer without re-entering tokio's event loop.
142+
fn try_read_into(&mut self, buf: &mut [u8]) -> std::io::Result<Option<(usize, MessageSource)>> {
143+
match self {
144+
BufferReader::UdpSocket(socket) => match socket.try_recv_from(buf) {
145+
Ok((amt, src)) => Ok(Some((amt, MessageSource::Network(src)))),
146+
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None),
147+
Err(e) => Err(e),
148+
},
149+
// Non-UDP transports don't support non-blocking reads
150+
_ => Ok(None),
151+
}
152+
}
122153
}
123154

124155
/// DogStatsD server to receive, parse, and forward metrics.
@@ -249,17 +280,31 @@ impl DogStatsD {
249280
} = self;
250281

251282
let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size);
283+
// Buffer pool: processor returns used buffers, reader reuses them.
284+
// Avoids a heap allocation + zero-fill per packet in steady state.
285+
let (pool_tx, pool_rx) = tokio::sync::mpsc::unbounded_channel();
286+
let reader_pool_tx = pool_tx.clone();
252287

253288
let reader_token = cancel_token.clone();
254289
tokio::spawn(async move {
255-
read_loop(buffer_reader, tx, reader_token, buf_size, queue_size).await;
290+
read_loop(
291+
buffer_reader,
292+
tx,
293+
reader_token,
294+
buf_size,
295+
queue_size,
296+
pool_rx,
297+
reader_pool_tx,
298+
)
299+
.await;
256300
});
257301

258302
process_loop(
259303
&mut rx,
260304
&cancel_token,
261305
&aggregator_handle,
262306
metric_namespace.as_deref(),
307+
pool_tx,
263308
)
264309
.await;
265310
}
@@ -285,21 +330,39 @@ impl DogStatsD {
285330

286331
/// Drains the transport into the channel as fast as possible.
287332
///
288-
/// Allocates a fresh buffer for each read. On queue full, drops the packet
289-
/// and logs at power-of-two intervals (matching Go agent behavior).
333+
/// Buffers are drawn from a pool (or allocated on first use) and returned by
334+
/// the processor after parsing. In steady state this means zero heap
335+
/// allocations in the read loop — only a `recv_from` syscall + channel send.
336+
///
337+
/// After each async `read_into()`, calls `try_read_into()` in a tight loop to
338+
/// drain all packets already sitting in the kernel buffer without re-entering
339+
/// tokio's event loop.
340+
///
341+
/// Uses `is_cancelled()` as a fast-path check at the top of each iteration,
342+
/// and `tokio::select!` on the async `read_into()` to ensure the reader exits
343+
/// promptly even on a quiet socket (where `read_into` would block forever).
290344
async fn read_loop(
291345
mut reader: BufferReader,
292-
tx: tokio::sync::mpsc::Sender<(Vec<u8>, MessageSource)>,
346+
tx: tokio::sync::mpsc::Sender<Packet>,
293347
cancel: tokio_util::sync::CancellationToken,
294348
buf_size: usize,
295349
queue_capacity: usize,
350+
mut pool_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
351+
pool_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
296352
) {
297353
let mut dropped: u64 = 0;
298-
loop {
299-
let mut buf = vec![0u8; buf_size];
354+
while !cancel.is_cancelled() {
355+
// Get a buffer from the pool, or allocate if the pool is empty (cold start).
356+
let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]);
357+
358+
// Async wait for the first packet, but remain cancellation-aware
359+
// so the reader exits promptly on a quiet socket.
300360
let result = tokio::select! {
301361
r = reader.read_into(&mut buf) => r,
302-
_ = cancel.cancelled() => break,
362+
_ = cancel.cancelled() => {
363+
let _ = pool_tx.send(buf);
364+
break;
365+
}
303366
};
304367
let (len, source) = match result {
305368
Ok(r) => r,
@@ -309,22 +372,45 @@ async fn read_loop(
309372
}
310373
Err(e) => {
311374
error!("DogStatsD read error: {}", e);
375+
let _ = pool_tx.send(buf);
312376
continue;
313377
}
314378
};
315379
if len == 0 {
380+
let _ = pool_tx.send(buf);
316381
continue;
317382
}
318-
buf.truncate(len);
319-
match tx.try_send((buf, source)) {
320-
Ok(()) => {}
321-
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
322-
dropped += 1;
323-
if dropped.is_power_of_two() {
324-
debug!("DogStatsD queue full, {} packets dropped so far", dropped);
383+
if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) {
384+
break;
385+
}
386+
387+
// Drain any packets already in the kernel buffer without going
388+
// through tokio's event loop — just raw non-blocking syscalls.
389+
loop {
390+
let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]);
391+
match reader.try_read_into(&mut buf) {
392+
Ok(Some((0, _))) => {
393+
let _ = pool_tx.send(buf);
394+
}
395+
Ok(Some((len, source))) => {
396+
if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) {
397+
break;
398+
}
399+
}
400+
Ok(None) => {
401+
let _ = pool_tx.send(buf);
402+
break;
403+
}
404+
Err(e) => {
405+
error!("DogStatsD read error: {}", e);
406+
let _ = pool_tx.send(buf);
407+
break;
325408
}
326409
}
327-
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break,
410+
}
411+
// If the channel was closed during batch drain, exit immediately.
412+
if tx.is_closed() {
413+
break;
328414
}
329415
}
330416
if dropped > 0 {
@@ -335,28 +421,58 @@ async fn read_loop(
335421
}
336422
}
337423

424+
/// Sends a packet to the channel. Returns `false` if the channel is closed
425+
/// (caller should exit the loop). On failure, returns the buffer to the pool
426+
/// so it can be reused.
427+
fn try_send_packet(
428+
tx: &tokio::sync::mpsc::Sender<Packet>,
429+
packet: Packet,
430+
dropped: &mut u64,
431+
pool: &tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
432+
) -> bool {
433+
match tx.try_send(packet) {
434+
Ok(()) => true,
435+
Err(tokio::sync::mpsc::error::TrySendError::Full(packet)) => {
436+
let _ = pool.send(packet.buf);
437+
*dropped += 1;
438+
if dropped.is_power_of_two() {
439+
debug!("DogStatsD queue full, {} packets dropped so far", *dropped);
440+
}
441+
true
442+
}
443+
Err(tokio::sync::mpsc::error::TrySendError::Closed(packet)) => {
444+
let _ = pool.send(packet.buf);
445+
false
446+
}
447+
}
448+
}
449+
338450
/// Receives packets from the channel, parses them, and forwards metrics
339-
/// to the aggregator. On cancellation, drains any remaining buffered
340-
/// packets before exiting so no already-read data is lost.
451+
/// to the aggregator. After processing each packet, returns the buffer to
452+
/// the pool for reuse by the reader. On cancellation, drains any remaining
453+
/// buffered packets before exiting so no already-read data is lost.
341454
async fn process_loop(
342-
rx: &mut tokio::sync::mpsc::Receiver<(Vec<u8>, MessageSource)>,
455+
rx: &mut tokio::sync::mpsc::Receiver<Packet>,
343456
cancel: &tokio_util::sync::CancellationToken,
344457
aggregator: &AggregatorHandle,
345458
namespace: Option<&str>,
459+
pool: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
346460
) {
347461
loop {
348462
tokio::select! {
349463
packet = rx.recv() => {
350464
match packet {
351-
Some((buf, src)) => {
352-
process_packet(&buf, &src, aggregator, namespace);
465+
Some(packet) => {
466+
process_packet(packet.data(), &packet.source, aggregator, namespace);
467+
let _ = pool.send(packet.buf);
353468
}
354469
None => break,
355470
}
356471
}
357472
_ = cancel.cancelled() => {
358-
while let Ok((buf, src)) = rx.try_recv() {
359-
process_packet(&buf, &src, aggregator, namespace);
473+
while let Ok(packet) = rx.try_recv() {
474+
process_packet(packet.data(), &packet.source, aggregator, namespace);
475+
let _ = pool.send(packet.buf);
360476
}
361477
break;
362478
}
@@ -732,8 +848,8 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
732848
#[tokio::test]
733849
async fn test_dogstatsd_custom_buffer_size() {
734850
// Use a large buffer to verify custom buf_size is wired through.
735-
// The MirrorTest reader returns pre-stored data (ignoring buf_size),
736-
// so this test ensures the buf_size field is properly wired into DogStatsD.
851+
// The MirrorTest reader copies data into the caller's buffer, so
852+
// a payload that fits within the custom size should parse correctly.
737853
let payload = "large.buf.metric:1|c\nlarge.buf.metric2:2|c\n";
738854
let custom_buf_size: usize = 16384;
739855

crates/dogstatsd/tests/integration_test.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,121 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
297297
mock.assert_async().await;
298298
}
299299

300+
/// Verifies that `spin()` exits promptly when cancelled on a quiet socket
301+
/// (no packets arriving). Without cancellation-aware awaiting in the reader,
302+
/// the task would block on `recv_from` indefinitely.
303+
#[cfg(test)]
304+
#[tokio::test]
305+
async fn test_spin_exits_on_cancellation_without_traffic() {
306+
use dogstatsd::metric::SortedTags;
307+
308+
let (service, handle) =
309+
AggregatorService::new(SortedTags::parse("test:value").unwrap(), CONTEXTS)
310+
.expect("aggregator service creation failed");
311+
tokio::spawn(service.run());
312+
313+
let cancel_token = CancellationToken::new();
314+
let dogstatsd = DogStatsD::new(
315+
&DogStatsDConfig {
316+
host: "127.0.0.1".to_string(),
317+
port: 18128,
318+
metric_namespace: None,
319+
#[cfg(all(windows, feature = "windows-pipes"))]
320+
windows_pipe_name: None,
321+
so_rcvbuf: None,
322+
buffer_size: None,
323+
queue_size: None,
324+
},
325+
handle.clone(),
326+
cancel_token.clone(),
327+
)
328+
.await;
329+
330+
let spin_handle = tokio::spawn(async move {
331+
dogstatsd.spin().await;
332+
});
333+
334+
// Cancel immediately — no packets sent
335+
cancel_token.cancel();
336+
337+
// spin() must exit within 500ms; if it blocks on recv_from this times out.
338+
let result = timeout(Duration::from_millis(500), spin_handle).await;
339+
assert!(
340+
result.is_ok(),
341+
"spin() should exit promptly after cancellation on a quiet socket"
342+
);
343+
344+
handle.shutdown().expect("shutdown failed");
345+
}
346+
347+
/// Verifies that when the internal queue is full, the server drops packets
348+
/// rather than blocking, and still delivers the metrics it did accept.
349+
/// Uses queue_size=1 so the queue saturates almost immediately.
350+
#[cfg(test)]
351+
#[tokio::test]
352+
async fn test_queue_full_drops_packets_without_blocking() {
353+
use dogstatsd::metric::SortedTags;
354+
355+
let (service, handle) =
356+
AggregatorService::new(SortedTags::parse("test:value").unwrap(), CONTEXTS)
357+
.expect("aggregator service creation failed");
358+
tokio::spawn(service.run());
359+
360+
let cancel_token = CancellationToken::new();
361+
let dogstatsd = DogStatsD::new(
362+
&DogStatsDConfig {
363+
host: "127.0.0.1".to_string(),
364+
port: 18129,
365+
metric_namespace: None,
366+
#[cfg(all(windows, feature = "windows-pipes"))]
367+
windows_pipe_name: None,
368+
so_rcvbuf: None,
369+
buffer_size: None,
370+
queue_size: Some(1),
371+
},
372+
handle.clone(),
373+
cancel_token.clone(),
374+
)
375+
.await;
376+
377+
let spin_handle = tokio::spawn(async move {
378+
dogstatsd.spin().await;
379+
});
380+
381+
sleep(Duration::from_millis(50)).await;
382+
383+
// Fire 200 packets as fast as possible — with queue_size=1, most will be dropped.
384+
let socket = UdpSocket::bind("0.0.0.0:0").await.expect("bind failed");
385+
for i in 0..200 {
386+
let msg = format!("qfull.m{}:{}|c\n", i, i);
387+
let _ = socket.send_to(msg.as_bytes(), "127.0.0.1:18129").await;
388+
}
389+
390+
// Give the processor time to consume what it received.
391+
sleep(Duration::from_millis(300)).await;
392+
393+
let response = handle.flush().await.expect("flush failed");
394+
let received: usize = response.series.iter().map(|s| s.len()).sum();
395+
396+
// With queue_size=1, we expect some metrics to arrive but not all 200.
397+
// The exact number depends on timing, so just verify:
398+
// 1. At least some metrics got through (server isn't broken)
399+
// 2. Fewer than all 200 arrived (drops happened)
400+
assert!(
401+
received > 0,
402+
"expected at least some metrics to arrive, got 0"
403+
);
404+
assert!(
405+
received < 200,
406+
"expected some packets to be dropped with queue_size=1, but all {} arrived",
407+
received
408+
);
409+
410+
cancel_token.cancel();
411+
let _ = timeout(Duration::from_millis(500), spin_handle).await;
412+
handle.shutdown().expect("shutdown failed");
413+
}
414+
300415
/// Verifies that `buffer_size` actually controls how many bytes the server
301416
/// reads per UDP packet. Sends a payload larger than a small buffer and
302417
/// checks that metrics are lost, then sends the same payload to a server

0 commit comments

Comments
 (0)