Skip to content

Commit f1d9b36

Browse files
committed
PR comments
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent 583679e commit f1d9b36

File tree

4 files changed

+161
-157
lines changed

4 files changed

+161
-157
lines changed

datadog-ipc/src/shm_stats.rs

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ const MAX_FLUSH_WAIT_ITERS: u32 = 100_000;
8787
/// Spin iterations before yielding to the OS scheduler.
8888
const YIELD_AFTER_SPINS: u32 = 8;
8989

90-
#[inline]
9190
fn bin_for_duration(nanos: i64) -> usize {
9291
if nanos <= 0 {
9392
return 0;
@@ -101,7 +100,6 @@ fn bin_for_duration(nanos: i64) -> usize {
101100
(b as usize).clamp(1, N_BINS - 2)
102101
}
103102

104-
#[inline]
105103
fn bin_representative(bin: usize) -> f64 {
106104
if bin == 0 {
107105
return 0.0;
@@ -140,11 +138,17 @@ struct ShmKeyHeader {
140138
/// valid for `AtomicU64::new(0)`.
141139
#[repr(C, align(8))]
142140
struct ShmStats {
141+
/// Total number of spans in this group.
143142
hits: AtomicU64,
143+
/// Number of error spans in this group.
144144
errors: AtomicU64,
145+
/// Sum of all span durations (nanoseconds).
145146
duration_sum: AtomicU64,
147+
/// Number of top-level spans (service-entry or measured).
146148
top_level_hits: AtomicU64,
149+
/// Histogram bins for non-error span durations.
147150
ok_bins: [AtomicU64; N_BINS],
151+
/// Histogram bins for error span durations.
148152
error_bins: [AtomicU64; N_BINS],
149153
}
150154

@@ -171,15 +175,22 @@ struct ShmBucketHeader {
171175
/// Global SHM header (first page of the mapping).
172176
#[repr(C)]
173177
struct ShmHeader {
178+
/// Layout version; checked by [`ShmSpanConcentrator::open`]. Mismatch returns an error.
174179
version: u32,
180+
/// Width of each time bucket in nanoseconds (e.g. 10 s = 10_000_000_000).
175181
bucket_size_nanos: u64,
182+
/// Number of aggregation slots per bucket (hash-table capacity).
176183
slot_count: u32,
184+
/// Byte size of one full bucket region (header + slots + string pool), page-aligned.
177185
bucket_region_size: u32,
186+
/// Byte capacity of the per-bucket string pool.
178187
string_pool_size: u32,
188+
/// Index (0 or 1) of the bucket currently being written to by PHP workers.
179189
active_idx: AtomicU8,
180190
/// Set to 1 by the sidecar when workers should re-open the SHM at the
181191
/// same path (a new, larger mapping has been created there).
182192
please_reload: AtomicU8,
193+
/// Monotonic counter incremented on every successful flush, used as the stats sequence number.
183194
flush_seq: AtomicU64,
184195
}
185196

@@ -190,13 +201,13 @@ fn bucket_hdr_size() -> usize {
190201
}
191202

192203
fn pool_start_within_bucket(slot_count: u32) -> usize {
193-
bucket_hdr_size() + slot_count as usize * size_of::<ShmEntry>()
204+
bucket_hdr_size() + (slot_count as usize) * size_of::<ShmEntry>()
194205
}
195206

196207
fn aligned_bucket_region(slot_count: u32, string_pool_size: u32) -> usize {
197208
let raw = pool_start_within_bucket(slot_count) + string_pool_size as usize;
198209
let page = page_size::get();
199-
((raw + page - 1) / page) * page
210+
raw.div_ceil(page) * page
200211
}
201212

202213
fn total_shm_size(slot_count: u32, string_pool_size: u32) -> usize {
@@ -224,7 +235,6 @@ unsafe fn pool_base(base: *const u8, bkt_start: usize, slot_count: u32) -> *cons
224235
base.add(bkt_start + pool_start_within_bucket(slot_count))
225236
}
226237

227-
#[inline]
228238
unsafe fn sref_str<'a>(pool: *const u8, sr: StringRef) -> &'a str {
229239
if sr.len == 0 {
230240
return "";
@@ -271,14 +281,14 @@ unsafe fn alloc_str(pool: *mut u8, cursor: &AtomicU32, pool_size: u32, s: &str)
271281
return StringRef::default();
272282
}
273283
if cursor
274-
.compare_exchange_weak(old, new, AcqRel, Relaxed)
284+
.compare_exchange_weak(old, new, Relaxed, Relaxed)
275285
.is_ok()
276286
{
277287
std::ptr::copy_nonoverlapping(s.as_ptr(), pool.add(old as usize), len as usize);
278288
return StringRef { offset: old, len };
279289
}
280290
spins += 1;
281-
if spins % YIELD_AFTER_SPINS == 0 {
291+
if spins.is_multiple_of(YIELD_AFTER_SPINS) {
282292
thread::yield_now();
283293
} else {
284294
hint::spin_loop();
@@ -338,21 +348,14 @@ impl OwnedShmSpanInput {
338348
/// Shared-memory span stats concentrator.
339349
///
340350
/// Created once by the sidecar; opened (read-write) by each PHP worker.
351+
#[derive(Clone)]
341352
pub struct ShmSpanConcentrator {
342353
mem: Arc<MappedMem<NamedShmHandle>>,
343354
}
344355

345356
unsafe impl Send for ShmSpanConcentrator {}
346357
unsafe impl Sync for ShmSpanConcentrator {}
347358

348-
impl Clone for ShmSpanConcentrator {
349-
fn clone(&self) -> Self {
350-
ShmSpanConcentrator {
351-
mem: Arc::clone(&self.mem),
352-
}
353-
}
354-
}
355-
356359
impl ShmSpanConcentrator {
357360
/// Create a new SHM concentrator (sidecar side).
358361
///
@@ -374,9 +377,9 @@ impl ShmSpanConcentrator {
374377
}
375378

376379
let handle = NamedShmHandle::create(path, total)?;
377-
let mem = handle.map()?;
380+
let mut mem = handle.map()?;
378381

379-
let base = mem.as_slice().as_ptr() as *mut u8;
382+
let base = mem.as_slice_mut().as_mut_ptr();
380383
unsafe {
381384
// fresh mmap. Initialized to zero.
382385
let hdr = &mut *(base as *mut ShmHeader);
@@ -467,28 +470,29 @@ impl ShmSpanConcentrator {
467470
SLOT_EMPTY => {
468471
if entry
469472
.key_hash
470-
.compare_exchange(SLOT_EMPTY, SLOT_INIT, AcqRel, Relaxed)
473+
.compare_exchange(SLOT_EMPTY, SLOT_INIT, Acquire, Relaxed)
471474
.is_ok()
472475
{
473476
unsafe {
474477
Self::write_key(entry, input, pool, &bh.string_cursor, pool_size);
475478
}
476-
fence(Release);
479+
// Release on the store synchronises the key write with any
480+
// subsequent Acquire load of the hash — no separate fence needed.
477481
entry.key_hash.store(hash, Release);
478482
Self::update_stats(entry, input);
479483
done = true;
480484
break;
481485
}
482486
spins += 1;
483-
if spins % YIELD_AFTER_SPINS == 0 {
487+
if spins.is_multiple_of(YIELD_AFTER_SPINS) {
484488
thread::yield_now();
485489
} else {
486490
hint::spin_loop();
487491
}
488492
}
489493
SLOT_INIT => {
490494
spins += 1;
491-
if spins % YIELD_AFTER_SPINS == 0 {
495+
if spins.is_multiple_of(YIELD_AFTER_SPINS) {
492496
thread::yield_now();
493497
} else {
494498
hint::spin_loop();
@@ -611,6 +615,7 @@ impl ShmSpanConcentrator {
611615
///
612616
/// * `force = false` – swap the active bucket, drain the previously-active one.
613617
/// * `force = true` – drain both buckets without swapping (shutdown).
618+
#[allow(clippy::too_many_arguments)]
614619
pub fn flush(
615620
&self,
616621
force: bool,
@@ -656,24 +661,28 @@ impl ShmSpanConcentrator {
656661
let bh = unsafe { bucket_header(base, bkt_start) };
657662

658663
// Wait for in-flight writers (bounded to tolerate dead workers).
664+
// The intermediate loads only need Relaxed; a single fence(Acquire) after
665+
// the loop synchronizes with the Release in each writer's in_flight.fetch_sub,
666+
// and covers all subsequent SHM reads in this function and callees.
659667
let mut spins = 0u32;
660-
while bh.in_flight.load(Acquire) != 0 && spins < MAX_FLUSH_WAIT_ITERS {
668+
while bh.in_flight.load(Relaxed) != 0 && spins < MAX_FLUSH_WAIT_ITERS {
661669
spins += 1;
662-
if spins % YIELD_AFTER_SPINS == 0 {
670+
if spins.is_multiple_of(YIELD_AFTER_SPINS) {
663671
thread::yield_now();
664672
} else {
665673
hint::spin_loop();
666674
}
667675
}
676+
fence(Acquire);
668677

669-
let bucket_start_ts = bh.start_nanos.load(Acquire);
678+
let bucket_start_ts = bh.start_nanos.load(Relaxed);
670679
let pool = unsafe { pool_base(base, bkt_start, slot_count) };
671680

672681
let mut grouped: Vec<pb::ClientGroupedStats> = Vec::new();
673682

674683
for slot in 0..slot_count as usize {
675684
let entry = unsafe { entry_ref(base, bkt_start, slot) };
676-
let h = entry.key_hash.load(Acquire);
685+
let h = entry.key_hash.load(Relaxed);
677686
if h == SLOT_EMPTY || h == SLOT_INIT {
678687
continue;
679688
}
@@ -740,16 +749,17 @@ impl ShmSpanConcentrator {
740749
})
741750
.collect();
742751

743-
let hits = s.hits.load(Acquire);
744-
let errors = s.errors.load(Acquire);
745-
let duration_sum = s.duration_sum.load(Acquire);
746-
let top_level_hits = s.top_level_hits.load(Acquire);
752+
// fence(Acquire) in drain_bucket's spin-wait loop already synchronises these reads.
753+
let hits = s.hits.load(Relaxed);
754+
let errors = s.errors.load(Relaxed);
755+
let duration_sum = s.duration_sum.load(Relaxed);
756+
let top_level_hits = s.top_level_hits.load(Relaxed);
747757

748758
let mut ok_sketch = DDSketch::default();
749759
let mut err_sketch = DDSketch::default();
750760
for bin in 0..N_BINS {
751-
let ok_count = s.ok_bins[bin].load(Acquire);
752-
let err_count = s.error_bins[bin].load(Acquire);
761+
let ok_count = s.ok_bins[bin].load(Relaxed);
762+
let err_count = s.error_bins[bin].load(Relaxed);
753763
let rep = bin_representative(bin);
754764
if ok_count > 0 {
755765
let _ = ok_sketch.add_with_count(rep, ok_count as f64);
@@ -837,6 +847,7 @@ mod tests {
837847
}
838848

839849
#[test]
850+
#[cfg_attr(miri, ignore)]
840851
fn test_add_and_flush() {
841852
let c = ShmSpanConcentrator::create(
842853
test_path(),
@@ -852,6 +863,7 @@ mod tests {
852863
}
853864

854865
#[test]
866+
#[cfg_attr(miri, ignore)]
855867
fn test_open_from_worker() {
856868
let path = test_path();
857869
let creator = ShmSpanConcentrator::create(
@@ -868,6 +880,7 @@ mod tests {
868880
}
869881

870882
#[test]
883+
#[cfg_attr(miri, ignore)]
871884
fn test_needs_reload() {
872885
let path = test_path();
873886
let creator = ShmSpanConcentrator::create(

datadog-sidecar/src/service/session_info.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::service::{InstanceId, QueueId, RuntimeInfo};
2424
///
2525
/// It contains a list of runtimes, session configuration, tracer configuration, and log guards.
2626
/// It also has methods to manage the runtimes and configurations.
27-
#[derive(Default)]
27+
#[derive(Default, Clone)]
2828
pub(crate) struct SessionInfo {
2929
runtimes: Arc<Mutex<HashMap<String, RuntimeInfo>>>,
3030
pub(crate) session_config: Arc<Mutex<Option<libdd_telemetry::config::Config>>>,
@@ -48,31 +48,6 @@ pub(crate) struct SessionInfo {
4848
pub(crate) stats_config: Arc<Mutex<Option<crate::service::stats_flusher::StatsConfig>>>,
4949
}
5050

51-
impl Clone for SessionInfo {
52-
fn clone(&self) -> Self {
53-
SessionInfo {
54-
runtimes: self.runtimes.clone(),
55-
session_config: self.session_config.clone(),
56-
debugger_config: self.debugger_config.clone(),
57-
tracer_config: self.tracer_config.clone(),
58-
dogstatsd: self.dogstatsd.clone(),
59-
remote_config_options: self.remote_config_options.clone(),
60-
agent_infos: self.agent_infos.clone(),
61-
remote_config_interval: self.remote_config_interval.clone(),
62-
#[cfg(windows)]
63-
remote_config_notify_function: self.remote_config_notify_function.clone(),
64-
#[cfg(windows)]
65-
process_handle: self.process_handle.clone(),
66-
log_guard: self.log_guard.clone(),
67-
session_id: self.session_id.clone(),
68-
pid: self.pid.clone(),
69-
remote_config_enabled: self.remote_config_enabled.clone(),
70-
process_tags: self.process_tags.clone(),
71-
stats_config: self.stats_config.clone(),
72-
}
73-
}
74-
}
75-
7651
impl SessionInfo {
7752
/// Returns the `RuntimeInfo` for a given runtime ID.
7853
///

datadog-sidecar/src/service/stats_flusher.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -275,25 +275,26 @@ pub async fn run_stats_flush_loop(
275275
0
276276
};
277277
if idle_secs >= IDLE_REMOVE_SECS {
278-
let s = map_guard.remove(&map_key).unwrap();
279-
info!(
280-
"Removing idle SHM span concentrator for env={} version={} \
281-
(idle for {idle_secs}s)",
282-
map_key.env, map_key.version,
283-
);
284-
let uri = stats_uri(&s.endpoint);
285-
let ep = s.endpoint.clone();
286-
let payload = s.concentrator.flush(
287-
true,
288-
"",
289-
&map_key.env,
290-
&map_key.version,
291-
"",
292-
&s.tracer_version,
293-
&s.runtime_id,
294-
"",
295-
);
296-
Some((payload, uri, ep))
278+
map_guard.remove(&map_key).map(|s| {
279+
info!(
280+
"Removing idle SHM span concentrator for env={} version={} \
281+
(idle for {idle_secs}s)",
282+
map_key.env, map_key.version,
283+
);
284+
let uri = stats_uri(&s.endpoint);
285+
let ep = s.endpoint.clone();
286+
let payload = s.concentrator.flush(
287+
true,
288+
"",
289+
&map_key.env,
290+
&map_key.version,
291+
"",
292+
&s.tracer_version,
293+
&s.runtime_id,
294+
"",
295+
);
296+
(payload, uri, ep)
297+
})
297298
} else {
298299
None
299300
}

0 commit comments

Comments
 (0)