Skip to content

Commit bb6c93b

Browse files
committed
fix: address lock-free bug
1 parent c862039 commit bb6c93b

3 files changed

Lines changed: 49 additions & 7 deletions

File tree

ddprof-lib/src/main/cpp/threadFilter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ class ThreadFilter {
215215
snapshot.correlation_id = 0;
216216
}
217217
snapshot.generation = blockGeneration();
218-
snapshot.active = snapshot.active_state != OSThreadState::UNKNOWN;
218+
snapshot.active = snapshot.owner != BlockRunOwner::NONE &&
219+
snapshot.active_state != OSThreadState::UNKNOWN;
219220
snapshot.has_stack_reference =
220221
sampled && (snapshot.call_trace_id != 0 || snapshot.correlation_id != 0);
221222
return snapshot;

ddprof-lib/src/test/cpp/nativeSocketInterposer_ut.cpp

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,19 @@ class NativeSocketInterposerFdTest : public ::testing::Test {
421421
int dup3ThroughHook(int oldfd, int newfd, int flags) {
422422
return NativeSocketInterposer::dup3_hook(oldfd, newfd, flags);
423423
}
424+
425+
int datagramSocketAtFd(int target_fd) {
426+
int fd = socket(AF_INET, SOCK_DGRAM, 0);
427+
if (fd < 0 || fd == target_fd) {
428+
return fd;
429+
}
430+
431+
int ret = ::dup2(fd, target_fd);
432+
int saved_errno = errno;
433+
::close(fd);
434+
errno = saved_errno;
435+
return ret;
436+
}
424437
};
425438

426439
} // namespace
@@ -1165,11 +1178,18 @@ TEST_F(NativeSocketInterposerFdTest, ConcurrentClassifierReadsAndClearsAreSafe)
11651178
g_fd_probe_calls = 0;
11661179
g_fd_probe_rc = 0;
11671180
g_fd_probe_errno = 0;
1181+
static constexpr int kReaders = 4;
1182+
std::atomic<bool> start{false};
11681183
std::atomic<bool> stop{false};
1184+
std::atomic<int> ready_readers{0};
11691185
std::atomic<int> reads{0};
11701186
int fd = 48;
11711187

