Skip to content

Commit 11ebe02

Browse files
Lewis-Eclaude
andcommitted
Listen on both TCP and named pipes when pipe name is configured
When a named pipe name is configured, the mini agent now starts both a TCP listener and a named pipe listener concurrently. This ensures backwards compatibility — older tracers that only support TCP can still communicate with the compat binary, while new tracers use named pipes for multi-function isolation. When no pipe name is configured, behavior is unchanged (TCP only). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8466769 commit 11ebe02

File tree

1 file changed

+185
-26
lines changed

1 file changed

+185
-26
lines changed

crates/datadog-trace-agent/src/mini_agent.rs

Lines changed: 185 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -128,53 +128,75 @@ impl MiniAgent {
128128
)
129129
});
130130

131-
// Determine which transport to use based on configuration
131+
// Determine which transports to use based on configuration
132132
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
133133
let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref();
134134
#[cfg(not(any(all(windows, feature = "windows-pipes"), test)))]
135135
let pipe_name_opt: Option<&String> = None;
136136

137-
if let Some(pipe_name) = pipe_name_opt {
138-
debug!("Mini Agent started: listening on named pipe {}", pipe_name);
139-
} else {
140-
debug!(
141-
"Mini Agent started: listening on port {}",
142-
self.config.dd_apm_receiver_port
143-
);
144-
}
145137
debug!(
146138
"Time taken to start the Mini Agent: {} ms",
147139
now.elapsed().as_millis()
148140
);
149141

142+
// Always start TCP listener
143+
let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port));
144+
let tcp_listener = tokio::net::TcpListener::bind(&addr).await?;
145+
debug!(
146+
"Mini Agent listening on TCP port {}",
147+
self.config.dd_apm_receiver_port
148+
);
149+
150150
if let Some(pipe_name) = pipe_name_opt {
151-
// Windows named pipe transport
151+
// Both TCP and named pipe transports
152+
debug!("Mini Agent also listening on named pipe {}", pipe_name);
153+
152154
#[cfg(all(windows, feature = "windows-pipes"))]
153155
{
154-
Self::serve_named_pipe(
155-
pipe_name,
156-
service,
157-
trace_flusher_handle,
158-
stats_flusher_handle,
159-
)
160-
.await?;
156+
let tcp_service = service.clone();
157+
let pipe_service = service;
158+
159+
let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp(
160+
tcp_listener,
161+
tcp_service,
162+
));
163+
164+
let mut pipe_handle = tokio::spawn(Self::serve_accept_loop_named_pipe(
165+
pipe_name.clone(),
166+
pipe_service,
167+
));
168+
169+
// Monitor all tasks — if any critical task dies, shut down
170+
tokio::select! {
171+
result = &mut tcp_handle => {
172+
error!("TCP accept loop died: {:?}", result);
173+
return Err("TCP accept loop terminated unexpectedly".into());
174+
},
175+
result = &mut pipe_handle => {
176+
error!("Named pipe accept loop died: {:?}", result);
177+
return Err("Named pipe accept loop terminated unexpectedly".into());
178+
},
179+
result = &mut trace_flusher_handle => {
180+
error!("Trace flusher task died: {:?}", result);
181+
return Err("Trace flusher task terminated unexpectedly".into());
182+
},
183+
result = &mut stats_flusher_handle => {
184+
error!("Stats flusher task died: {:?}", result);
185+
return Err("Stats flusher task terminated unexpectedly".into());
186+
},
187+
}
161188
}
162189
#[cfg(not(all(windows, feature = "windows-pipes")))]
163190
{
164-
let _ = pipe_name; // Suppress unused variable warning
191+
let _ = pipe_name;
165192
unreachable!(
166-
"Named pipes are only supported on Windows with the windows-pipes feature \
167-
enabled, cannot use pipe: {}.",
168-
pipe_name
193+
"Named pipes are only supported on Windows with the windows-pipes feature enabled."
169194
);
170195
}
171196
} else {
172-
// TCP transport
173-
let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port));
174-
let listener = tokio::net::TcpListener::bind(&addr).await?;
175-
197+
// TCP-only transport
176198
Self::serve_tcp(
177-
listener,
199+
tcp_listener,
178200
service,
179201
trace_flusher_handle,
180202
stats_flusher_handle,
@@ -345,6 +367,143 @@ impl MiniAgent {
345367
}
346368
}
347369

