Skip to content

Commit e80e1ee

Browse files
committed
Fmt
Signed-off-by: Bob Weinand <bob.weinand@datadoghq.com>
1 parent 3d1a2cd commit e80e1ee

13 files changed

Lines changed: 76 additions & 668 deletions

File tree

LICENSE-3rdparty.yml

Lines changed: 3 additions & 576 deletions
Large diffs are not rendered by default.

datadog-ipc-macros/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ fn gen_request_enum(enum_name: &Ident, methods: &[MethodInfo]) -> proc_macro2::T
130130
.collect();
131131

132132
quote! {
133-
#[derive(::serde::Serialize, ::serde::Deserialize)]
133+
#[derive(::serde::Serialize, ::serde::Deserialize, Debug)]
134134
pub enum #enum_name {
135135
#(#variants),*
136136
}
@@ -341,12 +341,13 @@ fn gen_serve_fn(
341341
::tracing::warn!("IPC serve: failed to receive handles");
342342
break;
343343
}
344-
let recv_counter = handler.recv_counter().fetch_add(1, ::std::sync::atomic::Ordering::Relaxed) + 1;
345-
::tracing::trace!(recv_counter, discriminant, pid = peer.pid, "IPC recv");
344+
let recv_counter = handler.recv_counter().load(::std::sync::atomic::Ordering::Relaxed) + 1;
345+
::tracing::trace!(recv_counter, ?req, pid = peer.pid, "IPC recv");
346346

347347
match req {
348348
#(#match_arms)*
349349
}
350+
handler.recv_counter().fetch_add(1, ::std::sync::atomic::Ordering::Relaxed);
350351
}
351352
}
352353
}
@@ -470,7 +471,7 @@ fn gen_channel(
470471
}
471472
}
472473

473-
/// `#[service]` replaces `#[tarpc::service]` + `#[impl_transfer_handles]`.
474+
/// `#[service]` macro.
474475
///
475476
/// Generates from a `trait` definition:
476477
/// - `{Trait}Request` enum (Clone, Serialize, Deserialize, TransferHandles)

datadog-ipc/src/example_interface.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,17 +85,15 @@ impl ExampleInterface for ExampleServer {
8585
std::future::ready(())
8686
}
8787

88-
fn shm_sum(
88+
async fn shm_sum(
8989
&self,
9090
_peer: datadog_ipc::PeerCredentials,
9191
handle: ShmHandle,
9292
len: usize,
93-
) -> impl std::future::Future<Output = u64> + Send + '_ {
94-
async move {
95-
match handle.map() {
96-
Ok(mapped) => mapped.as_slice()[..len].iter().map(|&b| b as u64).sum(),
97-
Err(_) => u64::MAX,
98-
}
93+
) -> u64 {
94+
match handle.map() {
95+
Ok(mapped) => mapped.as_slice()[..len].iter().map(|&b| b as u64).sum(),
96+
Err(_) => u64::MAX,
9997
}
10098
}
10199

datadog-ipc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub mod client;
1717
pub mod codec;
1818

1919
pub use client::IpcClientConn;
20-
pub use platform::{recv_raw_async, send_raw_async};
2120
pub use platform::{
2221
max_message_size, PeerCredentials, SeqpacketConn, SeqpacketListener, HANDLE_SUFFIX_SIZE,
2322
};
23+
pub use platform::{recv_raw_async, send_raw_async};

datadog-ipc/src/platform/unix/sockets/linux.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl SeqpacketListener {
3434
let addr = UnixAddr::new_abstract(name).map_err(io::Error::from)?;
3535
Self::do_bind(addr)
3636
}
37-
37+
3838
fn do_bind(addr: UnixAddr) -> io::Result<Self> {
3939
let fd = create_seqpacket_socket()?;
4040
bind(fd.as_raw_fd(), &addr).map_err(io::Error::from)?;

datadog-ipc/src/platform/unix/sockets/mod.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,11 @@ impl SeqpacketConn {
321321
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
322322
)
323323
};
324-
if ret < 0 { Err(io::Error::last_os_error()) } else { Ok(()) }
324+
if ret < 0 {
325+
Err(io::Error::last_os_error())
326+
} else {
327+
Ok(())
328+
}
325329
}
326330

