Skip to content

Commit 31a48ff

Browse files
committed
feat(sidecar): support threaded connection for windows
Signed-off-by: Alexandre Rulleau <alexandre.rulleau@datadoghq.com>
1 parent 46924a8 commit 31a48ff

3 files changed

Lines changed: 227 additions & 16 deletions

File tree

datadog-sidecar-ffi/src/lib.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ use std::slice;
5757
use std::sync::Arc;
5858
use std::time::Duration;
5959

60+
use datadog_sidecar::setup::{connect_to_master, MasterListener};
61+
6062
#[no_mangle]
6163
#[cfg(target_os = "windows")]
6264
pub extern "C" fn ddog_setup_crashtracking(
@@ -308,53 +310,38 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
308310
}
309311

310312
#[no_mangle]
311-
#[cfg(unix)]
312313
pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError {
313-
use datadog_sidecar::setup::MasterListener;
314-
315314
let cfg = datadog_sidecar::config::FromEnv::config();
316315
try_c!(MasterListener::start(pid, cfg));
317316

318317
MaybeError::None
319318
}
320319

321320
#[no_mangle]
322-
#[cfg(unix)]
323321
pub extern "C" fn ddog_sidecar_connect_worker(
324322
pid: i32,
325323
connection: &mut *mut SidecarTransport,
326324
) -> MaybeError {
327-
use datadog_sidecar::setup::connect_to_master;
328-
329325
let transport = try_c!(connect_to_master(pid));
330326
*connection = Box::into_raw(transport);
331327

332328
MaybeError::None
333329
}
334330

335331
#[no_mangle]
336-
#[cfg(unix)]
337332
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
338-
use datadog_sidecar::setup::MasterListener;
339-
340333
try_c!(MasterListener::shutdown());
341334

342335
MaybeError::None
343336
}
344337

345338
#[no_mangle]
346-
#[cfg(unix)]
347339
pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool {
348-
use datadog_sidecar::setup::MasterListener;
349-
350340
MasterListener::is_active(pid)
351341
}
352342

353343
#[no_mangle]
354-
#[cfg(unix)]
355344
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
356-
use datadog_sidecar::setup::MasterListener;
357-
358345
try_c!(MasterListener::clear_inherited_state());
359346

360347
MaybeError::None

datadog-sidecar/src/setup/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ mod windows;
1212
#[cfg(windows)]
1313
pub use self::windows::*;
1414

15-
// Thread-based listener module (Unix only)
15+
// Thread-based listener module (Unix)
1616
#[cfg(unix)]
1717
pub mod thread_listener;
1818
#[cfg(unix)]
1919
pub use thread_listener::{connect_to_master, MasterListener};
2020