370+
/// TCP accept loop without flusher monitoring, for use when running alongside named pipes.
371+
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
372+
async fn serve_accept_loop_tcp<S>(
373+
listener: tokio::net::TcpListener,
374+
service: S,
375+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
376+
where
377+
S: hyper::service::Service<
378+
hyper::Request<hyper::body::Incoming>,
379+
Response = hyper::Response<hyper_migration::Body>,
380+
> + Clone
381+
+ Send
382+
+ 'static,
383+
S::Future: Send,
384+
S::Error: std::error::Error + Send + Sync + 'static,
385+
{
386+
let server = hyper::server::conn::http1::Builder::new();
387+
let mut joinset = tokio::task::JoinSet::new();
388+
389+
loop {
390+
let conn = tokio::select! {
391+
con_res = listener.accept() => match con_res {
392+
Err(e)
393+
if matches!(
394+
e.kind(),
395+
io::ErrorKind::ConnectionAborted
396+
| io::ErrorKind::ConnectionReset
397+
| io::ErrorKind::ConnectionRefused
398+
) =>
399+
{
400+
continue;
401+
}
402+
Err(e) => {
403+
error!("TCP server error: {e}");
404+
return Err(e.into());
405+
}
406+
Ok((conn, _)) => conn,
407+
},
408+
finished = async {
409+
match joinset.join_next().await {
410+
Some(finished) => finished,
411+
None => std::future::pending().await,
412+
}
413+
} => match finished {
414+
Err(e) if e.is_panic() => {
415+
std::panic::resume_unwind(e.into_panic());
416+
},
417+
Ok(()) | Err(_) => continue,
418+
},
419+
};
420+
let conn = hyper_util::rt::TokioIo::new(conn);
421+
let server = server.clone();
422+
let service = service.clone();
423+
joinset.spawn(async move {
424+
if let Err(e) = server.serve_connection(conn, service).await {
425+
error!("TCP connection error: {e}");
426+
}
427+
});
428+
}
429+
}
430+
431+
/// Named pipe accept loop without flusher monitoring, for use when running alongside TCP.
432+
#[cfg(all(windows, feature = "windows-pipes"))]
433+
async fn serve_accept_loop_named_pipe<S>(
434+
pipe_name: String,
435+
service: S,
436+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
437+
where
438+
S: hyper::service::Service<
439+
hyper::Request<hyper::body::Incoming>,
440+
Response = hyper::Response<hyper_migration::Body>,
441+
> + Clone
442+
+ Send
443+
+ 'static,
444+
S::Future: Send,
445+
S::Error: std::error::Error + Send + Sync + 'static,
446+
{
447+
let server = hyper::server::conn::http1::Builder::new();
448+
let mut joinset = tokio::task::JoinSet::new();
449+
450+
loop {
451+
let pipe = match ServerOptions::new().create(&pipe_name) {
452+
Ok(pipe) => {
453+
debug!("Created pipe server instance '{}' in byte mode", pipe_name);
454+
pipe
455+
}
456+
Err(e) => {
457+
error!("Failed to create named pipe: {e}");
458+
return Err(e.into());
459+
}
460+
};
461+
462+
let conn = tokio::select! {
463+
connect_res = pipe.connect() => match connect_res {
464+
Err(e)
465+
if matches!(
466+
e.kind(),
467+
io::ErrorKind::ConnectionAborted
468+
| io::ErrorKind::ConnectionReset
469+
| io::ErrorKind::ConnectionRefused
470+
) =>
471+
{
472+
continue;
473+
}
474+
Err(e) => {
475+
error!("Named pipe connection error: {e}");
476+
return Err(e.into());
477+
}
478+
Ok(()) => {
479+
debug!("Client connected to '{}'", pipe_name);
480+
pipe
481+
}
482+
},
483+
finished = async {
484+
match joinset.join_next().await {
485+
Some(finished) => finished,
486+
None => std::future::pending().await,
487+
}
488+
} => match finished {
489+
Err(e) if e.is_panic() => {
490+
std::panic::resume_unwind(e.into_panic());
491+
},
492+
Ok(()) | Err(_) => continue,
493+
},
494+
};
495+
496+
let conn = hyper_util::rt::TokioIo::new(conn);
497+
let server = server.clone();
498+
let service = service.clone();
499+
joinset.spawn(async move {
500+
if let Err(e) = server.serve_connection(conn, service).await {
501+
error!("Named pipe connection error: {e}");
502+
}
503+
});
504+
}
505+
}
506+
348507
#[allow(clippy::too_many_arguments)]
349508
async fn trace_endpoint_handler(
350509
config: Arc<config::Config>,

0 commit comments

Comments
 (0)