Skip to content

Commit ff8e912

Browse files
authored
feat(sidecar)!: Add stats computation via SHM (#1821)
This extracts the StatsExporter to libdd-trace-stats so it can be reused outside of the data-pipeline. Using a two-bucket mechanism, all processes can write stats into one bucket while the sidecar drains the other. Co-authored-by: bob.weinand <bob.weinand@datadoghq.com>
1 parent 5a605d2 commit ff8e912

File tree

31 files changed

+1983
-245
lines changed

31 files changed

+1983
-245
lines changed

.github/workflows/lint.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ jobs:
6161
export AWS_LC_FIPS_SYS_NO_ASM=1
6262
fi
6363
# shellcheck disable=SC2046
64-
cargo clippy --workspace --all-targets --all-features -- -D warnings
64+
cargo clippy --workspace --all-targets --all-features -- -D warnings $([ ${{ matrix.rust_version }} = 1.84.1 ] || echo -Aclippy::manual_is_multiple_of)
6565
6666
licensecheck:
6767
runs-on: ubuntu-latest

Cargo.lock

Lines changed: 24 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datadog-ipc/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ publish = false
88

99
[dependencies]
1010
anyhow = { version = "1.0" }
11+
zwohash = "0.1.2"
1112
bincode = { version = "1" }
1213
futures = { version = "0.3", default-features = false }
1314
io-lifetimes = { version = "1.0" }
@@ -19,6 +20,9 @@ libdd-tinybytes = { path = "../libdd-tinybytes", optional = true }
1920

2021

2122
libdd-common = { path = "../libdd-common" }
23+
libdd-ddsketch = { path = "../libdd-ddsketch" }
24+
libdd-trace-protobuf = { path = "../libdd-trace-protobuf" }
25+
libdd-trace-stats = { path = "../libdd-trace-stats" }
2226
datadog-ipc-macros = { path = "../datadog-ipc-macros" }
2327
tracing = { version = "0.1", default-features = false }
2428

datadog-ipc/src/atomic_option.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Lock-free `Option<T>` with atomic take, valid for any `T` where
5+
//! `size_of::<Option<T>>() <= 8`.
6+
7+
use std::cell::UnsafeCell;
8+
use std::mem::{self, MaybeUninit};
9+
use std::ptr;
10+
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, AtomicU8, Ordering};
11+
12+
/// An `Option<T>` that supports lock-free atomic take.
13+
///
14+
/// # Constraints
15+
/// `size_of::<Option<T>>()` must be `<= 8`. Enforced by a `debug_assert` in
16+
/// `From<Option<T>>`. This holds for niche-optimised types (`NonNull<T>`,
17+
/// `Box<T>`, …) and for any `Option<T>` that fits in a single machine word.
18+
///
19+
/// # Storage
20+
/// The option is stored in a `UnsafeCell<Option<T>>`, giving it exactly the size
21+
/// and alignment of `Option<T>` itself. `take()` picks the narrowest atomic that
22+
/// covers `size_of::<Option<T>>()` bytes — `AtomicU8` for 1-byte options up to
23+
/// `AtomicU64` for 5–8 byte options. The atomic cast is sound only when
24+
/// `align_of::<Option<T>>() >= align_of::<AtomicUN>()` (atomics are aligned to their size).
25+
///
26+
/// # None sentinel
27+
/// The "none" bit-pattern is computed by value (`Option::<T>::None`) rather than
28+
/// assumed to be zero, so the implementation is correct for both niche-optimised
29+
/// types and discriminant-based options.
30+
///
31+
/// `UnsafeCell` provides the interior-mutability aliasing permission required by
32+
/// Rust's memory model when mutating through a shared reference.
33+
pub struct AtomicOption<T>(UnsafeCell<Option<T>>);
34+
35+
impl<T> AtomicOption<T> {
36+
/// Encode `val` as a `u64`, transferring ownership into the bit representation.
37+
const fn encode(val: Option<T>) -> u64 {
38+
let mut bits = 0u64;
39+
unsafe {
40+
ptr::copy_nonoverlapping(
41+
ptr::from_ref(&val).cast::<u8>(),
42+
ptr::from_mut(&mut bits).cast::<u8>(),
43+
size_of::<Option<T>>(),
44+
);
45+
mem::forget(val);
46+
}
47+
bits
48+
}
49+
50+
/// Atomically swap the storage with `new_bits`, returning the old bits.
51+
#[inline]
52+
fn atomic_swap(&self, new_bits: u64) -> u64 {
53+
unsafe {
54+
let ptr = self.0.get();
55+
match size_of::<Option<T>>() {
56+
1 => (*(ptr as *const AtomicU8)).swap(new_bits as u8, Ordering::AcqRel) as u64,
57+
2 => (*(ptr as *const AtomicU16)).swap(new_bits as u16, Ordering::AcqRel) as u64,
58+
3 | 4 => {
59+
(*(ptr as *const AtomicU32)).swap(new_bits as u32, Ordering::AcqRel) as u64
60+
}
61+
_ => (*(ptr as *const AtomicU64)).swap(new_bits, Ordering::AcqRel),
62+
}
63+
}
64+
}
65+
66+
/// Reconstruct an `Option<T>` from its `u64` bit representation.
67+
///
68+
/// # Safety
69+
/// `bits` must hold a valid `Option<T>` bit-pattern in its low
70+
/// `size_of::<Option<T>>()` bytes, as produced by a previous `encode`.
71+
const unsafe fn decode(bits: u64) -> Option<T> {
72+
let mut result = MaybeUninit::<Option<T>>::uninit();
73+
ptr::copy_nonoverlapping(
74+
ptr::from_ref(&bits).cast::<u8>(),
75+
result.as_mut_ptr().cast::<u8>(),
76+
size_of::<Option<T>>(),
77+
);
78+
result.assume_init()
79+
}
80+
81+
/// Atomically replace the stored value with `None` and return what was there.
82+
/// Returns `None` if the value was already taken.
83+
pub fn take(&self) -> Option<T> {
84+
let old = self.atomic_swap(Self::encode(None));
85+
// SAFETY: `old` holds a valid `Option<T>` bit-pattern.
86+
unsafe { Self::decode(old) }
87+
}
88+
89+
/// Atomically store `val`, dropping any previous value.
90+
pub fn set(&self, val: Option<T>) -> Option<T> {
91+
let old = self.atomic_swap(Self::encode(val));
92+
unsafe { Self::decode(old) }
93+
}
94+
95+
/// Atomically store `Some(val)`, returning the previous value.
96+
pub fn replace(&self, val: T) -> Option<T> {
97+
self.set(Some(val))
98+
}
99+
100+
/// Borrow the current value without taking it.
101+
///
102+
/// # Safety
103+
/// Must not be called concurrently with [`Self::take`], [`Self::set`], or [`Self::replace`].
104+
pub unsafe fn as_option(&self) -> &Option<T> {
105+
&*self.0.get()
106+
}
107+
}
108+
109+
impl<T> From<Option<T>> for AtomicOption<T> {
110+
fn from(val: Option<T>) -> Self {
111+
// we may raise this to 16 once AtomicU128 becomes stable
112+
debug_assert!(
113+
size_of::<Option<T>>() <= size_of::<u64>(),
114+
"AtomicOption requires size_of::<Option<T>>() <= 8, got {}",
115+
size_of::<Option<T>>()
116+
);
117+
Self(UnsafeCell::new(val))
118+
}
119+
}
120+
121+
// `AtomicOption<T>` is `Send`/`Sync` when `T: Send` — same contract as `Mutex<Option<T>>`.
122+
unsafe impl<T: Send> Send for AtomicOption<T> {}
123+
unsafe impl<T: Send> Sync for AtomicOption<T> {}

