Skip to content

Commit e97c06e

Browse files
committed
Dynamically allocate receive buffer to avoid unbounded committed memory growth
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent 7f5b54c commit e97c06e

3 files changed

Lines changed: 27 additions & 12 deletions

File tree

datadog-ipc-macros/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,17 +319,16 @@ fn gen_serve_fn(
319319
return;
320320
}
321321
};
322-
let mut buf = vec![0u8; datadog_ipc::max_message_size() + datadog_ipc::HANDLE_SUFFIX_SIZE];
323322
loop {
324-
let (n, fds) = match datadog_ipc::recv_raw_async(&async_fd, &mut buf).await {
323+
let (buf, fds) = match datadog_ipc::recv_raw_async(&async_fd).await {
325324
Ok(x) => x,
326325
Err(e) => {
327326
::tracing::trace!("IPC serve: recv (connection closed?): {e}");
328327
break;
329328
}
330329
};
331330
let Ok((discriminant, mut req)) =
332-
datadog_ipc::codec::decode::<#enum_name>(&buf[..n])
331+
datadog_ipc::codec::decode::<#enum_name>(&buf)
333332
else {
334333
::tracing::warn!("IPC serve: failed to decode request");
335334
break;

datadog-ipc/src/platform/unix/sockets/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,12 +348,23 @@ pub type AsyncConn = AsyncFd<OwnedFd>;
348348

349349
/// Async receive on a Tokio `AsyncFd`-wrapped IPC connection.
350350
///
351+
/// Allocates a buffer sized to `max_message_size()` per call and returns only the received
352+
/// bytes (truncated), so no large buffer is held between receives.
353+
///
351354
/// Used by the server dispatch loop (generated by `#[service]` macro).
352-
pub async fn recv_raw_async(fd: &AsyncConn, buf: &mut [u8]) -> io::Result<(usize, Vec<OwnedFd>)> {
355+
pub async fn recv_raw_async(fd: &AsyncConn) -> io::Result<(Vec<u8>, Vec<OwnedFd>)> {
353356
loop {
354357
let mut guard = fd.readable().await?;
355-
match guard.try_io(|inner| recvmsg_raw(inner.as_raw_fd(), buf, MsgFlags::empty())) {
356-
Ok(result) => return result,
358+
// SAFETY: recvmsg writes exactly the first n bytes; we truncate to n before returning,
359+
// so no uninitialized bytes are ever exposed to the caller.
360+
let mut buf = Vec::with_capacity(max_message_size());
361+
unsafe { buf.set_len(max_message_size()) };
362+
match guard.try_io(|inner| recvmsg_raw(inner.as_raw_fd(), &mut buf, MsgFlags::empty())) {
363+
Ok(Ok((n, fds))) => {
364+
buf.truncate(n);
365+
return Ok((buf, fds));
366+
}
367+
Ok(Err(e)) => return Err(e),
357368
Err(_would_block) => continue,
358369
}
359370
}

datadog-ipc/src/platform/windows/sockets.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -713,14 +713,19 @@ impl SeqpacketConn {
713713

714714
/// Async receive on a Windows named pipe IPC connection.
715715
///
716-
/// Calls `block_in_place` with a direct blocking `ReadFile` into the caller-supplied buffer,
716+
/// Calls `block_in_place` with a direct blocking `ReadFile` into a caller-owned buffer,
717717
/// bypassing mio's 4 KB internal read buffer and correctly handling messages of any size.
718-
pub async fn recv_raw_async(
719-
conn: &AsyncConn,
720-
buf: &mut [u8],
721-
) -> io::Result<(usize, Vec<OwnedHandle>)> {
718+
pub async fn recv_raw_async(conn: &AsyncConn) -> io::Result<(Vec<u8>, Vec<OwnedHandle>)> {
722719
let h = conn.handle.as_raw_handle() as SysHANDLE;
723-
tokio::task::block_in_place(|| pipe_read(h, buf, true))
720+
tokio::task::block_in_place(|| {
721+
// SAFETY: ReadFile writes exactly the first n bytes; we truncate to n before returning,
722+
// so no uninitialized bytes are ever exposed to the caller.
723+
let mut buf = Vec::with_capacity(max_message_size() + HANDLE_SUFFIX_SIZE);
724+
unsafe { buf.set_len(max_message_size() + HANDLE_SUFFIX_SIZE) };
725+
let (n, handles) = pipe_read(h, &mut buf, true)?;
726+
buf.truncate(n);
727+
Ok((buf, handles))
728+
})
724729
}
725730

726731
/// Async send on a Windows named pipe IPC connection.

0 commit comments

Comments
 (0)