Skip to content

Commit 205a42e

Browse files
committed
Fix data races with concentrator shutdown
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent 7365de0 commit 205a42e

File tree

12 files changed

+303
-230
lines changed

12 files changed

+303
-230
lines changed

datadog-ipc/src/atomic_option.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Lock-free `Option<T>` with atomic take, valid for any `T` where `Option<T>` is
5+
//! 1, 2, 4 or 8 bytes in size and aligned to that size.
6+
7+
use std::cell::UnsafeCell;
8+
use std::mem::{self, MaybeUninit};
9+
use std::ptr;
10+
use std::sync::atomic::{AtomicU16, AtomicU32, AtomicU64, AtomicU8, Ordering};
11+
12+
/// An `Option<T>` that supports lock-free atomic `take`, `set` and `replace`.
///
/// # Constraints
/// `Option<T>` must be exactly 1, 2, 4 or 8 bytes in size **and** aligned to its
/// own size, so that its storage can be reinterpreted as the matching
/// `AtomicU8` … `AtomicU64`. Both conditions are verified at compile time (also
/// in release builds) the first time a given `T` is used. They hold for thin
/// pointer-like niche-optimised types (`NonNull<T>`, `Box<T>`, `&T`, …) and for
/// the `NonZero*` integers.
///
/// `Option<T>` should additionally be free of padding bytes, because every byte
/// of the value is read and written as part of one integer.
///
/// # Storage
/// The option is stored in an `UnsafeCell<Option<T>>`, giving it exactly the size
/// and alignment of `Option<T>` itself. All atomic operations use the atomic
/// integer whose width equals `size_of::<Option<T>>()`.
///
/// # None sentinel
/// The "none" bit-pattern is taken from an actual `Option::<T>::None` value
/// rather than assumed to be zero, so the implementation does not depend on how
/// the compiler encodes `None`.
///
/// `UnsafeCell` provides the interior-mutability aliasing permission required by
/// Rust's memory model when mutating through a shared reference.
pub struct AtomicOption<T>(UnsafeCell<Option<T>>);

impl<T> AtomicOption<T> {
    /// Compile-time proof that `Option<T>` can be accessed through an atomic
    /// integer of identical size and alignment. Referencing this constant from a
    /// function turns a layout violation into a build error for that `T`.
    ///
    /// The upper bound may be raised to 16 once `AtomicU128` becomes stable.
    const LAYOUT_OK: () = {
        let size = mem::size_of::<Option<T>>();
        assert!(
            size == 1 || size == 2 || size == 4 || size == 8,
            "AtomicOption requires size_of::<Option<T>>() to be 1, 2, 4 or 8"
        );
        // A narrower alignment would make the atomic access misaligned.
        assert!(
            mem::align_of::<Option<T>>() == size,
            "AtomicOption requires Option<T> to be aligned to its own size"
        );
    };

    /// Atomically exchange the stored option with `val` and return the old one.
    ///
    /// The value is moved bit-for-bit through an integer of the same width, so no
    /// destructor runs here: ownership of `val` moves into the slot and ownership
    /// of the previous content moves to the caller.
    #[inline]
    fn swap_raw(&self, val: Option<T>) -> Option<T> {
        let () = Self::LAYOUT_OK;

        // The bits of `val` are about to live in the slot; do not drop them here.
        let val = mem::ManuallyDrop::new(val);
        let src: *const Option<T> = &*val;
        let slot: *mut Option<T> = self.0.get();
        let mut previous = MaybeUninit::<Option<T>>::uninit();
        let dst: *mut Option<T> = previous.as_mut_ptr();

        // SAFETY: `LAYOUT_OK` guarantees that `Option<T>` has exactly the size and
        // alignment of the integer/atomic selected below, so `src`, `slot` and
        // `dst` are all valid and aligned for that type. `slot` lives inside an
        // `UnsafeCell`, which permits mutation through `&self`. The bits written
        // to `dst` were previously stored from a valid `Option<T>`.
        unsafe {
            match mem::size_of::<Option<T>>() {
                1 => {
                    let cell = &*slot.cast::<AtomicU8>();
                    let old = cell.swap(ptr::read(src.cast::<u8>()), Ordering::AcqRel);
                    ptr::write(dst.cast::<u8>(), old);
                }
                2 => {
                    let cell = &*slot.cast::<AtomicU16>();
                    let old = cell.swap(ptr::read(src.cast::<u16>()), Ordering::AcqRel);
                    ptr::write(dst.cast::<u16>(), old);
                }
                4 => {
                    let cell = &*slot.cast::<AtomicU32>();
                    let old = cell.swap(ptr::read(src.cast::<u32>()), Ordering::AcqRel);
                    ptr::write(dst.cast::<u32>(), old);
                }
                8 => {
                    let cell = &*slot.cast::<AtomicU64>();
                    let old = cell.swap(ptr::read(src.cast::<u64>()), Ordering::AcqRel);
                    ptr::write(dst.cast::<u64>(), old);
                }
                _ => unreachable!("unsupported layouts are rejected by LAYOUT_OK"),
            }
            previous.assume_init()
        }
    }