datadog-ipc/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ pub mod handles;
1212

1313
pub mod platform;
1414
pub mod rate_limiter;
15+
pub mod shm_stats;
1516

17+
mod atomic_option;
1618
pub mod client;
1719
pub mod codec;
20+
pub use atomic_option::AtomicOption;
1821

1922
pub use client::IpcClientConn;
2023
#[cfg(target_os = "linux")]

datadog-ipc/src/platform/mem_handle.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33

44
use crate::handles::{HandlesTransport, TransferHandles};
55
use crate::platform::{mmap_handle, munmap_handle, OwnedFileHandle, PlatformHandle};
6+
use crate::AtomicOption;
67
#[cfg(feature = "tiny-bytes")]
78
use libdd_tinybytes::UnderlyingBytes;
89
use serde::{Deserialize, Serialize};
10+
#[cfg(target_os = "linux")]
11+
use std::os::fd::AsRawFd;
912
use std::{ffi::CString, io, ptr::NonNull};
1013

1114
#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -37,15 +40,16 @@ pub(crate) struct ShmPath {
3740

3841
pub struct NamedShmHandle {
3942
pub(crate) inner: ShmHandle,
40-
pub(crate) path: Option<ShmPath>,
43+
pub(crate) path: AtomicOption<Box<ShmPath>>,
4144
}
4245

