Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/dbscan/cell.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,6 @@ struct cellHash {

bool cas(eType* p, eType o, eType n) {
return std::atomic_compare_exchange_strong_explicit(
reinterpret_cast<std::atomic<eType>*>(p), &o, n, std::memory_order_relaxed, std::memory_order_relaxed);
reinterpret_cast<std::atomic<eType>*>(p), &o, n, std::memory_order_acq_rel, std::memory_order_acquire);
}
};
2 changes: 1 addition & 1 deletion include/dbscan/pbbs/ndHash.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ struct hashSimplePair {
bool replaceQ(eType s, eType s2) {return 0;}//return s.second > s2.second;}
bool cas(eType* p, eType o, eType n) {
return std::atomic_compare_exchange_strong_explicit(
reinterpret_cast<std::atomic<eType>*>(p), &o, n, std::memory_order_relaxed, std::memory_order_relaxed);
reinterpret_cast<std::atomic<eType>*>(p), &o, n, std::memory_order_acq_rel, std::memory_order_acquire);
}
};

Expand Down
26 changes: 13 additions & 13 deletions include/dbscan/pbbs/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,23 @@ struct Deque {
Deque() : bot(0), age(age_t{0, 0}) {}

void push_bottom(Job* job) {
auto local_bot = bot.load(std::memory_order_relaxed); // atomic load
deq[local_bot].job.store(job, std::memory_order_relaxed); // shared store
auto local_bot = bot.load(std::memory_order_acquire); // atomic load
deq[local_bot].job.store(job, std::memory_order_release); // shared store
local_bot += 1;
if (local_bot == q_size) {
throw std::runtime_error("internal error: scheduler queue overflow");
}
bot.store(local_bot, std::memory_order_relaxed); // shared store
bot.store(local_bot, std::memory_order_release); // shared store
std::atomic_thread_fence(std::memory_order_seq_cst);
}

Job* pop_top() {
Job* result = nullptr;
auto old_age = age.load(std::memory_order_relaxed); // atomic load
auto local_bot = bot.load(std::memory_order_relaxed); // atomic load
auto old_age = age.load(std::memory_order_acquire); // atomic load
auto local_bot = bot.load(std::memory_order_acquire); // atomic load
if (local_bot > old_age.top) {
auto job =
deq[old_age.top].job.load(std::memory_order_relaxed); // atomic load
deq[old_age.top].job.load(std::memory_order_acquire); // atomic load
auto new_age = old_age;
new_age.top = new_age.top + 1;
if (age.compare_exchange_strong(old_age, new_age))
Expand All @@ -117,24 +117,24 @@ struct Deque {

Job* pop_bottom() {
Job* result = nullptr;
auto local_bot = bot.load(std::memory_order_relaxed); // atomic load
auto local_bot = bot.load(std::memory_order_acquire); // atomic load
if (local_bot != 0) {
local_bot--;
bot.store(local_bot, std::memory_order_relaxed); // shared store
bot.store(local_bot, std::memory_order_release); // shared store
std::atomic_thread_fence(std::memory_order_seq_cst);
auto job =
deq[local_bot].job.load(std::memory_order_relaxed); // atomic load
auto old_age = age.load(std::memory_order_relaxed); // atomic load
deq[local_bot].job.load(std::memory_order_acquire); // atomic load
auto old_age = age.load(std::memory_order_acquire); // atomic load
if (local_bot > old_age.top)
result = job;
else {
bot.store(0, std::memory_order_relaxed); // shared store
bot.store(0, std::memory_order_release); // shared store
auto new_age = age_t{old_age.tag + 1, 0};
if ((local_bot == old_age.top) &&
age.compare_exchange_strong(old_age, new_age))
result = job;
else {
age.store(new_age, std::memory_order_relaxed); // shared store
age.store(new_age, std::memory_order_release); // shared store
result = nullptr;
}
std::atomic_thread_fence(std::memory_order_seq_cst);
Expand Down Expand Up @@ -246,7 +246,7 @@ struct scheduler {
std::vector<Deque<Job>> deques;
std::vector<attempt> attempts;
std::vector<std::thread> spawned_threads;
std::atomic<int> finished_flag;
std::atomic<bool> finished_flag;

// Start an individual scheduler task. Runs until finished().
template <typename F>
Expand Down
2 changes: 1 addition & 1 deletion include/dbscan/pbbs/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ template <class E1, class E2>
template<typename eType>
bool myCAS(eType* p, eType o, eType n) {
return std::atomic_compare_exchange_strong_explicit(
reinterpret_cast<std::atomic<eType>*>(p), &o, n, std::memory_order_relaxed, std::memory_order_relaxed);
reinterpret_cast<std::atomic<eType>*>(p), &o, n, std::memory_order_acq_rel, std::memory_order_acquire);
}

template <class ET>
Expand Down
8 changes: 4 additions & 4 deletions include/dbscan/pbbs/work_stealing_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ namespace parlay {

struct WorkStealingJob {
WorkStealingJob() {
done.store(false, std::memory_order_relaxed);
done.store(false, std::memory_order_release);
}
~WorkStealingJob() = default;
void operator()() {
assert(done.load(std::memory_order_relaxed) == false);
assert(done.load(std::memory_order_acquire) == false);
execute();
done.store(true, std::memory_order_relaxed);
done.store(true, std::memory_order_release);
}
bool finished() {
return done.load(std::memory_order_relaxed);
return done.load(std::memory_order_acquire);
}
virtual void execute() = 0;
std::atomic<bool> done;
Expand Down
Loading