Skip to content

Commit 0e8c2c6

Browse files
authored
chore: update otel process ctx protocol (#1713)
# What does this PR do? The [OTel process context spec](open-telemetry/opentelemetry-specification#4719) has been updated with the following minor changes: - `published_at_ns` is renamed to `monotonic_published_at_ns` and is only required to be monotonic; it no longer has to come from the real-time clock. Further discussion converged on the BOOTTIME clock as the best implementation of this monotonic clock. - The timestamp becomes the only point of synchronization (instead of the signature during the publication protocol). # Motivation Keeping up to date with the OTEP. # Additional Notes N/A # How to test the change? Covered by existing tests. Co-authored-by: yann.hamdaoui <yann.hamdaoui@datadoghq.com>
1 parent 8fae837 commit 0e8c2c6

2 files changed

Lines changed: 70 additions & 61 deletions

File tree

libdd-library-config/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ serial_test = "3.2"
3232

3333
[target.'cfg(unix)'.dependencies]
3434
memfd = { version = "0.6" }
35-
rustix = { version = "1.1.3", features = ["param", "mm", "process", "fs"] }
35+
rustix = { version = "1.1.3", features = ["param", "mm", "process", "fs", "time"] }

libdd-library-config/src/otel_process_ctx.rs

Lines changed: 69 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub mod linux {
2222
atomic::{fence, AtomicU64, Ordering},
2323
Mutex, MutexGuard,
2424
},
25-
time::{SystemTime, UNIX_EPOCH},
25+
time::Duration,
2626
};
2727

2828
use anyhow::Context;
@@ -31,6 +31,7 @@ pub mod linux {
3131
fs::{ftruncate, memfd_create, MemfdFlags},
3232
mm::{madvise, mmap, mmap_anonymous, munmap, Advice, MapFlags, ProtFlags},
3333
process::{getpid, set_virtual_memory_region_name, Pid},
34+
time::{clock_gettime, ClockId},
3435
};
3536

3637
use libdd_trace_protobuf::opentelemetry::proto::common::v1::ProcessContext;
@@ -52,21 +53,17 @@ pub mod linux {
5253
/// based synchronization requires the use of atomics to have any effect (see [Mandatory
5354
/// atomic](https://doc.rust-lang.org/std/sync/atomic/fn.fence.html#mandatory-atomic))
5455
///
55-
/// We use `signature` as a release notification for publication, and `published_at_ns` for
56-
/// updates. Ideally, those should be two `AtomicU64`, but this isn't compatible with
57-
/// `#[repr(C, packed)]`, since `AtomicU64` can't be used in a packed structure for alignment
58-
/// reason (what's more, their alignment might be bigger than the one of `u64` on some
59-
/// platforms).
60-
///
61-
/// In practice, given the page size and the layout of `MappingHeader`, the alignment should
62-
/// match (we statically test for it anyway). We can then use [`AtomicU64::from_ptr`] to create
63-
/// an atomic view of those fields when synchronization is needed.
56+
/// We use `monotonic_published_at_ns` for synchronization with the reader. Ideally, it should
57+
/// be an `AtomicU64`, but this is incompatible with `#[repr(C, packed)]` by default, as it
58+
/// could be misaligned. In our case, given the page size and the layout of `MappingHeader`, it
59+
/// is actually 8-byte aligned: we use [`AtomicU64::from_ptr`] to create an atomic view when
60+
/// synchronization is needed.
6461
#[repr(C, packed)]
6562
struct MappingHeader {
6663
signature: [u8; 8],
6764
version: u32,
6865
payload_size: u32,
69-
published_at_ns: u64,
66+
monotonic_published_at_ns: u64,
7067
payload_ptr: *const u8,
7168
}
7269

@@ -232,7 +229,7 @@ pub mod linux {
232229
unsafe { madvise(mapping.start_addr, size, Advice::LinuxDontFork) }
233230
.context("madvise MADVISE_DONTFORK failed")?;
234231

235-
let published_at_ns = time_now_ns().ok_or_else(|| {
232+
let published_at_ns = since_boottime_ns().ok_or_else(|| {
236233
anyhow::anyhow!("failed to get current time for process context publication")
237234
})?;
238235

@@ -245,27 +242,26 @@ pub mod linux {
245242
ptr::write(
246243
header,
247244
MappingHeader {
248-
// signature will be set atomically at last
249-
signature: [0; 8],
245+
signature: *SIGNATURE,
250246
version: PROCESS_CTX_VERSION,
251247
payload_size: payload
252248
.len()
253249
.try_into()
254250
.context("payload size overflowed")?,
255-
published_at_ns,
251+
// set atomically last, once every other field has been written
252+
monotonic_published_at_ns: 0,
256253
payload_ptr: payload.as_ptr(),
257254
},
258255
);
259-
// We typically want to avoid the compiler and the hardware to re-order the write to
260-
// the signature (which should be last according to the
256+
// We want to prevent the compiler and the hardware from re-ordering the write
257+
// to the `monotonic_published_at_ns` (which should be last according to the
261258
// specification) with the writes to other fields of the header.
262259
//
263260
// To do so, we implement synchronization during publication _as if the reader were
264261
// another thread of this program_, using atomics and fences.
265262
fence(Ordering::SeqCst);
266-
AtomicU64::from_ptr((*header).signature.as_mut_ptr().cast::<u64>())
267-
// To avoid shuffling bytes, we must use the native endianness
268-
.store(u64::from_ne_bytes(*SIGNATURE), Ordering::Relaxed);
263+
AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns))
264+
.store(published_at_ns, Ordering::Relaxed);
269265
}
270266

271267
let _ = mapping.set_name();
@@ -281,28 +277,27 @@ pub mod linux {
281277
fn update(&mut self, payload: Vec<u8>) -> anyhow::Result<()> {
282278
let header = self.mapping.start_addr as *mut MappingHeader;
283279

284-
let published_at_ns = time_now_ns()
280+
let monotonic_published_at_ns = since_boottime_ns()
285281
.ok_or_else(|| anyhow::anyhow!("could not get the current timestamp"))?;
286282
let payload_size = payload.len().try_into().map_err(|_| {
287-
anyhow::anyhow!("couldn't update process protocol: new payload too large")
283+
anyhow::anyhow!("couldn't update process context: new payload too large")
288284
})?;
289285

290-
// Safety
286+
// Safety:
291287
//
292288
// [^atomic-u64-alignment]: Page size is at minimum 4KB and will be always 8 bytes
293-
// aligned even on exotic platforms. The respective offsets of `signature` and
294-
// `published_at_ns` are 0 and 16 bytes, so they are 8-bytes aligned (`AtomicU64` has
295-
// both a size and align of 8 bytes).
289+
// aligned even on exotic platforms. The offset of `monotonic_published_at_ns` is 16
290+
// bytes, so it's 8-bytes aligned (`AtomicU64` has both a size and align of 8 bytes).
296291
//
297292
// The header memory is valid for both read and writes.
298293
let published_at_atomic =
299-
unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).published_at_ns)) };
294+
unsafe { AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns)) };
300295

