Skip to content

Commit cf3d621

Browse files
committed
Fix ftruncate() causing SIGBUS on /dev/shm full
Add dogstatsd stats Add hostname Reduce flush mutex contention Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent b42be6c commit cf3d621

11 files changed

Lines changed: 376 additions & 186 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datadog-ipc/src/platform/mem_handle.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::platform::{mmap_handle, munmap_handle, OwnedFileHandle, PlatformHandl
77
use libdd_tinybytes::UnderlyingBytes;
88
use serde::{Deserialize, Serialize};
99
use std::{ffi::CString, io, ptr::NonNull};
10+
use std::os::fd::AsRawFd;
1011

1112
#[derive(Clone, Serialize, Deserialize, Debug)]
1213
pub struct ShmHandle {
@@ -87,10 +88,14 @@ where
8788
unsafe {
8889
self.set_mapping_size(size)?;
8990
}
90-
nix::unistd::ftruncate(
91-
self.get_shm().handle.as_owned_fd()?,
92-
self.get_shm().size as libc::off_t,
93-
)?;
91+
let new_size = self.get_shm().size as libc::off_t;
92+
let fd = self.get_shm().handle.as_owned_fd()?;
93+
// Use fallocate on Linux to eagerly commit the new pages: ENOSPC at resize time is
94+
// recoverable; a later SIGBUS mid-execution is not.
95+
#[cfg(target_os = "linux")]
96+
nix::fcntl::fallocate(fd.as_raw_fd(), nix::fcntl::FallocateFlags::empty(), 0, new_size)?;
97+
#[cfg(not(target_os = "linux"))]
98+
nix::unistd::ftruncate(&fd, new_size)?;
9499
Ok(())
95100
}
96101
/// # Safety

datadog-ipc/src/platform/unix/mem_handle.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use io_lifetimes::OwnedFd;
88
use libc::{chmod, off_t};
99
use nix::errno::Errno;
1010
use nix::fcntl::{open, OFlag};
11+
#[cfg(target_os = "linux")]
12+
use nix::fcntl::{fallocate, FallocateFlags};
1113
use nix::sys::mman::{self, mmap, munmap, MapFlags, ProtFlags};
1214
use nix::sys::stat::Mode;
1315
use nix::unistd::{fchown, ftruncate, mkdir, unlink, Uid};
@@ -163,6 +165,11 @@ impl NamedShmHandle {
163165

164166
pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result<NamedShmHandle> {
165167
let fd = shm_open(path.as_bytes(), OFlag::O_CREAT | OFlag::O_RDWR, mode)?;
168+
// Use fallocate on Linux to eagerly commit pages: if /dev/shm is full we get ENOSPC
169+
// here (recoverable) rather than SIGBUS mid-execution when a worker writes a slot.
170+
#[cfg(target_os = "linux")]
171+
fallocate(fd.as_raw_fd(), FallocateFlags::empty(), 0, size as off_t)?;
172+
#[cfg(not(target_os = "linux"))]
166173
ftruncate(&fd, size as off_t)?;
167174
if let Some(uid) = shm_owner_uid() {
168175
let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None);

datadog-ipc/src/shm_stats.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -619,13 +619,11 @@ impl ShmSpanConcentrator {
619619
pub fn flush(
620620
&self,
621621
force: bool,
622-
hostname: &str,
623-
env: &str,
624-
version: &str,
625-
service: &str,
626-
tracer_version: &str,
627-
runtime_id: &str,
628-
container_id: &str,
622+
hostname: String,
623+
env: String,
624+
version: String,
625+
service: String,
626+
runtime_id: String,
629627
) -> Option<pb::ClientStatsPayload> {
630628
let stat_buckets = self.drain_buckets(force);
631629
if stat_buckets.is_empty() {
@@ -634,15 +632,12 @@ impl ShmSpanConcentrator {
634632

635633
let seq = self.header().flush_seq.fetch_add(1, Relaxed);
636634
Some(pb::ClientStatsPayload {
637-
hostname: hostname.to_owned(),
638-
env: env.to_owned(),
639-
version: version.to_owned(),
635+
hostname,
636+
env,
637+
version,
640638
stats: stat_buckets,
641-
lang: "php".to_owned(),
642-
tracer_version: tracer_version.to_owned(),
643-
runtime_id: runtime_id.to_owned(),
644-
service: service.to_owned(),
645-
container_id: container_id.to_owned(),
639+
runtime_id,
640+
service,
646641
sequence: seq,
647642
..Default::default()
648643
})
@@ -796,7 +791,7 @@ impl ShmSpanConcentrator {
796791
.grpc_status_code
797792
.map(|c| c.to_string())
798793
.unwrap_or_default(),
799-
service_source: String::new(),
794+
service_source: read_str!(f.service_source),
800795
span_derived_primary_tags: vec![],
801796
}
802797
}
@@ -858,7 +853,7 @@ mod tests {
858853
.unwrap();
859854
c.add_span(&span("svc", "res", 1_000_000));
860855
c.add_span(&span("svc", "res", 2_000_000));
861-
let bytes = c.flush(true, "h", "e", "v", "s", "t", "r", "c");
856+
let bytes = c.flush(true, "h", "e", "v", "s", "r");
862857
assert!(bytes.is_some());
863858
}
864859

@@ -875,7 +870,7 @@ mod tests {
875870
.unwrap();
876871
let worker = ShmSpanConcentrator::open(path.as_c_str()).unwrap();
877872
worker.add_span(&span("svc2", "res2", 5_000_000));
878-
let bytes = creator.flush(true, "h", "", "", "", "t", "", "");
873+
let bytes = creator.flush(true, "h", "", "", "", "r");
879874
assert!(bytes.is_some());
880875
}
881876

@@ -915,7 +910,7 @@ mod tests {
915910
DEFAULT_STRING_POOL_BYTES,
916911
)
917912
.unwrap();
918-
assert!(c.flush(false, "h", "e", "v", "s", "t", "r", "c").is_none());
913+
assert!(c.flush(false, "h", "e", "v", "s", "r").is_none());
919914
}
920915

921916
#[test]

datadog-sidecar-ffi/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,8 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
626626
remote_config_enabled: bool,
627627
is_fork: bool,
628628
process_tags: &libdd_common_ffi::Vec<Tag>,
629+
hostname: ffi::CharSlice,
630+
root_service: ffi::CharSlice,
629631
) -> MaybeError {
630632
let session_id_str: String = session_id.to_utf8_lossy().into();
631633
let session_config = SessionConfig {
@@ -665,6 +667,8 @@ pub unsafe extern "C" fn ddog_sidecar_session_set_config(
665667
process_tags: process_tags.to_vec(),
666668
peer_tag_keys: vec![],
667669
span_kinds_stats_computed: vec![],
670+
hostname: hostname.to_utf8_lossy().into(),
671+
root_service: root_service.to_utf8_lossy().into(),
668672
};
669673
#[cfg(unix)]
670674
try_c!(blocking::set_session_config(

datadog-sidecar/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ datadog-sidecar-macros = { path = "../datadog-sidecar-macros" }
2323
libdd-telemetry = { path = "../libdd-telemetry", features = ["tracing"] }
2424
libdd-data-pipeline = { path = "../libdd-data-pipeline" }
2525
libdd-trace-utils = { path = "../libdd-trace-utils" }
26+
libdd-trace-stats = { path = "../libdd-trace-stats" }
2627
datadog-remote-config = { path = "../datadog-remote-config" , features = ["live-debugger"]}
2728
datadog-live-debugger = { path = "../datadog-live-debugger" }
2829
libdd-crashtracker = { path = "../libdd-crashtracker" }

datadog-sidecar/src/service/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ pub struct SessionConfig {
6868
pub process_tags: Vec<Tag>,
6969
pub peer_tag_keys: Vec<String>,
7070
pub span_kinds_stats_computed: Vec<String>,
71+
/// Tracer-configured hostname (from `DD_HOSTNAME`). Empty means "not configured".
72+
pub hostname: String,
73+
/// Process-level service name (from `DD_SERVICE`), used as the stats concentrator key.
74+
pub root_service: String,
7175
}
7276

7377
#[derive(Debug, Deserialize, Serialize)]

datadog-sidecar/src/service/session_info.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ impl SessionInfo {
166166
self.dogstatsd.lock_or_panic()
167167
}
168168

169+
/// Clone the Arc wrapping the DogStatsD client so it can be shared with long-lived tasks
170+
/// (e.g. the stats flush loop) without creating a new UDP socket.
171+
pub(crate) fn clone_dogstatsd(&self) -> Arc<Mutex<Option<libdd_dogstatsd_client::Client>>> {
172+
self.dogstatsd.clone()
173+
}
174+
169175
pub(crate) fn configure_dogstatsd<F>(&self, f: F)
170176
where
171177
F: FnOnce(&mut Option<libdd_dogstatsd_client::Client>),

datadog-sidecar/src/service/sidecar_server.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::service::debugger_diagnostics_bookkeeper::{
3535
};
3636
use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER;
3737
use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs};
38+
use crate::service::stats_flusher::{ensure_stats_concentrator, flush_all_stats_now, get_hostname, stats_endpoint, ConcentratorKey, SpanConcentratorState, StatsConfig};
3839
use crate::service::tracing::trace_flusher::TraceFlusherStats;
3940
use crate::tokio_util::run_or_spawn_shared;
4041
use datadog_live_debugger::sender::{agent_info_supports_debugger_v2_endpoint, DebuggerType};
@@ -103,14 +104,7 @@ pub struct SidecarServer {
103104
/// Diagnostics bookkeeper
104105
debugger_diagnostics_bookkeeper: Arc<DebuggerDiagnosticsBookkeeper>,
105106
/// Per-env&version SHM span concentrators (global across all sessions).
106-
pub(crate) span_concentrators: Arc<
107-
Mutex<
108-
HashMap<
109-
crate::service::stats_flusher::ConcentratorKey,
110-
crate::service::stats_flusher::SpanConcentratorState,
111-
>,
112-
>,
113-
>,
107+
pub(crate) span_concentrators: Arc<Mutex<HashMap<ConcentratorKey, Arc<SpanConcentratorState>>>>,
114108
}
115109

116110
/// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect.
@@ -635,10 +629,14 @@ impl SidecarInterface for ConnectionSidecarHandler {
635629
});
636630
*session.agent_infos.lock_or_panic() = Some(agent_info);
637631
}
638-
*session.stats_config.lock_or_panic() = Some(crate::service::stats_flusher::StatsConfig {
639-
endpoint: config.endpoint.clone(),
640-
tracer_version: config.tracer_version.clone(),
632+
*session.stats_config.lock_or_panic() = Some(StatsConfig {
633+
endpoint: stats_endpoint(&config.endpoint).unwrap_or_else(|| config.endpoint.clone()),
641634
flush_interval: config.flush_interval,
635+
hostname: if config.hostname.is_empty() { get_hostname() } else { config.hostname.clone() },
636+
process_tags: config.process_tags.iter().map(|t| t.to_string()).collect::<Vec<_>>().join(","),
637+
root_service: config.root_service.clone(),
638+
language: config.language.clone(),
639+
tracer_version: config.tracer_version.clone(),
642640
});
643641

644642
session.set_remote_config_invariants(ConfigOptions {
@@ -848,10 +846,18 @@ impl SidecarInterface for ConnectionSidecarHandler {
848846
debug!("Registered remote config metadata: instance {instance_id:?}, queue_id: {queue_id:?}, service: {service_name}, env: {env_name}, version: {app_version}");
849847

850848
let session = self.server.get_session(&instance_id.session_id);
851-
let concentrator_guard = crate::service::stats_flusher::ensure_stats_concentrator(
849+
let concentrator_service = session
850+
.stats_config
851+
.lock()
852+
.unwrap_or_else(|e| e.into_inner())
853+
.as_ref()
854+
.map(|c| c.root_service.clone())
855+
.unwrap_or_default();
856+
let concentrator_guard = ensure_stats_concentrator(
852857
&self.server.span_concentrators,
853858
&env_name,
854859
&app_version,
860+
&concentrator_service,
855861
&instance_id.session_id,
856862
&session,
857863
);
@@ -920,7 +926,16 @@ impl SidecarInterface for ConnectionSidecarHandler {
920926
version: String,
921927
span: datadog_ipc::shm_stats::OwnedShmSpanInput,
922928
) {
923-
let map_key = crate::service::stats_flusher::ConcentratorKey { env, version };
929+
let session_id = self.session_id.get().map(|s| s.as_str()).unwrap_or("");
930+
let session = self.server.get_session(session_id);
931+
let service = session
932+
.stats_config
933+
.lock()
934+
.unwrap_or_else(|e| e.into_inner())
935+
.as_ref()
936+
.map(|c| c.root_service.clone())
937+
.unwrap_or_default();
938+
let map_key = ConcentratorKey { env, version, root_service: service };
924939
let guard = self
925940
.server
926941
.span_concentrators
@@ -938,7 +953,7 @@ impl SidecarInterface for ConnectionSidecarHandler {
938953
if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await {
939954
error!("Failed flushing traces: {e:?}");
940955
}
941-
crate::service::stats_flusher::flush_all_stats_now(&self.server.span_concentrators).await;
956+
flush_all_stats_now(&self.server.span_concentrators).await;
942957
}
943958

944959
async fn set_test_session_token(&self, _peer: PeerCredentials, token: String) {

0 commit comments

Comments
 (0)