Skip to content

Commit 8567a50

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent 608b93d commit 8567a50

12 files changed

Lines changed: 56 additions & 86 deletions

include/ipfixprobe/outputPlugin/outputStorage/b2OutputStorage.hpp

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
4848
}
4949
}
5050

51-
uint8_t loopCounter = 0;
51+
// uint8_t loopCounter = 0;
5252
BackoffScheme backoffScheme(2, std::numeric_limits<std::size_t>::max());
5353
do {
5454
const bool overflowed = writerData.randomShift();
@@ -112,30 +112,31 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
112112
return this->getNextElement(readerData.bucketAllocation);
113113
}
114114

115-
uint8_t loopCounter = 0;
115+
// uint8_t loopCounter = 0;
116116
uint64_t cachedGeneration;
117117
uint16_t cachedBucketIndex;
118-
const uint16_t initialPosition = readerData.readPosition;
118+
// const uint16_t initialPosition = readerData.readPosition;
119+
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
119120
do {
120121
readerData.shift(this->m_readerGroupSizes[readerGroupIndex], localReaderIndex);
121122

122-
auto& y = this->m_buckets[readerData.readPosition];
123+
// auto& y = this->m_buckets[readerData.readPosition];
123124
if (readerData.isOnBufferBegin(this->m_readerGroupSizes[readerGroupIndex])) {
124125
if (!this->writersPresent()) {
125126
readerData.generation++;
126-
updateLowestReaderGeneration(globalReaderIndex);
127+
updateLowestReaderGeneration();
127128
return nullptr;
128129
}
129130
if (!readerData.seenValidBucket) {
130-
updateLowestReaderGeneration(globalReaderIndex);
131-
std::this_thread::yield();
131+
updateLowestReaderGeneration();
132+
backoffScheme.backoff();
132133
readerData.skipLoop = true;
133134
return nullptr;
134135
}
135136
readerData.generation++;
136137
readerData.seenValidBucket = false;
137138
readerData.skipLoop = false;
138-
updateLowestReaderGeneration(globalReaderIndex);
139+
updateLowestReaderGeneration();
139140
}
140141
cachedGeneration = this->m_buckets[readerData.readPosition].generation;
141142
std::atomic_thread_fence(std::memory_order_acquire);
@@ -159,7 +160,7 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
159160
}*/
160161