21+
// Thread-based listener module (Windows)
22+
#[cfg(windows)]
23+
pub mod thread_listener_windows;
24+
#[cfg(windows)]
25+
pub use thread_listener_windows::{connect_to_master, MasterListener};
26+
2127
use datadog_ipc::platform::Channel;
2228
use std::io;
2329

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::io;
5+
use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle};
6+
use std::sync::{Mutex, OnceLock};
7+
use std::thread::{self, JoinHandle};
8+
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
9+
use tokio::sync::oneshot;
10+
use tracing::{error, info};
11+
12+
use crate::config::Config;
13+
use crate::entry::MainLoopConfig;
14+
use crate::service::blocking::SidecarTransport;
15+
use datadog_ipc::platform::metadata::ProcessHandle;
16+
use datadog_ipc::platform::Channel;
17+
use datadog_ipc::transport::blocking::BlockingTransport;
18+
19+
static MASTER_LISTENER: OnceLock<Mutex<Option<MasterListener>>> = OnceLock::new();
20+
21+
pub struct MasterListener {
22+
shutdown_tx: Option<oneshot::Sender<()>>,
23+
thread_handle: Option<JoinHandle<()>>,
24+
pid: i32,
25+
}
26+
27+
impl MasterListener {
28+
/// Start the master listener thread using Windows Named Pipes.
29+
///
30+
/// This spawns a new OS thread that creates a named pipe server
31+
/// to listen for worker connections. Only one listener can be active per process.
32+
pub fn start(pid: i32, config: Config) -> io::Result<()> {
33+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
34+
let mut listener_guard = listener_mutex
35+
.lock()
36+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
37+
38+
if listener_guard.is_some() {
39+
return Err(io::Error::other("Master listener is already running"));
40+
}
41+
42+
let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid);
43+
let (shutdown_tx, shutdown_rx) = oneshot::channel();
44+
45+
let thread_handle = thread::Builder::new()
46+
.name(format!("ddtrace-sidecar-listener-{}", pid))
47+
.spawn(move || {
48+
if let Err(e) = run_listener_windows(pipe_name, shutdown_rx) {
49+
error!("Listener thread error: {}", e);
50+
}
51+
})
52+
.map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?;
53+
54+
*listener_guard = Some(MasterListener {
55+
shutdown_tx: Some(shutdown_tx),
56+
thread_handle: Some(thread_handle),
57+
pid,
58+
});
59+
60+
info!("Started Windows named pipe listener (PID {})", pid);
61+
Ok(())
62+
}
63+
64+
/// Shutdown the master listener thread.
65+
///
66+
/// Sends shutdown signal and joins the listener thread. This is blocking
67+
/// and will wait for the thread to exit cleanly.
68+
pub fn shutdown() -> io::Result<()> {
69+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
70+
let mut listener_guard = listener_mutex
71+
.lock()
72+
.map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
73+
74+
if let Some(mut master) = listener_guard.take() {
75+
// Signal shutdown by sending to the oneshot sender
76+
if let Some(tx) = master.shutdown_tx.take() {
77+
let _ = tx.send(());
78+
}
79+
80+
if let Some(handle) = master.thread_handle.take() {
81+
handle
82+
.join()
83+
.map_err(|_| io::Error::other("Failed to join listener thread"))?;
84+
}
85+
86+
info!("Master listener thread shut down successfully");
87+
Ok(())
88+
} else {
89+
Err(io::Error::other("No master listener is running"))
90+
}
91+
}
92+
93+
/// Check if the master listener is active for the given PID.
94+
///
95+
/// Unlike Unix, Windows doesn't have fork, so this is simpler than the Unix version.
96+
pub fn is_active(pid: i32) -> bool {
97+
let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
98+
if let Ok(listener_guard) = listener_mutex.lock() {
99+
listener_guard.as_ref().is_some_and(|l| l.pid == pid)
100+
} else {
101+
false
102+
}
103+
}
104+
105+
/// Clear inherited listener state.
106+
///
107+
/// On Windows, this is a no-op since Windows doesn't have fork.
108+
/// Kept for API compatibility with Unix version.
109+
pub fn clear_inherited_state() -> io::Result<()> {
110+
Ok(())
111+
}
112+
}
113+
114+
/// Accept connections in a loop for Windows named pipes.
115+
async fn accept_pipe_loop_windows(
116+
pipe_name: String,
117+
handler: Box<dyn Fn(tokio::net::windows::named_pipe::NamedPipeServer)>,
118+
mut shutdown_rx: oneshot::Receiver<()>,
119+
) -> io::Result<()> {
120+
let mut server = ServerOptions::new()
121+
.first_pipe_instance(true)
122+
.max_instances(254) // Windows allows up to 255 instances
123+
.create(&pipe_name)?;
124+
125+
info!("Named pipe server created at: {}", pipe_name);
126+
127+
loop {
128+
tokio::select! {
129+
_ = &mut shutdown_rx => {
130+
info!("Shutdown signal received in Windows pipe listener");
131+
break;
132+
}
133+
result = server.connect() => {
134+
match result {
135+
Ok(_) => {
136+
info!("Accepted new worker connection on named pipe");
137+
handler(server);
138+
139+
server = ServerOptions::new()
140+
.create(&pipe_name)?;
141+
}
142+
Err(e) => {
143+
error!("Failed to accept worker connection: {}", e);
144+
match ServerOptions::new().create(&pipe_name) {
145+
Ok(new_server) => server = new_server,
146+
Err(e2) => {
147+
error!("Failed to recover named pipe: {}", e2);
148+
break;
149+
}
150+
}
151+
}
152+
}
153+
}
154+
}
155+
}
156+
Ok(())
157+
}
158+
159+
/// Entry point for Windows named pipe listener
160+
fn run_listener_windows(
161+
pipe_name: String,
162+
shutdown_rx: oneshot::Receiver<()>,
163+
) -> io::Result<()> {
164+
info!("Listener thread running, creating Windows named pipe server");
165+
166+
let acquire_listener = move || {
167+
let cancel = || {};
168+
let pipe_name_clone = pipe_name.clone();
169+
Ok((
170+
move |handler| accept_pipe_loop_windows(pipe_name_clone, handler, shutdown_rx),
171+
cancel,
172+
))
173+
};
174+
175+
let loop_config = MainLoopConfig {
176+
enable_ctrl_c_handler: false,
177+
enable_crashtracker: false,
178+
external_shutdown_rx: None,
179+
};
180+
181+
crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config)
182+
.map_err(|e| io::Error::other(format!("Windows thread listener failed: {}", e)))?;
183+
184+
info!("Listener thread exiting");
185+
Ok(())
186+
}
187+
188+
/// Connect to the master listener as a worker using Windows Named Pipes.
189+
///
190+
/// Establishes a connection to the master listener thread for the given PID.
191+
pub fn connect_to_master(pid: i32) -> io::Result<Box<SidecarTransport>> {
192+
info!("Connecting to master listener via named pipe (PID {})", pid);
193+
194+
let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid);
195+
196+
let client = ClientOptions::new().open(&pipe_name)?;
197+
198+
info!("Connected to named pipe: {}", pipe_name);
199+
200+
let raw_handle = client.as_raw_handle();
201+
let owned_handle = unsafe { OwnedHandle::from_raw_handle(raw_handle) };
202+
203+
std::mem::forget(client);
204+
205+
// ProcessHandle for thread mode - we know the PID immediately
206+
let process_handle = ProcessHandle::Getter(Box::new(move || Ok(ProcessHandle::Pid(pid as u32))));
207+
let channel = Channel::from_client_handle_and_pid(owned_handle, process_handle);
208+
209+
let transport = BlockingTransport::from(channel);
210+
211+
let sidecar_transport = Box::new(SidecarTransport {
212+
inner: Mutex::new(transport),
213+
reconnect_fn: None,
214+
});
215+
216+
info!("Successfully connected to master listener");
217+
Ok(sidecar_transport)
218+
}

0 commit comments

Comments
 (0)