11721188
std::thread clearer([&]() {
1189+
while (ready_readers.load(std::memory_order_acquire) < kReaders) {
1190+
std::this_thread::yield();
1191+
}
1192+
start.store(true, std::memory_order_release);
11731193
for (int i = 0; i < 1000; i++) {
11741194
g_fd_probe_so_type = (i % 2 == 0) ? SOCK_STREAM : SOCK_DGRAM;
11751195
classifier.clearFdType(fd);
@@ -1182,8 +1202,12 @@ TEST_F(NativeSocketInterposerFdTest, ConcurrentClassifierReadsAndClearsAreSafe)
11821202
});
11831203

11841204
std::vector<std::thread> readers;
1185-
for (int i = 0; i < 4; i++) {
1205+
for (int i = 0; i < kReaders; i++) {
11861206
readers.emplace_back([&]() {
1207+
ready_readers.fetch_add(1, std::memory_order_release);
1208+
while (!start.load(std::memory_order_acquire)) {
1209+
std::this_thread::yield();
1210+
}
11871211
while (!stop.load(std::memory_order_acquire)) {
11881212
(void)classifier.isStreamSocket(fd);
11891213
(void)classifier.isDatagramSocket(fd);
@@ -1209,7 +1233,7 @@ TEST_F(NativeSocketInterposerFdTest, CloseHookInvalidatesFdBeforeReuse) {
12091233
EXPECT_TRUE(NativeSocketInterposer::instance()->isStreamSocket(reused_fd));
12101234
ASSERT_EQ(0, closeThroughHook(reused_fd));
12111235

1212-
int datagram_fd = socket(AF_INET, SOCK_DGRAM, 0);
1236+
int datagram_fd = datagramSocketAtFd(reused_fd);
12131237
ASSERT_EQ(reused_fd, datagram_fd);
12141238
EXPECT_FALSE(NativeSocketInterposer::instance()->isStreamSocket(datagram_fd));
12151239

@@ -1233,7 +1257,7 @@ TEST_F(NativeSocketInterposerFdTest,
12331257
EXPECT_EQ(E2BIG, errno);
12341258
EXPECT_FALSE(sampler->fdAddrCacheContainsForTest(reused_fd));
12351259

1236-
int datagram_fd = socket(AF_INET, SOCK_DGRAM, 0);
1260+
int datagram_fd = datagramSocketAtFd(reused_fd);
12371261
ASSERT_EQ(reused_fd, datagram_fd);
12381262
EXPECT_FALSE(sampler->isSocketForTest(datagram_fd));
12391263

@@ -1253,7 +1277,7 @@ TEST_F(NativeSocketInterposerFdTest, RepeatedCacheClearsDoNotResurrectOldFdType)
12531277
NativeSocketInterposer::instance()->clearFdTypeCache();
12541278
}
12551279

1256-
int datagram_fd = socket(AF_INET, SOCK_DGRAM, 0);
1280+
int datagram_fd = datagramSocketAtFd(reused_fd);
12571281
ASSERT_EQ(reused_fd, datagram_fd);
12581282
EXPECT_FALSE(NativeSocketInterposer::instance()->isStreamSocket(datagram_fd));
12591283

@@ -1426,11 +1450,12 @@ TEST_F(NativeSocketInterposerFdTest, ConcurrentFdReuseInvalidationDoesNotPreserv
14261450
});
14271451

14281452
ASSERT_EQ(0, closeThroughHook(reused_fd));
1429-
int datagram_fd = socket(AF_INET, SOCK_DGRAM, 0);
1430-
ASSERT_EQ(reused_fd, datagram_fd);
1453+
int datagram_fd = datagramSocketAtFd(reused_fd);
14311454
done.store(true, std::memory_order_release);
14321455
reader.join();
14331456

1457+
ASSERT_EQ(reused_fd, datagram_fd);
1458+
14341459
EXPECT_FALSE(NativeSocketInterposer::instance()->isStreamSocket(datagram_fd));
14351460
EXPECT_TRUE(NativeSocketInterposer::instance()->isDatagramSocket(datagram_fd));
14361461
ASSERT_EQ(0, closeThroughHook(datagram_fd));

ddprof-lib/src/test/cpp/threadFilter_ut.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,22 @@ TEST_F(ThreadFilterTest, BlockRunSnapshotCapturesFirstCallTraceId) {
701701
EXPECT_EQ(0ULL, snapshot.correlation_id);
702702
}
703703

704+
TEST_F(ThreadFilterTest, BlockRunSnapshotTreatsOwnerlessStateAsInactive) {
705+
int slot_id = filter->registerThread();
706+
ASSERT_GE(slot_id, 0);
707+
ThreadFilter::Slot *slot = filter->slotForId(slot_id);
708+
ASSERT_NE(nullptr, slot);
709+
710+
slot->active_block_state.store(OSThreadState::IO_WAIT, std::memory_order_release);
711+
slot->active_block_owner.store(static_cast<int>(BlockRunOwner::NONE),
712+
std::memory_order_release);
713+
714+
BlockRunSnapshot snapshot = filter->snapshotBlockedRun(slot_id);
715+
EXPECT_FALSE(snapshot.active);
716+
EXPECT_EQ(BlockRunOwner::NONE, snapshot.owner);
717+
EXPECT_EQ(OSThreadState::IO_WAIT, snapshot.active_state);
718+
}
719+
704720
TEST_F(ThreadFilterTest, BlockRunSnapshotCapturesFirstCorrelationId) {
705721
int slot_id = filter->registerThread();
706722
ASSERT_GE(slot_id, 0);

0 commit comments

Comments
 (0)