4346
impl NamedShmHandle {
44-
pub fn get_path(&self) -> &[u8] {
45-
if let Some(ref shm_path) = &self.path {
46-
shm_path.name.as_bytes()
47-
} else {
48-
b""
47+
/// # Safety
48+
/// Must not be called concurrently with `unlink()`.
49+
pub unsafe fn get_path(&self) -> &[u8] {
50+
match self.path.as_option() {
51+
Some(shm_path) => shm_path.name.to_bytes(),
52+
None => b"",
4953
}
5054
}
5155
}
@@ -87,10 +91,19 @@ where
8791
unsafe {
8892
self.set_mapping_size(size)?;
8993
}
90-
nix::unistd::ftruncate(
91-
self.get_shm().handle.as_owned_fd()?,
92-
self.get_shm().size as libc::off_t,
94+
let new_size = self.get_shm().size as libc::off_t;
95+
let fd = self.get_shm().handle.as_owned_fd()?;
96+
// Use fallocate on Linux to eagerly commit the new pages: ENOSPC at resize time is
97+
// recoverable; a later SIGBUS mid-execution is not.
98+
#[cfg(target_os = "linux")]
99+
nix::fcntl::fallocate(
100+
fd.as_raw_fd(),
101+
nix::fcntl::FallocateFlags::empty(),
102+
0,
103+
new_size,
93104
)?;
105+
#[cfg(not(target_os = "linux"))]
106+
nix::unistd::ftruncate(&fd, new_size)?;
94107
Ok(())
95108
}
96109
/// # Safety
@@ -131,6 +144,16 @@ impl FileBackedHandle for NamedShmHandle {
131144
}
132145
}
133146

147+
impl MappedMem<NamedShmHandle> {
148+
/// Unlink the backing SHM file from the filesystem so new openers get `ENOENT`.
149+
/// Existing mappings remain valid. On Windows the mapping is managed by the OS
150+
/// via handle reference counts and there is no filesystem entry to remove.
151+
#[cfg(unix)]
152+
pub fn unlink(&self) {
153+
self.mem.unlink();
154+
}
155+
}
156+
134157
impl<T: MemoryHandle> MappedMem<T> {
135158
pub fn as_slice(&self) -> &[u8] {
136159
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast(), self.mem.get_size()) }
@@ -152,7 +175,9 @@ impl<T: MemoryHandle> AsRef<[u8]> for MappedMem<T> {
152175
}
153176

154177
impl MappedMem<NamedShmHandle> {
155-
pub fn get_path(&self) -> &[u8] {
178+
/// # Safety
179+
/// Must not be called concurrently with `unlink()`.
180+
pub unsafe fn get_path(&self) -> &[u8] {
156181
self.mem.get_path()
157182
}
158183
}
@@ -167,9 +192,10 @@ impl<T: FileBackedHandle> From<MappedMem<T>> for ShmHandle {
167192
}
168193

169194
impl From<MappedMem<NamedShmHandle>> for NamedShmHandle {
170-
fn from(mut handle: MappedMem<NamedShmHandle>) -> NamedShmHandle {
195+
fn from(handle: MappedMem<NamedShmHandle>) -> NamedShmHandle {
196+
let path = handle.mem.path.take().into();
171197
NamedShmHandle {
172-
path: handle.mem.path.take(),
198+
path,
173199
inner: handle.into(),
174200
}
175201
}

0 commit comments

Comments
 (0)