Skip to content

Commit c6ef98e

Browse files
Leiyksbwoebi
andauthored
feat(sidecar): add thread mode as fallback connection for restricted environments (#1447)
# What does this PR do? Implements a thread-based sidecar connection mode as an alternative to the existing subprocess mode. When enabled, the sidecar runs as a Tokio thread within the PHP process rather than as a separate subprocess. **Key implementation details:** - New `thread` connection mode alongside existing `subprocess` mode - Uses an abstract Unix socket (Linux) or named pipe (Windows) for IPC between the PHP-FPM master thread listener and worker processes - The master UID is encoded in the socket/pipe name to support cross-user scenarios (e.g. FPM master as root, workers as `www-data`) - SHM open mode is configurable via a global hook (`set_shm_open_mode`) to support cross-user shared memory access via `fchown`/`SO_PEERCRED` - Orphan promotion: if the master's thread listener is unavailable, a worker can promote itself to master - Uses `current_thread` Tokio runtime to avoid spawning additional OS threads beyond the single listener thread - Windows support via named pipes (where subprocess mode had limitations) # How to test the change? Tested via the `dd-trace-php` integration test suite: - `SidecarThreadModeTest`: verifies multi-request tracing works in thread mode - `SidecarThreadModeRootTest`: verifies cross-user SHM access when FPM master runs as root - `.phpt` unit tests for connection mode configuration and auto-fallback behavior Co-authored-by: bob.weinand <bob.weinand@datadoghq.com>
1 parent b09f6af commit c6ef98e

11 files changed

Lines changed: 687 additions & 42 deletions

File tree

datadog-ipc/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ tracing-subscriber = { version = "0.3.22" }
4444
spawn_worker = { path = "../spawn_worker" }
4545

4646
[target.'cfg(not(windows))'.dependencies]
47-
nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket"] }
47+
nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket", "user"] }
4848
sendfd = { version = "0.4", features = ["tokio"] }
4949
tokio = { version = "1.23", features = ["sync", "io-util", "signal"] }
5050

datadog-ipc/src/platform/unix/mem_handle.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,19 @@ use nix::errno::Errno;
1010
use nix::fcntl::{open, OFlag};
1111
use nix::sys::mman::{self, mmap, munmap, MapFlags, ProtFlags};
1212
use nix::sys::stat::Mode;
13-
use nix::unistd::{ftruncate, mkdir, unlink};
13+
use nix::unistd::{fchown, ftruncate, mkdir, unlink, Uid};
1414
use nix::NixPath;
1515
use std::ffi::{CStr, CString};
1616
use std::fs::File;
1717
use std::io;
1818
use std::num::NonZeroUsize;
1919
use std::os::fd::AsFd;
2020
use std::os::unix::fs::MetadataExt;
21-
use std::sync::atomic::{AtomicI32, Ordering};
21+
use std::os::unix::io::AsRawFd;
22+
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
23+
24+
// Sentinel value meaning "no owner UID override"
25+
const NO_OWNER_UID: u32 = u32::MAX;
2226

2327
fn fallback_path<P: ?Sized + NixPath>(name: &P) -> nix::Result<CString> {
2428
name.with_nix_path(|cstr| {
@@ -95,6 +99,21 @@ pub(crate) fn munmap_handle<T: MemoryHandle>(mapped: &mut MappedMem<T>) {
9599

96100
static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0);
97101

102+
static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID);
103+
104+
pub fn set_shm_owner_uid(uid: u32) {
105+
SHM_OWNER_UID.store(uid, Ordering::Relaxed);
106+
}
107+
108+
fn shm_owner_uid() -> Option<u32> {
109+
let uid = SHM_OWNER_UID.load(Ordering::Relaxed);
110+
if uid == NO_OWNER_UID {
111+
None
112+
} else {
113+
Some(uid)
114+
}
115+
}
116+
98117
impl ShmHandle {
99118
#[cfg(target_os = "linux")]
100119
fn open_anon_shm(name: &str) -> anyhow::Result<OwnedFd> {
@@ -145,6 +164,9 @@ impl NamedShmHandle {
145164
pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result<NamedShmHandle> {
146165
let fd = shm_open(path.as_bytes(), OFlag::O_CREAT | OFlag::O_RDWR, mode)?;
147166
ftruncate(&fd, size as off_t)?;
167+
if let Some(uid) = shm_owner_uid() {
168+
let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None);
169+
}
148170
Self::new(fd, Some(path), size)
149171
}
150172

datadog-ipc/src/platform/unix/mem_handle_macos.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ use nix::errno::Errno;
99
use nix::fcntl::OFlag;
1010
use nix::sys::mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags};
1111
use nix::sys::stat::Mode;
12-
use nix::unistd::ftruncate;
12+
use nix::unistd::{fchown, ftruncate, Uid};
1313
use std::ffi::{CStr, CString};
1414
use std::io;
1515
use std::num::NonZeroUsize;
1616
use std::os::fd::{AsFd, OwnedFd};
17-
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
17+
use std::os::unix::io::AsRawFd;
18+
use std::sync::atomic::{AtomicI32, AtomicU32, AtomicUsize, Ordering};
1819

1920
const MAPPING_MAX_SIZE: usize = 1 << 17; // 128 MiB ought to be enough for everybody?
2021
const NOT_COMMITTED: usize = 1 << (usize::BITS - 1);
@@ -69,6 +70,23 @@ pub(crate) fn munmap_handle<T: MemoryHandle>(mapped: &MappedMem<T>) {
6970

7071
static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0);
7172

73+
const NO_OWNER_UID: u32 = u32::MAX;
74+
75+
static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID);
76+
77+
pub fn set_shm_owner_uid(uid: u32) {
78+
SHM_OWNER_UID.store(uid, Ordering::Relaxed);
79+
}
80+
81+
fn shm_owner_uid() -> Option<u32> {
82+
let uid = SHM_OWNER_UID.load(Ordering::Relaxed);
83+
if uid == NO_OWNER_UID {
84+
None
85+
} else {
86+
Some(uid)
87+
}
88+
}
89+
7290
impl ShmHandle {
7391
pub fn new(size: usize) -> anyhow::Result<ShmHandle> {
7492
let path = format!(
@@ -112,6 +130,9 @@ impl NamedShmHandle {
112130
truncate?;
113131
}
114132
}
133+
if let Some(uid) = shm_owner_uid() {
134+
let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None);
135+
}
115136
Self::new(fd, Some(path), size)
116137
}
117138

datadog-ipc/src/platform/unix/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ pub(crate) use mem_handle_macos::*;
1919
#[cfg(not(target_os = "macos"))]
2020
mod mem_handle;
2121
#[cfg(not(target_os = "macos"))]
22+
pub use mem_handle::set_shm_owner_uid;
23+
#[cfg(not(target_os = "macos"))]
2224
pub(crate) use mem_handle::*;
25+
#[cfg(target_os = "macos")]
26+
pub use mem_handle_macos::set_shm_owner_uid;
2327

2428
#[no_mangle]
2529
#[cfg(polyfill_glibc_memfd)]

datadog-sidecar-ffi/src/lib.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ use std::slice;
6060
use std::sync::Arc;
6161
use std::time::Duration;
6262

63+
use datadog_sidecar::setup::{connect_to_master, MasterListener};
64+
6365
#[no_mangle]
6466
#[cfg(target_os = "windows")]
6567
pub extern "C" fn ddog_setup_crashtracking(
@@ -311,6 +313,46 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
311313
MaybeError::None
312314
}
313315

316+
#[no_mangle]
317+
pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError {
318+
let cfg = datadog_sidecar::config::FromEnv::config();
319+
#[cfg(unix)]
320+
datadog_sidecar::set_sidecar_master_pid(pid as u32);
321+
try_c!(MasterListener::start(pid, cfg));
322+
323+
MaybeError::None
324+
}
325+
326+
#[no_mangle]
327+
pub extern "C" fn ddog_sidecar_connect_worker(
328+
pid: i32,
329+
connection: &mut *mut SidecarTransport,
330+
) -> MaybeError {
331+
let transport = try_c!(connect_to_master(pid));
332+
*connection = Box::into_raw(transport);
333+
334+
MaybeError::None
335+
}
336+
337+
#[no_mangle]
338+
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
339+
try_c!(MasterListener::shutdown());
340+
341+
MaybeError::None
342+
}
343+
344+
#[no_mangle]
345+
pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool {
346+
MasterListener::is_active(pid)
347+
}
348+
349+
#[no_mangle]
350+
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
351+
try_c!(MasterListener::clear_inherited_state());
352+
353+
MaybeError::None
354+
}
355+
314356
#[no_mangle]
315357
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
316358
try_c!(blocking::ping(transport));

datadog-sidecar/src/entry.rs

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::{
1515
},
1616
time::{Duration, Instant},
1717
};
18-
use tokio::sync::mpsc;
18+
use tokio::sync::{mpsc, oneshot};
1919

