Skip to content

Commit 8e5f0b9

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent e89ddf3 commit 8e5f0b9

2 files changed

Lines changed: 47 additions & 41 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/mc2OutputStorage.hpp

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,27 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
2222
const Reference<OutputContainer<ElementType>>& container,
2323
const uint8_t writerId) noexcept override
2424
{
25-
typename MCOutputStorage<ElementType>::Queue& queue = this->m_queues[writerId];
25+
typename MCOutputStorage<ElementType>::Queue& queue = this->m_queues[writerId].get();
2626
const std::size_t enqueCount = queue.enqueCount.load(std::memory_order_acquire);
27-
const std::size_t writeIndex = enqueCount % queue.storage.size();
28-
if (enqueCount >= queue.storage.size()
29-
&& enqueCount - queue.storage.size() >= queue.cachedFinishedIndex) {
27+
const std::size_t writeIndex = enqueCount % queue.storage->size();
28+
if (enqueCount >= queue.storage->size()
29+
&& enqueCount - queue.storage->size() >= queue.cachedFinishedIndex) {
3030
queue.cachedFinishedIndex
31-
= queue.groupData.finishedIndex.load(std::memory_order_acquire);
31+
= queue.groupData->finishedIndex.load(std::memory_order_acquire);
3232
}
3333
BackoffScheme backoffScheme(10, std::numeric_limits<std::size_t>::max());
34-
while (queue.enqueCount.load(std::memory_order_acquire) >= queue.storage.size()
35-
&& queue.enqueCount.load(std::memory_order_acquire) - queue.storage.size()
34+
while (queue.enqueCount.load(std::memory_order_acquire) >= queue.storage->size()
35+
&& queue.enqueCount.load(std::memory_order_acquire) - queue.storage->size()
3636
>= queue.cachedFinishedIndex) {
3737
backoffScheme.backoff();
3838
queue.cachedFinishedIndex
39-
= queue.groupData.finishedIndex.load(std::memory_order_acquire);
39+
= queue.groupData->finishedIndex.load(std::memory_order_acquire);
4040
}
4141

4242
// std::atomic_thread_fence(std::memory_order_seq_cst);
4343
// this->m_allocationBuffer->replace(queue.storage[writeIndex], element, writerId);
4444
// this->assignAndDeallocate(queue.storage[writeIndex], container, writerId);
45-
queue.storage[writeIndex].assign(container, this->makeDeallocationCallback(writerId));
45+
queue.storage.get()[writeIndex].assign(container, this->makeDeallocationCallback(writerId));
4646
// std::atomic_thread_fence(std::memory_order_seq_cst);
4747
queue.enqueCount.fetch_add(1, std::memory_order_release);
4848
/*if (queue.storage[writeIndex].getData().storage.size() == 0) {
@@ -57,7 +57,7 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
5757
= this->m_readersData[readerIndex].get();
5858
if (readerData.lastReadSuccessful) {
5959
this->m_queues[readerData.lastQueueIndex % this->m_queues.size()]
60-
.groupData.readsFinished++;
60+
->groupData->readsFinished.fetch_add(1, std::memory_order_acq_rel);
6161
}
6262

6363
if (readerData.shiftQueue) {
@@ -67,10 +67,11 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
6767
}
6868
for (uint8_t queueShifts = 0; queueShifts < this->m_expectedWritersCount; queueShifts++) {
6969
const uint8_t currentQueueIndex = readerData.lastQueueIndex % this->m_queues.size();
70-
typename MCOutputStorage<ElementType>::Queue& queue = this->m_queues[currentQueueIndex];
70+
typename MCOutputStorage<ElementType>::Queue& queue
71+
= this->m_queues[currentQueueIndex].get();
7172
queue.sync();
7273
const std::size_t dequeTry
73-
= queue.groupData.dequeueTries.fetch_add(1, std::memory_order_acq_rel);
74+
= queue.groupData->dequeueTries.fetch_add(1, std::memory_order_acq_rel);
7475
// const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
7576
// const std::size_t d_enqueCount = queue.enqueCount.load();
7677
// const auto d_enque = queue.enqueCount.load(std::memory_order_acquire);
@@ -80,28 +81,28 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
8081
}
8182
// const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
8283
if (dequeTry >= readerData.cachedEnqueCounts[currentQueueIndex]) {
83-
queue.groupData.dequeueTries.fetch_sub(1, std::memory_order_acq_rel);
84+
queue.groupData->dequeueTries.fetch_sub(1, std::memory_order_acq_rel);
8485
readerData.lastQueueIndex++;
8586
readerData.readWithoutShift = 0;
8687
// readerData.cachedEnqueCount = 0;
8788
continue;
8889
}
8990
readerData.readWithoutShift++;
9091
// TODO originally was 256
91-
if (readerData.readWithoutShift == queue.storage.size()) {
92+
if (readerData.readWithoutShift == queue.storage->size()) {
9293
this->shiftAllQueues();
9394
}
9495

9596
std::atomic_thread_fence(std::memory_order_seq_cst);
9697
const std::size_t readIndex
97-
= queue.groupData.readRank.fetch_add(1, std::memory_order_acq_rel)
98-
% queue.storage.size();
98+
= queue.groupData->readRank.fetch_add(1, std::memory_order_acq_rel)
99+
% queue.storage->size();
99100
readerData.lastReadSuccessful = true;
100101
/*const auto x = queue.storage[readIndex].getData().written.load();
101102
if (queue.storage[readIndex].getData().storage.size() == 0) {
102103
throw std::runtime_error("Attempting to read empty container.");
103104
}*/
104-
return &queue.storage[readIndex].getData();
105+
return &queue.storage.get()[readIndex].getData();
105106
}
106107
readerData.lastReadSuccessful = false;
107108
BackoffScheme(0, 1).backoff();

include/ipfixprobe/outputPlugin/outputStorage/mcOutputStorage.hpp

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,23 @@ class MCOutputStorage : public OutputStorage<ElementType> {
4141
const Reference<OutputContainer<ElementType>>& container,
4242
const uint8_t writerId) noexcept override
4343
{
44-
Queue& queue = m_queues[writerId];
44+
Queue& queue = m_queues[writerId].get();
4545
const std::size_t enqueCount = queue.enqueCount.load(std::memory_order_acquire);
46-
const std::size_t writeIndex = enqueCount % queue.storage.size();
47-
if (enqueCount >= queue.storage.size()
48-
&& enqueCount - queue.storage.size() >= queue.cachedFinishedIndex) {
46+
const std::size_t writeIndex = enqueCount % queue.storage->size();
47+
if (enqueCount >= queue.storage->size()
48+
&& enqueCount - queue.storage->size() >= queue.cachedFinishedIndex) {
4949
queue.cachedFinishedIndex
50-
= queue.groupData.finishedIndex.load(std::memory_order_acquire);
50+
= queue.groupData->finishedIndex.load(std::memory_order_acquire);
5151
}
52-
if (enqueCount >= queue.storage.size()
53-
&& enqueCount - queue.storage.size() >= queue.cachedFinishedIndex) {
52+
if (enqueCount >= queue.storage->size()
53+
&& enqueCount - queue.storage->size() >= queue.cachedFinishedIndex) {
5454
// this->m_allocationBuffer->deallocate(container.getCounter(), writerId);
5555
BackoffScheme(0, 1).backoff();
5656
return false;
5757
}
5858

5959
// std::atomic_thread_fence(std::memory_order_seq_cst);
60-
queue.storage[writeIndex].assign(container, this->makeDeallocationCallback(writerId));
60+
queue.storage.get()[writeIndex].assign(container, this->makeDeallocationCallback(writerId));
6161
// this->assignAndDeallocate(queue.storage[writeIndex], container, writerId);
6262
/*queue.storage[writeIndex].assign(
6363
std::move(Reference<OutputContainer<ElementType>>(*container)),
@@ -72,7 +72,8 @@ class MCOutputStorage : public OutputStorage<ElementType> {
7272
{
7373
ReaderData& readerData = m_readersData[readerIndex].get();
7474
if (readerData.lastReadSuccessful) {
75-
m_queues[readerData.lastQueueIndex % m_queues.size()].groupData.readsFinished++;
75+
m_queues[readerData.lastQueueIndex % m_queues.size()]
76+
->groupData->readsFinished.fetch_add(1, std::memory_order_acq_rel);
7677
}
7778

7879
if (readerData.shiftQueue) {
@@ -82,10 +83,10 @@ class MCOutputStorage : public OutputStorage<ElementType> {
8283
}
8384
for (uint8_t queueShifts = 0; queueShifts < this->m_expectedWritersCount; queueShifts++) {
8485
const uint8_t currentQueueIndex = readerData.lastQueueIndex % m_queues.size();
85-
Queue& queue = m_queues[currentQueueIndex];
86+
Queue& queue = m_queues[currentQueueIndex].get();
8687
queue.sync();
8788
const std::size_t dequeTry
88-
= queue.groupData.dequeueTries.fetch_add(1, std::memory_order_acq_rel);
89+
= queue.groupData->dequeueTries.fetch_add(1, std::memory_order_acq_rel);
8990
// const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
9091
// const std::size_t d_enqueCount = queue.enqueCount.load();
9192
if (dequeTry >= readerData.cachedEnqueCounts[currentQueueIndex]) {
@@ -94,7 +95,7 @@ class MCOutputStorage : public OutputStorage<ElementType> {
9495
}
9596
// const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
9697
if (dequeTry >= readerData.cachedEnqueCounts[currentQueueIndex]) {
97-
queue.groupData.dequeueTries.fetch_sub(1, std::memory_order_acq_rel);
98+
queue.groupData->dequeueTries.fetch_sub(1, std::memory_order_acq_rel);
9899
readerData.lastQueueIndex++;
99100
readerData.readWithoutShift = 0;
100101
// readerData.cachedEnqueCount = 0;
@@ -103,22 +104,22 @@ class MCOutputStorage : public OutputStorage<ElementType> {
103104
readerData.readWithoutShift++;
104105
// TODO originally was 256
105106
// bool d_s = false;
106-
if (readerData.readWithoutShift == queue.storage.size()) {
107+
if (readerData.readWithoutShift == queue.storage->size()) {
107108
this->shiftAllQueues();
108109
// d_s = true;
109110
}
110111
// std::atomic_thread_fence(std::memory_order_seq_cst);
111112
const std::size_t readIndex
112-
= queue.groupData.readRank.fetch_add(1, std::memory_order_acq_rel)
113-
% queue.storage.size();
113+
= queue.groupData->readRank.fetch_add(1, std::memory_order_acq_rel)
114+
% queue.storage->size();
114115
// std::atomic_thread_fence(std::memory_order_seq_cst);
115116

116117
/*if (readerData.cachedEnqueCounts[currentQueueIndex] > queue.enqueCount) {
117118
throw std::runtime_error("XXXXX");
118119
}*/
119120

120121
readerData.lastReadSuccessful = true;
121-
return &queue.storage[readIndex].getData();
122+
return &queue.storage.get()[readIndex].getData();
122123
}
123124
readerData.lastReadSuccessful = false;
124125
BackoffScheme(0, 1).backoff();
@@ -128,7 +129,9 @@ class MCOutputStorage : public OutputStorage<ElementType> {
128129
bool finished() noexcept override
129130
{
130131
return !this->writersPresent()
131-
&& std::ranges::all_of(m_queues, [&](const Queue& queue) { return queue.finished(); });
132+
&& std::ranges::all_of(m_queues, [&](const CacheAlligned<Queue>& queue) {
133+
return queue->finished();
134+
});
132135
}
133136

134137
protected:
@@ -158,23 +161,23 @@ class MCOutputStorage : public OutputStorage<ElementType> {
158161
void sync() noexcept
159162
{
160163
const std::size_t readsFinished
161-
= groupData.readsFinished.load(std::memory_order_acquire);
162-
const std::size_t readIndex = groupData.readRank.load(std::memory_order_acquire);
164+
= groupData->readsFinished.load(std::memory_order_acquire);
165+
const std::size_t readIndex = groupData->readRank.load(std::memory_order_acquire);
163166
if (readIndex == readsFinished) {
164-
groupData.finishedIndex.store(readIndex, std::memory_order_release);
167+
groupData->finishedIndex.store(readIndex, std::memory_order_release);
165168
}
166169
}
167170

168171
bool finished() const noexcept
169172
{
170-
return groupData.finishedIndex.load(std::memory_order_acquire)
173+
return groupData->finishedIndex.load(std::memory_order_acquire)
171174
>= enqueCount.load(std::memory_order_acquire);
172175
}
173176

174177
std::atomic<uint64_t> enqueCount {0};
175178
uint64_t cachedFinishedIndex {0};
176-
std::span<Reference<OutputContainer<ElementType>>> storage;
177-
GroupData groupData;
179+
CacheAlligned<GroupData> groupData;
180+
CacheAlligned<std::span<Reference<OutputContainer<ElementType>>>> storage;
178181
// std::span<CacheAlligned<GroupData>> d_groupData {groupData.data(), groupData.capacity()};
179182
};
180183

@@ -187,7 +190,9 @@ class MCOutputStorage : public OutputStorage<ElementType> {
187190
}
188191

189192
std::mutex m_registrationMutex;
190-
boost::container::static_vector<Queue, OutputStorage<ElementType>::MAX_WRITERS_COUNT> m_queues;
193+
boost::container::
194+
static_vector<CacheAlligned<Queue>, OutputStorage<ElementType>::MAX_WRITERS_COUNT>
195+
m_queues;
191196
std::array<CacheAlligned<ReaderData>, OutputStorage<ElementType>::MAX_READERS_COUNT>
192197
m_readersData;
193198
// uint8_t m_queueShift {0};

0 commit comments

Comments
 (0)