161162
protected:
162-
void updateLowestReaderGeneration(const uint8_t globalReaderIndex) noexcept
163+
void updateLowestReaderGeneration() noexcept
163164
{
164165
const auto readerGenerations
165166
= this->m_readersData
@@ -170,16 +171,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
170171
uint64_t,
171172
OutputStorage<ElementType>::MAX_READERS_COUNT>>();
172173
const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
173-
/*uint64_t expected;
174-
do {
175-
expected = m_highestReaderGeneration.load();
176-
if (highestReaderGeneration <= expected) {
177-
break;
178-
}
179-
} while (m_highestReaderGeneration.compare_exchange_weak(
180-
expected,
181-
highestReaderGeneration,
182-
std::memory_order_release));*/
183174
casMax(this->m_highestReaderGeneration, highestReaderGeneration);
184175
// m_highestReaderGeneration = highestReaderGeneration;
185176
const uint64_t lowestReaderGeneration = *std::ranges::min_element(readerGenerations);

include/ipfixprobe/outputPlugin/outputStorage/bOutputStorage.hpp

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
// #include "bucketAllocator.hpp"
4+
#include "backoffScheme.hpp"
45
#include "fastRandomGenerator.hpp"
56
#include "outputStorage.hpp"
67
#include "spinlock.hpp"
@@ -91,10 +92,6 @@ class BOutputStorage : public OutputStorage<ElementType> {
9192
ElementType*& getNextElement(BucketAllocation& position) noexcept
9293
{
9394
const uint64_t containerIndex = position.containerIndex++;
94-
if (position.bucketIndex * BUCKET_SIZE + containerIndex >= this->m_storage.size()
95-
|| containerIndex >= BUCKET_SIZE) {
96-
throw std::runtime_error("Should not happen");
97-
}
9895
return this->m_storage[position.bucketIndex * BUCKET_SIZE + containerIndex];
9996
}
10097

@@ -121,19 +118,18 @@ class BOutputStorage : public OutputStorage<ElementType> {
121118
return true;
122119
}
123120

124-
uint8_t loopCounter = 0;
121+
// uint8_t loopCounter = 0;
125122
// const uint16_t initialPosition = writerData.writePosition;
123+
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
126124
do {
127125
const bool overflowed = writerData.randomShift();
128-
d_writerShifts++;
129126
if (overflowed) {
130127
writerData.cachedLowestReaderGeneration = m_lowestReaderGeneration.load();
131128
if (containersLeft == 0) {
132129
// container.deallocate(*m_allocationBuffer);
133130
this->m_allocationBuffer->deallocate(element, writerIndex);
134131
}
135-
d_writerYields++;
136-
std::this_thread::yield();
132+
backoffScheme.backoff();
137133
return false;
138134
}
139135

@@ -181,43 +177,36 @@ class BOutputStorage : public OutputStorage<ElementType> {
181177
ReaderData& readerData = m_readersData[globalReaderIndex].get();
182178
// const uint64_t readPosition = readerData.readPosition;
183179
if (readerData.bucketAllocation.containersLeft()) {
184-
if (!BucketAllocation::isValidBucketIndex(readerData.bucketAllocation.bucketIndex)) {
180+
/*if (!BucketAllocation::isValidBucketIndex(readerData.bucketAllocation.bucketIndex)) {
185181
throw std::runtime_error("Should not happen");
186-
}
182+
}*/
187183
return getNextElement(readerData.bucketAllocation);
188184
}
189185

190-
if (readerData.readPosition % this->m_readerGroupSizes[readerGroupIndex]
191-
!= localReaderIndex) {
192-
throw std::runtime_error("Should not happen");
193-
}
194-
195-
uint8_t loopCounter = 0;
186+
// uint8_t loopCounter = 0;
196187
uint64_t cachedGeneration;
197188
uint16_t cachedBucketIndex;
198-
const uint16_t initialPosition = readerData.readPosition;
189+
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
199190
do {
200191
readerData.shift(this->m_readerGroupSizes[readerGroupIndex], localReaderIndex);
201-
d_readerShifts++;
202192

203-
auto& y = m_buckets[readerData.readPosition];
193+
// auto& y = m_buckets[readerData.readPosition];
204194
if (readerData.isOnBufferBegin(this->m_readerGroupSizes[readerGroupIndex])) {
205195
if (!this->writersPresent()) {
206196
readerData.generation++;
207-
updateLowestReaderGeneration(globalReaderIndex);
197+
updateLowestReaderGeneration();
208198
return nullptr;
209199
}
210200
if (!readerData.seenValidBucket) {
211-
updateLowestReaderGeneration(globalReaderIndex);
212-
std::this_thread::yield();
213-
d_readerYields++;
201+
updateLowestReaderGeneration();
202+
backoffScheme.backoff();
214203
readerData.skipLoop = true;
215204
return nullptr;
216205
}
217206
readerData.generation++;
218207
readerData.seenValidBucket = false;
219208
readerData.skipLoop = false;
220-
updateLowestReaderGeneration(globalReaderIndex);
209+
updateLowestReaderGeneration();
221210
}
222211
cachedGeneration = m_buckets[readerData.readPosition].generation;
223212
std::atomic_thread_fence(std::memory_order_acquire);
@@ -274,7 +263,7 @@ class BOutputStorage : public OutputStorage<ElementType> {
274263
}
275264
};
276265

277-
void updateLowestReaderGeneration(const uint8_t globalReaderIndex) noexcept
266+
void updateLowestReaderGeneration() noexcept
278267
{
279268
boost::container::static_vector<uint64_t, OutputStorage<ElementType>::MAX_READERS_COUNT>
280269
readerGenerations = m_readersData
@@ -376,15 +365,6 @@ class BOutputStorage : public OutputStorage<ElementType> {
376365
static_vector<std::atomic_uint64_t, OutputStorage<ElementType>::MAX_WRITERS_COUNT>
377366
m_alreadyReadGroupPositions;
378367
std::mutex m_registrationMutex;
379-
std::size_t d_writerShifts {0};
380-
std::size_t d_writerYields {0};
381-
std::size_t d_readerShifts {0};
382-
std::size_t d_readerYields {0};
383-
std::size_t d_readerJumps {0};
384-
std::size_t d_writerJumps {0};
385-
std::size_t d_writerGenerationBigger {0};
386-
std::size_t d_readerGenerationBigger {0};
387-
std::size_t d_writerInitialJumps {0};
388368
/*std::array<std::atomic<uint64_t>, ALLOCATION_BUFFER_CAPACITY / BUCKET_SIZE>
389369
m_writersFinished; std::atomic_uint64_t m_nextWritePos {0}; std::atomic_uint64_t
390370
m_confirmedPos {0}; std::atomic_uint64_t m_writtenPos {0}; bool m_initialized {false};*/

include/ipfixprobe/outputPlugin/outputStorage/backoffScheme.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class BackoffScheme {
2121
if (m_waitCounter < m_shortWaitThreshold) {
2222
for (volatile const auto _ : std::views::iota(0, 10'000)) {}
2323
} else if (m_waitCounter < m_longWaitThreshold) {
24+
return false;
2425
std::this_thread::yield();
2526
} else {
2627
return false;

include/ipfixprobe/outputPlugin/outputStorage/ffq2OutputStorage.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class FFQ2OutputStorage : public FFQOutputStorage<ElementType> {
4141

4242
ElementType* read(
4343
const std::size_t readerGroupIndex,
44-
const uint8_t localReaderIndex,
44+
[[maybe_unused]] const uint8_t localReaderIndex,
4545
const uint8_t globalReaderIndex) noexcept override
4646
{
4747
BackoffScheme backoffScheme(30, std::numeric_limits<std::size_t>::max());

include/ipfixprobe/outputPlugin/outputStorage/ffqOutputStorage.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class FFQOutputStorage : public OutputStorage<ElementType> {
4747

4848
ElementType* read(
4949
const std::size_t readerGroupIndex,
50-
const uint8_t localReaderIndex,
50+
[[maybe_unused]] const uint8_t localReaderIndex,
5151
const uint8_t globalReaderIndex) noexcept override
5252
{
5353
BackoffScheme backoffScheme(30, 1);

include/ipfixprobe/outputPlugin/outputStorage/lfnbOutputStorage.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,13 @@ class LFNBOutputStorage : public OutputStorage<ElementType> {
4545
const uint64_t writePosition
4646
= sequentialWritePosition % OutputStorage<ElementType>::ALLOCATION_BUFFER_CAPACITY;
4747

48+
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
4849
while (m_writersFinished[writePosition / BUCKET_SIZE].load(std::memory_order_acquire)
4950
/ BUCKET_SIZE
5051
!= sequentialWritePosition
5152
/ OutputStorage<ElementType>::ALLOCATION_BUFFER_CAPACITY
5253
|| !bucketIsRead(writePosition / BUCKET_SIZE)) {
53-
std::this_thread::yield();
54+
backoffScheme.backoff();
5455
}
5556

5657
this->m_allocationBuffer->replace(this->m_storage[writePosition], element, writerId);
@@ -72,15 +73,17 @@ class LFNBOutputStorage : public OutputStorage<ElementType> {
7273
const uint64_t sequentialReadPosition = m_readerGroupPositions[readerGroupIndex]++;
7374
const uint64_t readPosition
7475
= sequentialReadPosition % OutputStorage<ElementType>::ALLOCATION_BUFFER_CAPACITY;
76+
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
7577
while (
7678
(m_readersFinished[readPosition / BUCKET_SIZE].load(std::memory_order_acquire)
7779
/ (BUCKET_SIZE * this->m_readerGroupsCount)
7880
!= sequentialReadPosition / OutputStorage<ElementType>::ALLOCATION_BUFFER_CAPACITY
7981
|| !bucketIsWritten(readPosition / BUCKET_SIZE))
8082
&& this->writersPresent()) {
81-
std::this_thread::yield();
83+
backoffScheme.backoff();
8284
}
8385

86+
// TODO Maybe Remove
8487
std::atomic_thread_fence(std::memory_order_acquire);
8588
if (sequentialReadPosition >= m_nextWritePos.load()) {
8689
readerData.lastReadPosition = std::nullopt;
@@ -93,7 +96,7 @@ class LFNBOutputStorage : public OutputStorage<ElementType> {
9396
return this->m_storage[readPosition];
9497
}
9598

96-
bool finished(const std::size_t readerGroupIndex) noexcept override
99+
bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
97100
{
98101
return !this->writersPresent()
99102
&& std::ranges::all_of(m_readerGroupPositions, [&](const auto& position) {

include/ipfixprobe/outputPlugin/outputStorage/mc2OutputStorage.hpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
3838

3939
ElementType* read(
4040
const std::size_t readerGroupIndex,
41-
const uint8_t localReaderIndex,
41+
[[maybe_unused]] const uint8_t localReaderIndex,
4242
const uint8_t globalReaderIndex) noexcept override
4343
{
4444
typename MCOutputStorage<ElementType>::ReaderData& readerData
@@ -59,12 +59,12 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
5959
typename MCOutputStorage<ElementType>::Queue& queue = this->m_queues[currentQueueIndex];
6060
queue.sync(readerGroupIndex);
6161
const std::size_t dequeCount = queue.groupData[readerGroupIndex]->dequeueCount++;
62-
const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
62+
// const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
6363
// const std::size_t d_enqueCount = queue.enqueCount.load();
6464
if (dequeCount >= readerData.cachedEnqueCounts[currentQueueIndex]) {
6565
readerData.cachedEnqueCounts[currentQueueIndex] = queue.enqueCount;
6666
}
67-
const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
67+
// const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
6868
if (dequeCount >= readerData.cachedEnqueCounts[currentQueueIndex]) {
6969
queue.groupData[readerGroupIndex]->dequeueCount--;
7070
readerData.lastQueueIndex++;
@@ -74,10 +74,10 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
7474
}
7575
readerData.readWithoutShift++;
7676
// TODO originally was 256
77-
bool d_s = false;
77+
// bool d_s = false;
7878
if (readerData.readWithoutShift == queue.storage.size()) {
7979
this->shiftAllQueues();
80-
d_s = true;
80+
// d_s = true;
8181
}
8282
// std::atomic_thread_fence(std::memory_order_seq_cst);
8383
const std::size_t readIndex
@@ -99,11 +99,11 @@ class MC2OutputStorage : public MCOutputStorage<ElementType> {
9999
return queue.storage[readIndex];
100100
}
101101
readerData.lastReadSuccessful = false;
102-
std::this_thread::yield();
102+
BackoffScheme(0, 1).backoff();
103103
return nullptr;
104104
}
105105

106-
bool finished(const std::size_t readerGroupIndex) noexcept override
106+
bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
107107
{
108108
return !this->writersPresent()
109109
&& std::ranges::all_of(

include/ipfixprobe/outputPlugin/outputStorage/mcOutputStorage.hpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class MCOutputStorage : public OutputStorage<ElementType> {
6363
if (queue.enqueCount >= queue.storage.size()
6464
&& queue.enqueCount - queue.storage.size() >= queue.cachedLowestHeadIndex) {
6565
this->m_allocationBuffer->deallocate(element, writerId);
66-
std::this_thread::yield();
66+
BackoffScheme(0, 1).backoff();
6767
return false;
6868
}
6969

@@ -77,7 +77,7 @@ class MCOutputStorage : public OutputStorage<ElementType> {
7777

7878
ElementType* read(
7979
const std::size_t readerGroupIndex,
80-
const uint8_t localReaderIndex,
80+
[[maybe_unused]] const uint8_t localReaderIndex,
8181
const uint8_t globalReaderIndex) noexcept override
8282
{
8383
ReaderData& readerData = m_readersData[globalReaderIndex].get();
@@ -97,12 +97,12 @@ class MCOutputStorage : public OutputStorage<ElementType> {
9797
Queue& queue = m_queues[currentQueueIndex];
9898
queue.sync(readerGroupIndex);
9999
const std::size_t dequeCount = queue.groupData[readerGroupIndex]->dequeueCount++;
100-
const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
101-
const std::size_t d_enqueCount = queue.enqueCount.load();
100+
// const std::size_t d_x = readerData.cachedEnqueCounts[currentQueueIndex];
101+
// const std::size_t d_enqueCount = queue.enqueCount.load();
102102
if (dequeCount >= readerData.cachedEnqueCounts[currentQueueIndex]) {
103103
readerData.cachedEnqueCounts[currentQueueIndex] = queue.enqueCount;
104104
}
105-
const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
105+
// const std::size_t d_y = readerData.cachedEnqueCounts[currentQueueIndex];
106106
if (dequeCount >= readerData.cachedEnqueCounts[currentQueueIndex]) {
107107
queue.groupData[readerGroupIndex]->dequeueCount--;
108108
readerData.lastQueueIndex++;
@@ -112,17 +112,16 @@ class MCOutputStorage : public OutputStorage<ElementType> {
112112
}
113113
readerData.readWithoutShift++;
114114
// TODO originally was 256
115-
bool d_s = false;
115+
// bool d_s = false;
116116
if (readerData.readWithoutShift == queue.storage.size()) {
117117
this->shiftAllQueues();
118-
d_s = true;
118+
// d_s = true;
119119
}
120120
// std::atomic_thread_fence(std::memory_order_seq_cst);
121121
const std::size_t readIndex
122122
= queue.groupData[readerGroupIndex]->headIndex++ % queue.storage.size();
123123
// std::atomic_thread_fence(std::memory_order_seq_cst);
124124

125-
auto& y = queue.groupData[readerGroupIndex];
126125
/*if (readerData.cachedEnqueCounts[currentQueueIndex] > queue.enqueCount) {
127126
throw std::runtime_error("XXXXX");
128127
}*/
@@ -131,11 +130,11 @@ class MCOutputStorage : public OutputStorage<ElementType> {
131130
return queue.storage[readIndex];
132131
}
133132
readerData.lastReadSuccessful = false;
134-
std::this_thread::yield();
133+
BackoffScheme(0, 1).backoff();
135134
return nullptr;
136135
}
137136

138-
bool finished(const std::size_t readerGroupIndex) noexcept override
137+
bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
139138
{
140139
return !this->writersPresent()
141140
&& std::ranges::all_of(m_queues, [&](const Queue& queue) { return queue.finished(); });

0 commit comments

Comments
 (0)