301296
// A process shouldn't try to concurrently update its own context
302297
//
303-
// Note: be careful of early return while `published_at` is still zero, as this would
304-
// effectively "lock" any future publishing. Move throwing code above this swap, or
305-
// properly restore the previous value if the former can't be done.
298+
// Note: be careful of early return while `monotonic_published_at_ns` is still zero, as
299+
// this would effectively "lock" any future publishing. Move throwing code above this
300+
// swap, or properly restore the previous value if the former can't be done.
306301
if published_at_atomic.swap(0, Ordering::Relaxed) == 0 {
307302
return Err(anyhow::anyhow!(
308303
"concurrent update of the process context is not supported"
@@ -320,7 +315,7 @@ pub mod linux {
320315
}
321316

322317
fence(Ordering::SeqCst);
323-
published_at_atomic.store(published_at_ns, Ordering::Relaxed);
318+
published_at_atomic.store(monotonic_published_at_ns, Ordering::Relaxed);
324319

325320
Ok(())
326321
}
@@ -335,11 +330,10 @@ pub mod linux {
335330
size_of::<MappingHeader>()
336331
}
337332

338-
fn time_now_ns() -> Option<u64> {
339-
SystemTime::now()
340-
.duration_since(UNIX_EPOCH)
341-
.ok()
342-
.and_then(|d| u64::try_from(d.as_nanos()).ok())
333+
/// Returns the value of the monotonic BOOTTIME clock in nanoseconds.
334+
fn since_boottime_ns() -> Option<u64> {
335+
let duration = Duration::try_from(clock_gettime(ClockId::Boottime)).ok()?;
336+
u64::try_from(duration.as_nanos()).ok()
343337
}
344338

345339
/// Locks the context handle. Returns a uniform error if the lock has been poisoned.
@@ -424,6 +418,7 @@ pub mod linux {
424418
use std::{
425419
fs::File,
426420
io::{BufRead, BufReader},
421+
ptr::{self, addr_of_mut},
427422
sync::atomic::{fence, AtomicU64, Ordering},
428423
};
429424

@@ -447,19 +442,32 @@ pub mod linux {
447442
|| name.starts_with("[anon:OTEL_CTX]")
448443
}
449444

450-
/// Reads the signature from a memory address to verify it's an OTEL_CTX mapping. This also
451-
/// establish proper synchronization/memory ordering through atomics since the reader is
452-
/// the same process in this test setup.
453-
fn verify_signature_at(addr: usize) -> bool {
454-
let ptr: *mut u64 = std::ptr::with_exposed_provenance_mut(addr);
455-
// Safety: We're reading from our own process memory at an address
456-
// we found in /proc/self/maps. This should be safe as long as the
457-
// mapping exists and has read permissions.
445+
/// Establishes proper synchronization/memory ordering with the writer, checking that
446+
/// `monotonic_published_at_ns` is not zero and that the signature is correct. Returns a
447+
/// pointer to the initialized header in case of success.
448+
fn verify_mapping_at(addr: usize) -> anyhow::Result<*const MappingHeader> {
449+
let header: *mut MappingHeader = ptr::with_exposed_provenance_mut(addr);
450+
// Safety: we're reading from our own process memory at an address we found in
451+
// /proc/self/maps. This should be safe as long as the mapping exists and has read
452+
// permissions.
458453
//
459-
// For the alignment constraint of `AtomicU64`, see [atomic-u64-alignment].
460-
let signature = unsafe { AtomicU64::from_ptr(ptr).load(Ordering::Relaxed) };
454+
// For the alignment constraint of `AtomicU64`, see [^atomic-u64-alignment].
455+
let published_at = unsafe {
456+
AtomicU64::from_ptr(addr_of_mut!((*header).monotonic_published_at_ns))
457+
.load(Ordering::Relaxed)
458+
};
459+
ensure!(published_at != 0, "monotonic_published_at_ns is zero: couldn't read an initialized header in the candidate mapping");
461460
fence(Ordering::SeqCst);
462-
&signature.to_ne_bytes() == super::SIGNATURE
461+
462+
// Safety: if `monotonic_published_at_ns` is non-zero, the header is properly
463+
// initialized and thus readable.
464+
let signature = unsafe { &header.as_ref().unwrap().signature };
465+
ensure!(
466+
signature == super::SIGNATURE,
467+
"invalid signature in the candidate mapping"
468+
);
469+
470+
Ok(header)
463471
}
464472

465473
/// Find the OTEL_CTX mapping in /proc/self/maps
@@ -485,17 +493,15 @@ pub mod linux {
485493
/// Read the process context from the current process.
486494
///
487495
/// This searches `/proc/self/maps` for an OTEL_CTX mapping and decodes its contents.
488-
pub fn read_process_context() -> anyhow::Result<MappingHeader> {
496+
///
497+
/// **CAUTION**: Note that the reader implemented in this module, as well as the helper
498+
/// functions it relies on, are specialized for tests (for example, it doesn't check for
499+
/// concurrent writers after reading the header, because we know there can't be any). Do not
500+
/// extract it or use it as-is as a generic Rust OTel process context reader.
501+
fn read_process_context() -> anyhow::Result<MappingHeader> {
489502
let mapping_addr = find_otel_mapping()?;
490-
let header_ptr = mapping_addr as *const MappingHeader;
491-
492-
// Note: verifying the signature ensures proper synchronization
493-
ensure!(
494-
verify_signature_at(mapping_addr),
495-
"verification of the signature failed"
496-
);
497-
498-
// Safety: we found this address in /proc/self/maps and verified the signature
503+
let header_ptr = verify_mapping_at(mapping_addr)?;
504+
// Safety: the pointer returned by `verify_mapping_at` points to an initialized header
499505
Ok(unsafe { std::ptr::read(header_ptr) })
500506
}
501507

@@ -524,10 +530,13 @@ pub mod linux {
524530
header.payload_size == payload_v1.len() as u32,
525531
"wrong payload size"
526532
);
527-
assert!(header.published_at_ns > 0, "published_at_ns is zero");
533+
assert!(
534+
header.monotonic_published_at_ns > 0,
535+
"monotonic_published_at_ns is zero"
536+
);
528537
assert!(read_payload == payload_v1.as_bytes(), "payload mismatch");
529538

530-
let published_at_ns_v1 = header.published_at_ns;
539+
let published_at_ns_v1 = header.monotonic_published_at_ns;
531540
// Ensure the clock advances so the updated timestamp is strictly greater
532541
std::thread::sleep(std::time::Duration::from_nanos(10));
533542

@@ -551,7 +560,7 @@ pub mod linux {
551560
"wrong payload size"
552561
);
553562
assert!(
554-
header.published_at_ns > published_at_ns_v1,
563+
header.monotonic_published_at_ns > published_at_ns_v1,
555564
"published_at_ns should be strictly greater after update"
556565
);
557566
assert!(read_payload == payload_v2.as_bytes(), "payload mismatch");

0 commit comments

Comments
 (0)