    /// Atomically replace the stored value with `None` and return what was there.
    /// Returns `None` if the value was already taken.
    pub fn take(&self) -> Option<T> {
        self.swap_raw(None)
    }

    /// Atomically store `val`, returning the previous value.
    ///
    /// The previous value is handed to the caller; it is dropped only if the
    /// caller discards the return value.
    pub fn set(&self, val: Option<T>) -> Option<T> {
        self.swap_raw(val)
    }

    /// Atomically store `Some(val)`, returning the previous value.
    pub fn replace(&self, val: T) -> Option<T> {
        self.set(Some(val))
    }

    /// Borrow the current value without taking it.
    ///
    /// # Safety
    /// Must not be called concurrently with [`take`], [`set`], or [`replace`],
    /// and the returned reference must not be alive while any of them runs.
    ///
    /// [`take`]: Self::take
    /// [`set`]: Self::set
    /// [`replace`]: Self::replace
    pub unsafe fn as_option(&self) -> &Option<T> {
        // SAFETY: the caller guarantees that no atomic swap overlaps this borrow.
        unsafe { &*self.0.get() }
    }
}

impl<T> From<Option<T>> for AtomicOption<T> {
    /// Wrap `val`. Fails to compile if `Option<T>` does not have a supported
    /// layout (see the `# Constraints` section on [`AtomicOption`]).
    fn from(val: Option<T>) -> Self {
        let () = Self::LAYOUT_OK;
        Self(UnsafeCell::new(val))
    }
}

// `AtomicOption<T>` is `Send`/`Sync` when `T: Send` — same contract as `Mutex<Option<T>>`.
unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}

datadog-ipc/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ pub mod platform;
1414
pub mod rate_limiter;
1515
pub mod shm_stats;
1616

17+
mod atomic_option;
1718
pub mod client;
1819
pub mod codec;
20+
pub use atomic_option::AtomicOption;
1921

2022
pub use client::IpcClientConn;
2123
#[cfg(target_os = "linux")]