2020
#[cfg(unix)]
2121
use crate::crashtracker::crashtracker_unix_socket_path;
@@ -32,7 +32,32 @@ use crate::tracer::SHM_LIMITER;
3232
use crate::watchdog::Watchdog;
3333
use crate::{ddog_daemon_entry_point, setup_daemon_process};
3434

35-
async fn main_loop<L, C, Fut>(listener: L, cancel: Arc<C>) -> io::Result<()>
35+
/// Configuration for main_loop behavior
36+
pub struct MainLoopConfig {
37+
pub enable_ctrl_c_handler: bool,
38+
pub enable_crashtracker: bool,
39+
pub external_shutdown_rx: Option<oneshot::Receiver<()>>,
40+
/// Set to false in thread mode so the worker's UID can be obtained on the
41+
/// first connection and used to fchown the SHM.
42+
pub init_shm_eagerly: bool,
43+
}
44+
45+
impl Default for MainLoopConfig {
46+
fn default() -> Self {
47+
Self {
48+
enable_ctrl_c_handler: true,
49+
enable_crashtracker: true,
50+
external_shutdown_rx: None,
51+
init_shm_eagerly: true,
52+
}
53+
}
54+
}
55+
56+
pub async fn main_loop<L, C, Fut>(
57+
listener: L,
58+
cancel: Arc<C>,
59+
loop_config: MainLoopConfig,
60+
) -> io::Result<()>
3661
where
3762
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
3863
Fut: Future<Output = io::Result<()>>,
@@ -64,32 +89,49 @@ where
6489
}
6590
});
6691

