Skip to content

Commit 4f5e5f9

Browse files
Zainullin Damir
authored and committed
++
1 parent c169297 commit 4f5e5f9

11 files changed

Lines changed: 207 additions & 31 deletions

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer2.hpp renamed to include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer2.hppxxx

File renamed without changes.

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer3.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
1919
explicit AllocationBuffer3(const std::size_t capacity, const uint8_t writersCount) noexcept
2020
: m_objectPool(capacity + writersCount * writersCount)
2121
{
22+
m_writersData.resize(writersCount);
2223
for (auto&& [queue, objects] : std::views::zip(
2324
m_queues,
2425
m_objectPool | std::views::chunk(m_objectPool.size() / writersCount))) {
@@ -94,7 +95,11 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
9495
void unlock() noexcept { lock.clear(std::memory_order_release); }
9596
};
9697

98+
struct WriterData {};
99+
97100
std::vector<ElementType> m_objectPool;
101+
std::vector<CacheAlligned<WriterData>> m_writersData;
102+
98103
std::array<CacheAlligned<Queue>, 32> m_queues;
99104
std::atomic_uint64_t m_nextQueue {0};
100105
};

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferBase.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@ class AllocationBufferBase {
1313

1414
virtual void deallocate(ElementType* element, const uint8_t writerId) noexcept = 0;
1515

16-
virtual void unregisterWriter() noexcept {}
16+
virtual void unregisterWriter([[maybe_unused]] const uint8_t writerId) noexcept {}
1717

18-
virtual void registerWriter() noexcept {}
18+
virtual void registerWriter([[maybe_unused]] const uint8_t writerId) noexcept {}
1919

2020
virtual ~AllocationBufferBase() = default;
2121

22-
void replace(ElementType*& oldValue, ElementType* newValue, const uint8_t writerId) noexcept
22+
/*void replace(ElementType*& oldValue, ElementType* newValue, const uint8_t writerId) noexcept
2323
{
2424
if (oldValue != nullptr) {
2525
deallocate(oldValue, writerId);
2626
}
2727
oldValue = newValue;
28-
}
28+
}*/
2929
};
3030