327331
pub fn set_sndbuf_size(&self, size: usize) -> io::Result<()> {
@@ -355,13 +359,13 @@ pub type AsyncConn = AsyncFd<OwnedFd>;
355359
pub async fn recv_raw_async(fd: &AsyncConn) -> io::Result<(Vec<u8>, Vec<OwnedFd>)> {
356360
loop {
357361
let mut guard = fd.readable().await?;
358-
// SAFETY: recvmsg writes exactly the first n bytes; we truncate to n before returning,
359-
// so no uninitialized bytes are ever exposed to the caller.
360362
let mut buf = Vec::with_capacity(max_message_size());
361-
unsafe { buf.set_len(max_message_size()) };
362-
match guard.try_io(|inner| recvmsg_raw(inner.as_raw_fd(), &mut buf, MsgFlags::empty())) {
363+
// SAFETY: all bit patterns are valid for u8; recvmsg writes exactly n bytes into
364+
// the spare capacity before set_len(n) is called below.
365+
let slice = unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr(), max_message_size()) };
366+
match guard.try_io(|inner| recvmsg_raw(inner.as_raw_fd(), slice, MsgFlags::empty())) {
363367
Ok(Ok((n, fds))) => {
364-
buf.truncate(n);
368+
unsafe { buf.set_len(n) };
365369
return Ok((buf, fds));
366370
}
367371
Ok(Err(e)) => return Err(e),

datadog-ipc/src/platform/windows/sockets.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -718,12 +718,16 @@ impl SeqpacketConn {
718718
pub async fn recv_raw_async(conn: &AsyncConn) -> io::Result<(Vec<u8>, Vec<OwnedHandle>)> {
719719
let h = conn.handle.as_raw_handle() as SysHANDLE;
720720
tokio::task::block_in_place(|| {
721-
// SAFETY: ReadFile writes exactly the first n bytes; we truncate to n before returning,
722-
// so no uninitialized bytes are ever exposed to the caller.
723-
let mut buf = Vec::with_capacity(max_message_size() + HANDLE_SUFFIX_SIZE);
724-
unsafe { buf.set_len(max_message_size() + HANDLE_SUFFIX_SIZE) };
725-
let (n, handles) = pipe_read(h, &mut buf, true)?;
726-
buf.truncate(n);
721+
let size = max_message_size() + HANDLE_SUFFIX_SIZE;
722+
let mut buf = Vec::with_capacity(size);
723+
// SAFETY: all bit patterns are valid for u8; pipe_read writes exactly n bytes into
724+
// the spare capacity before set_len(n) is called below.
725+
let (n, handles) = pipe_read(
726+
h,
727+
unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr(), size) },
728+
true,
729+
)?;
730+
unsafe { buf.set_len(n) };
727731
Ok((buf, handles))
728732
})
729733
}

datadog-ipc/tests/windows_shm.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@ fn test_shm_handle_transfer() {
4343

4444
/// Verifies that IPC messages larger than 4 KB are handled without panicking.
4545
///
46-
/// Before the fix, Tokio's `NamedPipeServer` registered the pipe handle with mio/IOCP, which
46+
/// Using Tokio's `NamedPipeServer`, it registered the pipe handle with mio/IOCP, which
4747
/// posted overlapped `ReadFile` calls into a fixed 4 KB internal buffer. Messages larger than
4848
/// 4 KB caused `ReadFile` to return `ERROR_MORE_DATA` synchronously; Windows still queued an
4949
/// IOCP completion, but mio had already transitioned `io.read` to `State::Err`. When the
5050
/// completion fired, mio's `read_done` hit `_ => unreachable!()` (named_pipe.rs:871).
5151
///
52-
/// The fix routes serve-loop I/O through `block_in_place` + direct `ReadFile` into the
53-
/// caller-supplied large buffer, bypassing mio's 4 KB limit entirely.
52+
/// Which is why we route serve-loop I/O through `block_in_place` + direct `ReadFile` into the
53+
/// caller-supplied large buffer, bypassing mio's 4 KB limit entirely. This serves as regression
54+
/// test.
5455
#[test]
5556
fn test_large_message() {
5657
let (conn_server, conn_client) = SeqpacketConn::socketpair().unwrap();

datadog-sidecar/src/service/blocking.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,23 @@ use super::{
55
DynamicInstrumentationConfigState, InstanceId, QueueId, SerializedTracerHeaderTags,
66
SessionConfig, SidecarAction,
77
};
8-
use libdd_telemetry::metrics::MetricContext;
98
use crate::service::sender::SidecarSender;
109
use crate::service::sidecar_interface::SidecarInterfaceChannel;
1110
use datadog_ipc::platform::{FileBackedHandle, ShmHandle};
1211
use datadog_ipc::SeqpacketConn;
1312
use datadog_live_debugger::debugger_defs::DebuggerPayload;
1413
use datadog_live_debugger::sender::DebuggerType;
1514
use libdd_common::tag::Tag;
15+
use libdd_common::MutexExt;
1616
use libdd_dogstatsd_client::DogStatsDActionOwned;
17+
use libdd_telemetry::metrics::MetricContext;
1718
use serde::Serialize;
1819
use std::sync::Mutex;
1920
use std::{
2021
io,
2122
time::{Duration, Instant},
2223
};
2324
use tracing::warn;
24-
use libdd_common::MutexExt;
2525

2626
/// `SidecarTransport` wraps a [`SidecarSender`] with transparent reconnection support.
2727
///
@@ -42,11 +42,15 @@ impl SidecarTransport {
4242
Self::do_reconnect(&mut self.inner, factory, false);
4343
}
4444

45-
pub fn do_reconnect<F>(transport: &mut Mutex<SidecarSender>, factory: F, force_reconnect: bool) -> bool
45+
pub fn do_reconnect<F>(
46+
transport: &mut Mutex<SidecarSender>,
47+
factory: F,
48+
force_reconnect: bool,
49+
) -> bool
4650
where
4751
F: FnOnce() -> Option<Box<SidecarTransport>>,
4852
{
49-
let mut transport = match transport.lock() {
53+
let transport = match transport.get_mut() {
5054
Ok(t) => t,
5155
Err(_) => return false,
5256
};
@@ -120,9 +124,7 @@ impl SidecarTransport {
120124
Err(e) => e,
121125
}
122126
};
123-
if e.kind() == io::ErrorKind::BrokenPipe
124-
|| e.kind() == io::ErrorKind::ConnectionReset
125-
{
127+
if e.kind() == io::ErrorKind::BrokenPipe || e.kind() == io::ErrorKind::ConnectionReset {
126128
if let Some(ref reconnect) = self.reconnect_fn {
127129
if Self::do_reconnect(&mut self.inner, reconnect, true) {
128130
return f(&mut self.inner.lock_or_panic());
@@ -398,10 +400,7 @@ pub fn send_dogstatsd_actions(
398400
}
399401

400402
/// Sets x-datadog-test-session-token on all requests for the given session.
401-
pub fn set_test_session_token(
402-
transport: &mut SidecarTransport,
403-
token: String,
404-
) -> io::Result<()> {
403+
pub fn set_test_session_token(transport: &mut SidecarTransport, token: String) -> io::Result<()> {
405404
lock_sender(transport)?.set_test_session_token(token);
406405
Ok(())
407406
}

datadog-sidecar/src/service/sender.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ fn cancel_if_instance(slot: &mut Option<SidecarInterfaceRequest>, instance_id: &
6969
}
7070
}
7171

72-
7372
fn cancel_if_queue(
7473
slot: &mut Option<SidecarInterfaceRequest>,
7574
instance_id: &InstanceId,
@@ -107,7 +106,11 @@ fn coalesce(outbox: &mut SidecarOutbox, incoming: SidecarInterfaceRequest) {
107106
} = incoming
108107
{
109108
cancel_if_queue(&mut outbox.set_request_config, instance_id, queue_id);
110-
cancel_if_queue(&mut outbox.set_universal_service_tags, instance_id, queue_id);
109+
cancel_if_queue(
110+
&mut outbox.set_universal_service_tags,
111+
instance_id,
112+
queue_id,
113+
);
111114
}
112115

113116
match incoming {
@@ -280,7 +283,8 @@ impl SidecarSender {
280283
if self.metric_registrations.contains_key(&metric.name) {
281284
return;
282285
}
283-
self.metric_registrations.insert(metric.name.clone(), metric.clone());
286+
self.metric_registrations
287+
.insert(metric.name.clone(), metric.clone());
284288
let req = SidecarInterfaceRequest::RegisterTelemetryMetric { metric };
285289
self.channel.send_request_blocking(&req).ok();
286290
}

0 commit comments

Comments
 (0)