datadog-ipc/src/platform/mem_handle.rs

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use crate::handles::{HandlesTransport, TransferHandles};
55
use crate::platform::{mmap_handle, munmap_handle, OwnedFileHandle, PlatformHandle};
6+
use crate::AtomicOption;
67
#[cfg(feature = "tiny-bytes")]
78
use libdd_tinybytes::UnderlyingBytes;
89
use serde::{Deserialize, Serialize};
@@ -39,15 +40,16 @@ pub(crate) struct ShmPath {
3940

4041
pub struct NamedShmHandle {
4142
pub(crate) inner: ShmHandle,
42-
pub(crate) path: Option<ShmPath>,
43+
pub(crate) path: AtomicOption<Box<ShmPath>>,
4344
}
4445

4546
impl NamedShmHandle {
46-
pub fn get_path(&self) -> &[u8] {
47-
if let Some(ref shm_path) = &self.path {
48-
shm_path.name.as_bytes()
49-
} else {
50-
b""
47+
/// # Safety
48+
/// Must not be called concurrently with `unlink()`.
49+
pub unsafe fn get_path(&self) -> &[u8] {
50+
match self.path.as_option() {
51+
Some(shm_path) => shm_path.name.to_bytes(),
52+
None => b"",
5153
}
5254
}
5355
}
@@ -142,6 +144,16 @@ impl FileBackedHandle for NamedShmHandle {
142144
}
143145
}
144146

147+
impl MappedMem<NamedShmHandle> {
148+
/// Unlink the backing SHM file from the filesystem so new openers get `ENOENT`.
149+
/// Existing mappings remain valid. On Windows the mapping is managed by the OS
150+
/// via handle reference counts and there is no filesystem entry to remove.
151+
#[cfg(unix)]
152+
pub fn unlink(&self) {
153+
self.mem.unlink();
154+
}
155+
}
156+
145157
impl<T: MemoryHandle> MappedMem<T> {
146158
pub fn as_slice(&self) -> &[u8] {
147159
unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().cast(), self.mem.get_size()) }
@@ -163,7 +175,9 @@ impl<T: MemoryHandle> AsRef<[u8]> for MappedMem<T> {
163175
}
164176

165177
impl MappedMem<NamedShmHandle> {
166-
pub fn get_path(&self) -> &[u8] {
178+
/// # Safety
179+
/// Must not be called concurrently with `unlink()`.
180+
pub unsafe fn get_path(&self) -> &[u8] {
167181
self.mem.get_path()
168182
}
169183
}
@@ -178,9 +192,10 @@ impl<T: FileBackedHandle> From<MappedMem<T>> for ShmHandle {
178192
}
179193

180194
impl From<MappedMem<NamedShmHandle>> for NamedShmHandle {
181-
fn from(mut handle: MappedMem<NamedShmHandle>) -> NamedShmHandle {
195+
fn from(handle: MappedMem<NamedShmHandle>) -> NamedShmHandle {
196+
let path = handle.mem.path.take().into();
182197
NamedShmHandle {
183-
path: handle.mem.path.take(),
198+
path,
184199
inner: handle.into(),
185200
}
186201
}

datadog-ipc/src/platform/unix/mem_handle.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,18 @@ impl NamedShmHandle {
183183
Self::new(file.into(), None, size)
184184
}
185185

186+
/// Unlink the SHM file from the filesystem without unmapping it.
187+
pub fn unlink(&self) {
188+
let _ = self.path.take(); // Drop of Box<ShmPath> calls shm_unlink exactly once
189+
}
190+
186191
fn new(fd: OwnedFd, path: Option<CString>, size: usize) -> io::Result<NamedShmHandle> {
187192
Ok(NamedShmHandle {
188193
inner: ShmHandle {
189194
handle: fd.into(),
190195
size,
191196
},
192-
path: path.map(|path| ShmPath { name: path }),
197+
path: path.map(|path| Box::new(ShmPath { name: path })).into(),
193198
})
194199
}
195200
}

datadog-ipc/src/platform/unix/mem_handle_macos.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,18 @@ impl NamedShmHandle {
141141
Self::new(fd, None, 0)
142142
}
143143

144+
/// Unlink the SHM name from the filesystem without unmapping existing mappings.
145+
pub fn unlink(&self) {
146+
let _ = self.path.take(); // Drop of Box<ShmPath> calls shm_unlink exactly once
147+
}
148+
144149
fn new(fd: OwnedFd, path: Option<CString>, size: usize) -> io::Result<NamedShmHandle> {
145150
Ok(NamedShmHandle {
146151
inner: ShmHandle {
147152
handle: fd.into(),
148153
size: size | NOT_COMMITTED,
149154
},
150-
path: path.map(|path| ShmPath { name: path }),
155+
path: path.map(|path| Box::new(ShmPath { name: path })).into(),
151156
})
152157
}
153158
}
@@ -198,6 +203,6 @@ impl<T: FileBackedHandle + From<MappedMem<T>>> MappedMem<T> {
198203

199204
impl Drop for ShmPath {
200205
fn drop(&mut self) {
201-
_ = shm_unlink(path_slice(&self.name));
206+
_ = shm_unlink(path_slice(self.name.as_c_str()));
202207
}
203208
}

datadog-ipc/src/platform/windows/mem_handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl NamedShmHandle {
151151
handle: unsafe { PlatformHandle::from_raw_handle(handle) },
152152
size,
153153
},
154-
path: Some(ShmPath { name }),
154+
path: Some(ShmPath { name }).map(Box::new).into(),
155155
})
156156
}
157157
}

