Skip to content

Commit aa69c87

Browse files
fix(profiling): add TSAN annotations to ExporterManager channel operations
When C++ tests are compiled with -fsanitize=thread but the Rust library is not instrumented, TSAN cannot observe the atomic synchronization inside crossbeam-channel, causing false positive data race reports. Add __tsan_acquire/__tsan_release annotations around channel send/recv operations using dlsym to dynamically detect TSAN at runtime. This establishes the happens-before relationship that TSAN needs without changing runtime behavior when sanitizers are not active.
1 parent 45d455a commit aa69c87

3 files changed

Lines changed: 77 additions & 8 deletions

File tree

Cargo.lock

Lines changed: 8 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libdd-profiling/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ byteorder = { version = "1.5", features = ["std"] }
3333
bytes = "1.11.1"
3434
chrono = {version = "0.4", default-features = false, features = ["std", "clock"]}
3535
crossbeam-channel = "0.5.15"
36+
libc = "0.2"
3637
crossbeam-utils = { version = "0.8.21" }
3738
cxx = { version = "1.0", optional = true }
3839
futures = { version = "0.3", default-features = false }

libdd-profiling/src/exporter/exporter_manager.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,65 @@ use crate::{exporter::File, internal::EncodedProfile};
66
use crossbeam_channel::{Receiver, Sender};
77
use libdd_common::tag::Tag;
88
use reqwest::RequestBuilder;
9+
use std::sync::Arc;
910
use std::thread::JoinHandle;
1011
use tokio_util::sync::CancellationToken;
1112

13+
/// TSAN annotations for crossbeam-channel synchronization.
14+
///
15+
/// When C++ tests are compiled with `-fsanitize=thread` but the Rust library is
16+
/// not, TSAN cannot see the atomic synchronization inside crossbeam-channel.
17+
/// These annotations tell TSAN about the happens-before relationship between
18+
/// channel send and receive operations via `__tsan_release`/`__tsan_acquire`.
19+
mod tsan {
20+
use std::ffi::c_void;
21+
use std::sync::OnceLock;
22+
23+
type TsanAnnotateFn = unsafe extern "C" fn(*mut c_void);
24+
25+
struct TsanFunctions {
26+
acquire: Option<TsanAnnotateFn>,
27+
release: Option<TsanAnnotateFn>,
28+
}
29+
30+
static FUNCTIONS: OnceLock<TsanFunctions> = OnceLock::new();
31+
32+
fn get() -> &'static TsanFunctions {
33+
FUNCTIONS.get_or_init(|| unsafe {
34+
let acquire =
35+
libc::dlsym(libc::RTLD_DEFAULT, b"__tsan_acquire\0".as_ptr().cast());
36+
let release =
37+
libc::dlsym(libc::RTLD_DEFAULT, b"__tsan_release\0".as_ptr().cast());
38+
TsanFunctions {
39+
acquire: if acquire.is_null() {
40+
None
41+
} else {
42+
Some(std::mem::transmute(acquire))
43+
},
44+
release: if release.is_null() {
45+
None
46+
} else {
47+
Some(std::mem::transmute(release))
48+
},
49+
}
50+
})
51+
}
52+
53+
/// Signal that data associated with `addr` has been published (sender side).
54+
pub fn release(addr: *const u8) {
55+
if let Some(f) = get().release {
56+
unsafe { f(addr as *mut c_void) };
57+
}
58+
}
59+
60+
/// Signal that data associated with `addr` has been consumed (receiver side).
61+
pub fn acquire(addr: *const u8) {
62+
if let Some(f) = get().acquire {
63+
unsafe { f(addr as *mut c_void) };
64+
}
65+
}
66+
}
67+
1268
#[derive(Debug)]
1369
pub enum ExporterManager {
1470
Active {
@@ -17,6 +73,8 @@ pub enum ExporterManager {
1773
handle: JoinHandle<()>,
1874
sender: Sender<RequestBuilder>,
1975
receiver: Receiver<RequestBuilder>,
76+
/// Stable heap address used for TSAN release/acquire annotations.
77+
tsan_sync: Arc<u8>,
2078
},
2179
Suspended {
2280
exporter: ProfileExporter,
@@ -31,8 +89,10 @@ impl ExporterManager {
3189
pub fn new(exporter: ProfileExporter) -> anyhow::Result<Self> {
3290
let (sender, receiver) = crossbeam_channel::bounded(2);
3391
let cancel = CancellationToken::new();
92+
let tsan_sync = Arc::new(0u8);
3493
let cloned_receiver: Receiver<RequestBuilder> = receiver.clone();
3594
let cloned_cancel = cancel.clone();
95+
let cloned_tsan_sync = tsan_sync.clone();
3696
let handle = std::thread::spawn(move || {
3797
let Ok(runtime) = tokio::runtime::Builder::new_current_thread()
3898
.enable_all()
@@ -46,6 +106,7 @@ impl ExporterManager {
46106
let Ok(msg) = cloned_receiver.recv() else {
47107
return;
48108
};
109+
tsan::acquire(Arc::as_ptr(&cloned_tsan_sync));
49110
if cloned_cancel
50111
.run_until_cancelled(msg.send())
51112
.await
@@ -63,6 +124,7 @@ impl ExporterManager {
63124
handle,
64125
sender,
65126
receiver,
127+
tsan_sync,
66128
})
67129
}
68130

@@ -75,6 +137,7 @@ impl ExporterManager {
75137
handle,
76138
sender,
77139
receiver,
140+
tsan_sync: _,
78141
} = old
79142
else {
80143
*self = old;
@@ -147,7 +210,10 @@ impl ExporterManager {
147210
process_tags: Option<&str>,
148211
) -> anyhow::Result<()> {
149212
let Self::Active {
150-
exporter, sender, ..
213+
exporter,
214+
sender,
215+
tsan_sync,
216+
..
151217
} = self
152218
else {
153219
anyhow::bail!("Cannot queue on manager in state: {:?}", self);
@@ -163,6 +229,7 @@ impl ExporterManager {
163229
)?;
164230
// TODO, use thiserror and get back the actual error if one.
165231
sender.try_send(msg)?;
232+
tsan::release(Arc::as_ptr(tsan_sync));
166233
Ok(())
167234
}
168235

0 commit comments

Comments
 (0)