Skip to content

Commit 28db9c8

Browse files
Lewis-Eclaude
andcommitted
Deduplicate accept loop code in serve_tcp and remove dead serve_named_pipe
serve_tcp now delegates to serve_accept_loop_tcp instead of duplicating the accept loop logic. serve_named_pipe is removed as it was replaced by serve_accept_loop_named_pipe in the prior commit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 11ebe02 commit 28db9c8

File tree

1 file changed

+14
-142
lines changed

1 file changed

+14
-142
lines changed

crates/datadog-trace-agent/src/mini_agent.rs

Lines changed: 14 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -223,152 +223,24 @@ impl MiniAgent {
223223
S::Future: Send,
224224
S::Error: std::error::Error + Send + Sync + 'static,
225225
{
226-
let server = hyper::server::conn::http1::Builder::new();
227-
let mut joinset = tokio::task::JoinSet::new();
228-
229-
loop {
230-
let conn = tokio::select! {
231-
con_res = listener.accept() => match con_res {
232-
Err(e)
233-
if matches!(
234-
e.kind(),
235-
io::ErrorKind::ConnectionAborted
236-
| io::ErrorKind::ConnectionReset
237-
| io::ErrorKind::ConnectionRefused
238-
) =>
239-
{
240-
continue;
241-
}
242-
Err(e) => {
243-
error!("Server error: {e}");
244-
return Err(e.into());
245-
}
246-
Ok((conn, _)) => conn,
247-
},
248-
finished = async {
249-
match joinset.join_next().await {
250-
Some(finished) => finished,
251-
None => std::future::pending().await,
252-
}
253-
} => match finished {
254-
Err(e) if e.is_panic() => {
255-
std::panic::resume_unwind(e.into_panic());
256-
},
257-
Ok(()) | Err(_) => continue,
258-
},
259-
// If there's some error in the background tasks, we can't send data
260-
result = &mut trace_flusher_handle => {
261-
error!("Trace flusher task died: {:?}", result);
262-
return Err("Trace flusher task terminated unexpectedly".into());
263-
},
264-
result = &mut stats_flusher_handle => {
265-
error!("Stats flusher task died: {:?}", result);
266-
return Err("Stats flusher task terminated unexpectedly".into());
267-
},
268-
};
269-
let conn = hyper_util::rt::TokioIo::new(conn);
270-
let server = server.clone();
271-
let service = service.clone();
272-
joinset.spawn(async move {
273-
if let Err(e) = server.serve_connection(conn, service).await {
274-
error!("Connection error: {e}");
275-
}
276-
});
277-
}
278-
}
226+
let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp(listener, service));
279227

280-
#[cfg(all(windows, feature = "windows-pipes"))]
281-
async fn serve_named_pipe<S>(
282-
pipe_name: &str,
283-
service: S,
284-
mut trace_flusher_handle: tokio::task::JoinHandle<()>,
285-
mut stats_flusher_handle: tokio::task::JoinHandle<()>,
286-
) -> Result<(), Box<dyn std::error::Error>>
287-
where
288-
S: hyper::service::Service<
289-
hyper::Request<hyper::body::Incoming>,
290-
Response = hyper::Response<hyper_migration::Body>,
291-
> + Clone
292-
+ Send
293-
+ 'static,
294-
S::Future: Send,
295-
S::Error: std::error::Error + Send + Sync + 'static,
296-
{
297-
let server = hyper::server::conn::http1::Builder::new();
298-
let mut joinset = tokio::task::JoinSet::new();
299-
300-
loop {
301-
// Create a new pipe instance
302-
// pipe_name already includes \\.\pipe\ prefix from config
303-
let pipe = match ServerOptions::new().create(pipe_name) {
304-
Ok(pipe) => {
305-
debug!("Created pipe server instance '{}' in byte mode", pipe_name);
306-
pipe
307-
}
308-
Err(e) => {
309-
error!("Failed to create named pipe: {e}");
310-
return Err(e.into());
311-
}
312-
};
313-
314-
// Wait for client connection
315-
let conn = tokio::select! {
316-
connect_res = pipe.connect() => match connect_res {
317-
Err(e)
318-
if matches!(
319-
e.kind(),
320-
io::ErrorKind::ConnectionAborted
321-
| io::ErrorKind::ConnectionReset
322-
| io::ErrorKind::ConnectionRefused
323-
) =>
324-
{
325-
continue;
326-
}
327-
Err(e) => {
328-
error!("Named pipe connection error: {e}");
329-
return Err(e.into());
330-
}
331-
Ok(()) => {
332-
debug!("Client connected to '{}'", pipe_name);
333-
pipe
334-
}
335-
},
336-
finished = async {
337-
match joinset.join_next().await {
338-
Some(finished) => finished,
339-
None => std::future::pending().await,
340-
}
341-
} => match finished {
342-
Err(e) if e.is_panic() => {
343-
std::panic::resume_unwind(e.into_panic());
344-
},
345-
Ok(()) | Err(_) => continue,
346-
},
347-
// If there's some error in the background tasks, we can't send data
348-
result = &mut trace_flusher_handle => {
349-
error!("Trace flusher task died: {:?}", result);
350-
return Err("Trace flusher task terminated unexpectedly".into());
351-
},
352-
result = &mut stats_flusher_handle => {
353-
error!("Stats flusher task died: {:?}", result);
354-
return Err("Stats flusher task terminated unexpectedly".into());
355-
},
356-
};
357-
358-
// Hyper http parser handles buffering pipe data
359-
let conn = hyper_util::rt::TokioIo::new(conn);
360-
let server = server.clone();
361-
let service = service.clone();
362-
joinset.spawn(async move {
363-
if let Err(e) = server.serve_connection(conn, service).await {
364-
error!("Connection error: {e}");
365-
}
366-
});
228+
tokio::select! {
229+
result = &mut tcp_handle => {
230+
error!("TCP accept loop died: {:?}", result);
231+
return Err("TCP accept loop terminated unexpectedly".into());
232+
},
233+
result = &mut trace_flusher_handle => {
234+
error!("Trace flusher task died: {:?}", result);
235+
return Err("Trace flusher task terminated unexpectedly".into());
236+
},
237+
result = &mut stats_flusher_handle => {
238+
error!("Stats flusher task died: {:?}", result);
239+
return Err("Stats flusher task terminated unexpectedly".into());
240+
},
367241
}
368242
}
369243

370-
/// TCP accept loop without flusher monitoring, for use when running alongside named pipes.
371-
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
372244
async fn serve_accept_loop_tcp<S>(
373245
listener: tokio::net::TcpListener,
374246
service: S,

0 commit comments

Comments
 (0)