datadog-ipc/src/shm_stats.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
//! When the active bucket is nearly full the sidecar:
3939
//! 1. Creates a new SHM file at the *same path* (the old file is unlinked from the filesystem but
4040
//! remains accessible to processes that already have it open).
41-
//! 2. Sets `ShmHeader::please_reload = 1` on the **old** mapping so workers know to re-open the
42-
//! path on their next `add_span` call.
41+
//! 2. Sets `ShmHeader::ready = 0` on the **old** mapping so workers know to re-open the path on
42+
//! their next `add_span` call.
4343
//! 3. Holds onto the old concentrator for ≥ 1 s, flushing it periodically, to absorb any spans that
4444
//! arrived before workers noticed the reload flag.
4545
//! 4. Drops the old concentrator after that grace period.
@@ -177,6 +177,11 @@ struct ShmBucketHeader {
177177
struct ShmHeader {
178178
/// Layout version; checked by [`ShmSpanConcentrator::open`]. Mismatch returns an error.
179179
version: u32,
180+
/// Set to 1 by the sidecar once the header is fully initialised; reset to 0 when
181+
/// workers should re-open the SHM at the same path (a new, larger mapping has been created there).
182+
ready: AtomicU8,
183+
/// Index (0 or 1) of the bucket currently being written to by PHP workers.
184+
active_idx: AtomicU8,
180185
/// Width of each time bucket in nanoseconds (e.g. 10 s = 10_000_000_000).
181186
bucket_size_nanos: u64,
182187
/// Number of aggregation slots per bucket (hash-table capacity).
@@ -185,11 +190,6 @@ struct ShmHeader {
185190
bucket_region_size: u32,
186191
/// Byte capacity of the per-bucket string pool.
187192
string_pool_size: u32,
188-
/// Index (0 or 1) of the bucket currently being written to by PHP workers.
189-
active_idx: AtomicU8,
190-
/// Set to 1 by the sidecar when workers should re-open the SHM at the
191-
/// same path (a new, larger mapping has been created there).
192-
please_reload: AtomicU8,
193193
/// Monotonic counter incremented on every successful flush, used as the stats sequence number.
194194
flush_seq: AtomicU64,
195195
}
@@ -381,13 +381,20 @@ impl ShmSpanConcentrator {
381381

382382
let base = mem.as_slice_mut().as_mut_ptr();
383383
unsafe {
384-
// fresh mmap. Initialized to zero.
384+
// On Windows the named mapping may persist from a previous concentrator lifetime
385+
// (workers still hold handles after the sidecar retired it). Hence explicitly reset it.
386+
#[cfg(windows)]
387+
std::ptr::write_bytes(base, 0, total);
388+
385389
let hdr = &mut *(base as *mut ShmHeader);
386390
hdr.version = SHM_VERSION;
387391
hdr.bucket_size_nanos = bucket_size_nanos;
388392
hdr.slot_count = slot_count;
389393
hdr.bucket_region_size = aligned_bucket_region(slot_count, string_pool_size) as u32;
390394
hdr.string_pool_size = string_pool_size;
395+
// Signal readiness LAST — workers see ready=0 until this store and fall back
396+
// to IPC, preventing writes to a partially-initialized concentrator.
397+
hdr.ready.store(1, Release);
391398
}
392399

393400
Ok(ShmSpanConcentrator { mem: Arc::new(mem) })
@@ -401,6 +408,12 @@ impl ShmSpanConcentrator {
401408
let base = mem.as_slice().as_ptr();
402409
unsafe {
403410
let hdr = shm_header(base);
411+
if hdr.ready.load(Relaxed) == 0 {
412+
return Err(io::Error::new(
413+
io::ErrorKind::InvalidData,
414+
"SHM span concentrator: not yet ready",
415+
));
416+
}
404417
if hdr.version != SHM_VERSION {
405418
return Err(io::Error::new(
406419
io::ErrorKind::InvalidData,
@@ -426,7 +439,19 @@ impl ShmSpanConcentrator {
426439
/// Workers should call this before every `add_span`; when it returns `true`
427440
/// they should drop this handle, call `open(path)`, and retry.
428441
pub fn needs_reload(&self) -> bool {
429-
self.header().please_reload.load(Acquire) != 0
442+
self.header().ready.load(Acquire) == 0
443+
}
444+
445+
/// Unlink the SHM file from the filesystem so that new PHP workers cannot open it.
446+
/// Existing mappings (including this one and any already open in PHP workers) remain
447+
/// valid. Call this *before* `signal_reload` when retiring a concentrator.
448+
///
449+
/// The path is taken out of the handle atomically, so the file is unlinked at most
450+
/// once and `Drop` will not attempt a second unlink, even when multiple `Arc` clones
451+
/// are alive. On non-Unix targets this is a no-op.
452+
pub fn unlink(&self) {
453+
#[cfg(unix)]
454+
self.mem.unlink();
430455
}
431456

432457
/// Add a span to the currently-active bucket. Thread-safe.
@@ -578,7 +603,7 @@ impl ShmSpanConcentrator {
578603

579604
/// Signal workers to re-open the SHM (call before creating a new, larger one).
580605
pub fn signal_reload(&self) {
581-
self.header().please_reload.store(1, Release);
606+
self.header().ready.store(0, Release);
582607
}
583608

584609
/// Drain the inactive (or both, if `force`) bucket(s) and return raw stat buckets.

datadog-sidecar/src/service/runtime_info.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ pub(crate) struct RuntimeInfo {
3232
#[derive(Default)]
3333
pub(crate) struct ActiveApplication {
3434
pub remote_config_guard: Option<RemoteConfigsGuard>,
35-
pub span_concentrator_guard: Option<crate::service::stats_flusher::SpanConcentratorGuard>,
3635
pub env: Option<String>,
3736
pub app_version: Option<String>,
3837
pub service_name: Option<String>,

0 commit comments

Comments
 (0)