3131
} // namespace ipxp::output
include/ipfixprobe/outputPlugin/outputStorage/allocationBufferS.hpp

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#pragma once
2+
3+
#include "allocationBufferBase.hpp"
4+
#include "backoffScheme.hpp"
5+
#include "cacheAlligned.hpp"
6+
#include "fastRandomGenerator.hpp"
7+
8+
#include <algorithm>
9+
#include <atomic>
10+
#include <barrier>
11+
#include <cstddef>
12+
#include <random>
13+
#include <ranges>
14+
#include <vector>
15+
16+
namespace ipxp::output {
17+
18+
template<typename ElementType>
19+
class AllocationBufferS : public AllocationBufferBase<ElementType> {
20+
public:
21+
explicit AllocationBufferS(const std::size_t capacity, const uint8_t writersCount) noexcept
22+
: m_objectPool(capacity + writersCount)
23+
{
24+
static_assert(
25+
std::atomic<HelpState>::is_always_lock_free,
26+
"HelpState must be lock-free atomic");
27+
// m_writersData.resize(writersCount);
28+
29+
ElementType* begin = m_objectPool.data();
30+
const std::size_t objectsPerWriter = m_objectPool.size() / writersCount;
31+
for (const auto _ : std::views::iota(0U, writersCount)) {
32+
m_writersData.emplace_back(begin, objectsPerWriter);
33+
// writerData.storage = std::span<ElementType>(begin, objectsPerWriter);
34+
begin += objectsPerWriter;
35+
}
36+
}
37+
38+
void unregisterWriter(const uint8_t writerId) noexcept override
39+
{
40+
std::atomic<HelpState>& state = m_helpStates[writerId].get();
41+
HelpState expected;
42+
HelpState desired;
43+
do {
44+
expected = state.load(std::memory_order_acquire);
45+
desired = expected;
46+
desired.stealingAllowed = true;
47+
} while (!state.compare_exchange_weak(
48+
expected,
49+
desired,
50+
std::memory_order_release,
51+
std::memory_order_acquire));
52+
}
53+
54+
ElementType* allocate(const uint8_t writerIndex) noexcept override
55+
{
56+
WriterData& writerData = m_writersData[writerIndex].get();
57+
handleStealRequest(writerIndex);
58+
if (writerData.storage.empty()) {
59+
steal(writerIndex);
60+
d_stolen++;
61+
}
62+
ElementType* res = writerData.storage.back();
63+
writerData.storage.pop_back();
64+
return res;
65+
}
66+
67+
void deallocate(ElementType* element, const uint8_t writerId) noexcept override
68+
{
69+
WriterData& writerData = m_writersData[writerId].get();
70+
writerData.storage.push_back(element);
71+
}
72+
73+
private:
74+
void handleStealRequest(const uint8_t writerIndex) noexcept
75+
{
76+
if (!m_helpStates[writerIndex]->load(std::memory_order_acquire).stealingRequested) {
77+
return;
78+
}
79+
m_helpStates[writerIndex]->store(HelpState {true, true}, std::memory_order_release);
80+
while (m_helpStates[writerIndex]->load(std::memory_order_acquire).stealingRequested) {
81+
BackoffScheme(1, 0).backoff();
82+
}
83+
m_helpStates[writerIndex]->store(HelpState {false, false}, std::memory_order_release);
84+
}
85+
86+
void steal(const uint8_t writerIndex) noexcept
87+
{
88+
WriterData& writerData = m_writersData[writerIndex].get();
89+
uint8_t stealVictimIndex = writerIndex;
90+
while (writerData.storage.empty()) {
91+
stealVictimIndex = (stealVictimIndex + 1) % m_writersData.size();
92+
if (!setStealRequest(stealVictimIndex)) {
93+
continue;
94+
}
95+
while (!isStealAllowed(stealVictimIndex)) {
96+
BackoffScheme(1, 0).backoff();
97+
}
98+
WriterData& stealVictimData = m_writersData[stealVictimIndex].get();
99+
for (std::size_t i = 0; i < stealVictimData.storage.size() / 2; i++) {
100+
ElementType* stolenElement = stealVictimData.storage.back();
101+
stealVictimData.storage.pop_back();
102+
writerData.storage.push_back(stolenElement);
103+
}
104+
clearStealRequest(stealVictimIndex);
105+
}
106+
}
107+
108+
bool setStealRequest(const uint8_t victimIndex) noexcept
109+
{
110+
HelpState expected;
111+
HelpState desired;
112+
do {
113+
expected = m_helpStates[victimIndex]->load(std::memory_order_acquire);
114+
if (expected.stealingRequested) {
115+
return false;
116+
}
117+
desired = HelpState {true, expected.stealingAllowed};
118+
} while (!m_helpStates[victimIndex]->compare_exchange_weak(
119+
expected,
120+
desired,
121+
std::memory_order_release,
122+
std::memory_order_acquire));
123+
return true;
124+
}
125+
126+
bool isStealAllowed(const uint8_t victimIndex) noexcept
127+
{
128+
return m_helpStates[victimIndex]->load(std::memory_order_acquire).stealingAllowed;
129+
}
130+
131+
void clearStealRequest(const uint8_t victimIndex) noexcept
132+
{
133+
HelpState currentValue = m_helpStates[victimIndex]->load(std::memory_order_acquire);
134+
currentValue.stealingRequested = false;
135+
m_helpStates[victimIndex]->store(currentValue, std::memory_order_release);
136+
}
137+
138+
struct WriterData {
139+
explicit WriterData(ElementType* begin, const std::size_t size) noexcept
140+
// : currentUserIndex(writerIndex)
141+
{
142+
storage.reserve(size);
143+
for (const auto _ : std::views::iota(0U, size)) {
144+
storage.push_back(begin++);
145+
}
146+
}
147+
148+
std::vector<ElementType*> storage;
149+
// std::atomic<uint8_t> currentUserIndex {0};
150+
};
151+
152+
struct HelpState {
153+
// uint16_t helpRequests {0};
154+
bool stealingRequested {false};
155+
bool stealingAllowed {false};
156+
};
157+
158+
std::vector<ElementType> m_objectPool;
159+
std::vector<CacheAlligned<WriterData>> m_writersData;
160+
std::array<CacheAlligned<std::atomic<HelpState>>, 32> m_helpStates;
161+
std::atomic<std::size_t> d_stolen {0};
162+
// FastRandomGenerator m_randomGenerator;
163+
};
164+
165+
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/b2OutputStorage.hpp

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "fastRandomGenerator.hpp"
77
#include "outputStorage.hpp"
88
#include "spinlock.hpp"
9+
#include "threadUtils.hpp"
910

1011
#include <bit>
1112
#include <cstddef>
@@ -37,28 +38,19 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
3738
const uint16_t containersLeft = writerData.bucketAllocation.containersLeft();
3839
switch (containersLeft) {
3940
case 1: {
40-
/*this->m_allocationBuffer->replace(
41-
this->getNextElement(writerData.bucketAllocation),
42-
container,
43-
writerIndex);*/
4441
this->getNextElement(writerData.bucketAllocation)
4542
.assign(container, this->makeDeallocationCallback(writerIndex));
4643
}
4744
[[fallthrough]];
4845
case 0:
4946
break;
5047
default: {
51-
/*this->m_allocationBuffer->replace(
52-
this->getNextElement(writerData.bucketAllocation),
53-
container,
54-
writerIndex);*/
5548
this->getNextElement(writerData.bucketAllocation)
5649
.assign(container, this->makeDeallocationCallback(writerIndex));
5750
return true;
5851
}
5952
}
6053

61-
// uint8_t loopCounter = 0;
6254
BackoffScheme backoffScheme(2, std::numeric_limits<std::size_t>::max());
6355
do {
6456
const bool overflowed = writerData.randomShift();
@@ -87,7 +79,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
8779

8880
this->m_buckets[writerData.writePosition].bucketIndex = writerData.bucketAllocation.reset(
8981
this->m_buckets[writerData.writePosition].bucketIndex);
90-
// std::atomic_thread_fence(std::memory_order_release);
9182

9283
const uint64_t highestReaderGeneration
9384
= this->m_highestReaderGeneration.load(std::memory_order_acquire);
@@ -96,7 +87,6 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
9687
writerData.generation.store(
9788
highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE,
9889
std::memory_order_release);
99-
// casMax(m_highestWriterGeneration, writerData.generation);
10090
}
10191
this->m_buckets[writerData.writePosition].generation.store(
10292
writerData.generation.load(std::memory_order_acquire),
@@ -120,10 +110,8 @@ class B2OutputStorage : public BOutputStorage<ElementType> {
120110
return &this->getNextElement(readerData.bucketAllocation).getData();
121111
}
122112

123-
// uint8_t loopCounter = 0;
124113
uint64_t cachedGeneration;
125114
uint16_t cachedBucketIndex;
126-
// const uint16_t initialPosition = readerData.readPosition;
127115
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
128116
do {
129117
const bool overflowed = readerData.shift(this->m_expectedReadersCount, readerIndex);

include/ipfixprobe/outputPlugin/outputStorage/fastRandomGenerator.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
#pragma once
22

3+
#include <algorithm>
4+
#include <array>
5+
#include <atomic>
36
#include <cstddef>
47
#include <cstdint>
8+
#include <random>
59

610
namespace ipxp::output {
711

include/ipfixprobe/outputPlugin/outputStorage/outputStorage.hpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
#pragma once
22

33
// #include "../../processPlugin/flowRecord.hpp"
4-
#include "allocationBuffer.hpp"
4+
/*#include "allocationBuffer.hpp"
55
#include "allocationBuffer2.hpp"
66
#include "allocationBuffer3.hpp"
7-
#include "allocationBufferBase.hpp"
87
#include "allocationBufferR.hpp"
9-
#include "dummyAllocationBuffer.hpp"
8+
#include "dummyAllocationBuffer.hpp"*/
9+
#include "allocationBufferBase.hpp"
1010
#include "outputContainer.hpp"
1111
#include "referenceCounter.hpp"
1212
#include "spinlock.hpp"
1313

1414
#include <atomic>
1515
#include <condition_variable>
16+
#include <ranges>
1617

1718
#include <boost/container/static_vector.hpp>
1819

@@ -38,10 +39,21 @@ class OutputStorage {
3839
, m_expectedReadersCount(expectedReadersCount)
3940
, m_allocationBuffer(allocationBuffer)
4041
{
42+
if (STORAGE_CAPACITY % expectedWritersCount != 0) {
43+
throw std::runtime_error(
44+
"Storage capacity must be divisible by expected writers count");
45+
}
46+
4147
m_storage.reserve(STORAGE_CAPACITY);
42-
std::generate_n(std::back_inserter(m_storage), STORAGE_CAPACITY, [&]() {
43-
return Reference<OutputContainer<ElementType>>(*m_allocationBuffer->allocate(0));
44-
});
48+
for (const std::size_t writerIndex : std::views::iota(0U, expectedWritersCount)) {
49+
std::generate_n(
50+
std::back_inserter(m_storage),
51+
STORAGE_CAPACITY / expectedWritersCount,
52+
[&]() {
53+
return Reference<OutputContainer<ElementType>>(
54+
*m_allocationBuffer->allocate(writerIndex));
55+
});
56+
}
4557
}
4658

4759
virtual void registerReader([[maybe_unused]] const uint8_t readerIndex) noexcept
@@ -59,13 +71,14 @@ class OutputStorage {
5971
m_writersCount++;
6072
while (m_readersCount.load(std::memory_order_acquire) != m_expectedReadersCount)
6173
;
74+
6275
/*std::unique_lock<std::mutex> lock(m_registrationMutex);
6376
m_writersCount++;
6477
m_registrationCondition.notify_all();
6578
m_registrationCondition.wait(lock, [&]() { return m_readersCount > 0; });*/
6679
}
6780

68-
virtual void unregisterWriter([[maybe_unused]] const uint8_t writerId) noexcept
81+
virtual void unregisterWriter([[maybe_unused]] const uint8_t writerIndex) noexcept
6982
{
7083
m_writersCount--;
7184
}

include/ipfixprobe/outputPlugin/outputStorage/outputStorageRegistrar.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include "allocationBufferS.hpp"
34
#include "outputStorage.hpp"
45
#include "outputStorageReader.hpp"
56
#include "outputStorageReaderGroup.hpp"
@@ -21,10 +22,9 @@ class OutputStorageRegistrar {
2122
m_storages = std::make_shared<std::shared_ptr<OutputStorage<ElementType>>[]>(
2223
OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT);
2324
m_allocationBuffer
24-
= std::make_shared<AllocationBuffer2<ReferenceCounter<OutputContainer<ElementType>>>>(
25+
= std::make_shared<AllocationBufferS<ReferenceCounter<OutputContainer<ElementType>>>>(
2526
OutputStorage<ElementType>::STORAGE_CAPACITY
26-
* OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT
27-
+ writersCount,
27+
* OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT,
2828
writersCount);
2929
}
3030

include/ipfixprobe/outputPlugin/outputStorage/outputStorageWriter.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class OutputStorageWriter {
2121
{
2222
m_currentContainer.getData().storage.clear();
2323
m_currentContainer.getData().readTimes = 0;
24-
m_allocationBuffer->registerWriter();
24+
m_allocationBuffer->registerWriter(writerIndex);
2525
for (std::size_t i = 0; i < OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT; ++i) {
2626
OutputStorage<ElementType>* storage = m_storages[i].get();
2727
if (!storage) {
@@ -40,7 +40,7 @@ class OutputStorageWriter {
4040
}
4141
storage->unregisterWriter(m_writerIndex);
4242
}
43-
m_allocationBuffer->unregisterWriter();
43+
m_allocationBuffer->unregisterWriter(m_writerIndex);
4444
}
4545

4646
void push(ElementType element) noexcept

include/ipfixprobe/outputPlugin/outputStorage/referenceCounter.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// #include "outputContainer.hpp"
44

55
#include "allocationBufferBase.hpp"
6+
#include "cacheAlligned.hpp"
67

78
#include <atomic>
89
#include <cstddef>

0 commit comments

Comments
 (0)