67-
tokio::spawn(async move {
68-
if let Err(err) = tokio::signal::ctrl_c().await {
69-
tracing::error!("Error setting up signal handler {}", err);
70-
}
71-
tracing::info!("Received Ctrl-C Signal, shutting down");
72-
cancel();
73-
});
92+
if let Some(shutdown_rx) = loop_config.external_shutdown_rx {
93+
let cancel = cancel.clone();
94+
tokio::spawn(async move {
95+
let _ = shutdown_rx.await;
96+
tracing::info!("External shutdown signal received");
97+
cancel();
98+
});
99+
}
100+
101+
if loop_config.enable_ctrl_c_handler {
102+
let cancel = cancel.clone();
103+
tokio::spawn(async move {
104+
if let Err(err) = tokio::signal::ctrl_c().await {
105+
tracing::error!("Error setting up signal handler {}", err);
106+
}
107+
tracing::info!("Received Ctrl-C Signal, shutting down");
108+
cancel();
109+
});
110+
}
74111

75112
#[cfg(unix)]
76-
tokio::spawn(async move {
77-
let socket_path = crashtracker_unix_socket_path();
78-
match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default())
79-
{
80-
Ok(listener) => loop {
81-
if let Err(e) =
82-
libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await
83-
{
84-
tracing::warn!("Got error while receiving crash report: {e}");
85-
}
86-
},
87-
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
88-
}
89-
});
113+
if loop_config.enable_crashtracker {
114+
tokio::spawn(async move {
115+
let socket_path = crashtracker_unix_socket_path();
116+
match libdd_crashtracker::get_receiver_unix_socket(
117+
socket_path.to_str().unwrap_or_default(),
118+
) {
119+
Ok(listener) => loop {
120+
if let Err(e) =
121+
libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener)
122+
.await
123+
{
124+
tracing::warn!("Got error while receiving crash report: {e}");
125+
}
126+
},
127+
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
128+
}
129+
});
130+
}
90131

91-
// Init. Early, before we start listening.
92-
drop(SHM_LIMITER.lock());
132+
if loop_config.init_shm_eagerly {
133+
drop(SHM_LIMITER.lock());
134+
}
93135

94136
let server = SidecarServer::default();
95137

@@ -143,6 +185,19 @@ where
143185
}
144186

145187
pub fn enter_listener_loop<F, L, Fut, C>(acquire_listener: F) -> anyhow::Result<()>
188+
where
189+
F: FnOnce() -> io::Result<(L, C)>,
190+
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
191+
Fut: Future<Output = io::Result<()>>,
192+
C: Fn() + Sync + Send + 'static,
193+
{
194+
enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default())
195+
}
196+
197+
pub fn enter_listener_loop_with_config<F, L, Fut, C>(
198+
acquire_listener: F,
199+
loop_config: MainLoopConfig,
200+
) -> anyhow::Result<()>
146201
where
147202
F: FnOnce() -> io::Result<(L, C)>,
148203
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
@@ -159,7 +214,7 @@ where
159214
let (listener, cancel) = acquire_listener()?;
160215

161216
runtime
162-
.block_on(main_loop(listener, Arc::new(cancel)))
217+
.block_on(main_loop(listener, Arc::new(cancel), loop_config))
163218
.map_err(|e| e.into())
164219
}
165220

datadog-sidecar/src/setup/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,18 @@ mod windows;
1212
#[cfg(windows)]
1313
pub use self::windows::*;
1414

15+
// Thread-based listener module (Unix)
16+
#[cfg(unix)]
17+
pub mod thread_listener;
18+
#[cfg(unix)]
19+
pub use thread_listener::{connect_to_master, MasterListener};
20+
21+
// Thread-based listener module (Windows)
22+
#[cfg(windows)]
23+
pub mod thread_listener_windows;
24+
#[cfg(windows)]
25+
pub use thread_listener_windows::{connect_to_master, MasterListener};
26+
1527
use datadog_ipc::platform::Channel;
1628
use std::io;
1729

0 commit comments

Comments
 (0)