Skip to content

Commit 78a9990

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent d24f68e commit 78a9990

4 files changed

Lines changed: 91 additions & 11 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/backoffScheme.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ namespace ipxp::output {
99
class BackoffScheme {
1010
public:
1111
explicit BackoffScheme(
12-
const uint32_t shortWaitThreshold,
13-
const uint32_t longWaitThreshold) noexcept
12+
const std::size_t shortWaitThreshold,
13+
const std::size_t longWaitThreshold) noexcept
1414
: m_shortWaitThreshold(shortWaitThreshold)
15-
, m_longWaitThreshold(shortWaitThreshold + longWaitThreshold)
15+
, m_longWaitThreshold(std::max(shortWaitThreshold + longWaitThreshold, longWaitThreshold))
1616
{
1717
}
1818

@@ -30,9 +30,9 @@ class BackoffScheme {
3030
}
3131

3232
private:
33-
const uint32_t m_shortWaitThreshold;
34-
const uint32_t m_longWaitThreshold;
35-
uint32_t m_waitCounter {0};
33+
const std::size_t m_shortWaitThreshold;
34+
const std::size_t m_longWaitThreshold;
35+
std::size_t m_waitCounter {0};
3636
};
3737

3838
} // namespace ipxp::output
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#pragma once
2+
3+
#include "backoffScheme.hpp"
4+
#include "doubleBufferedValue.hpp"
5+
#include "mqOutputStorage.hpp"
6+
#include "outputStorage.hpp"
7+
#include "rwSpinlock.hpp"
8+
#include "threadUtils.hpp"
9+
10+
#include <algorithm>
11+
#include <atomic>
12+
#include <condition_variable>
13+
#include <functional>
14+
#include <memory>
15+
#include <mutex>
16+
#include <optional>
17+
#include <ranges>
18+
#include <vector>
19+
20+
#include <boost/container/static_vector.hpp>
21+
22+
namespace ipxp::output {
23+
24+
class MQ2OutputStorage : public MQOutputStorage {
25+
public:
26+
explicit MQ2OutputStorage(const uint8_t writersCount) noexcept
27+
: MQOutputStorage(writersCount)
28+
{
29+
std::cout << std::endl;
30+
}
31+
32+
bool storeContainer(ContainerWrapper container, const uint8_t writerId) noexcept override
33+
{
34+
BackoffScheme backoff(3, std::numeric_limits<std::size_t>::max());
35+
while (!m_queues[writerId].tryWrite(
36+
std::move(container),
37+
*m_allocationBuffer,
38+
m_readerGroupsCount,
39+
std::numeric_limits<std::size_t>::max())) {
40+
backoff.backoff();
41+
}
42+
return true;
43+
}
44+
45+
std::optional<ReferenceCounterHandler<OutputContainer>> getContainer(
46+
const std::size_t readerGroupIndex,
47+
const uint8_t localReaderIndex,
48+
const uint8_t globalReaderIndex) noexcept override
49+
{
50+
const size_t tries = m_totalWritersCount / m_readerGroupSizes[readerGroupIndex] + 1;
51+
BackoffScheme backoff(3, 5);
52+
for (const auto _ : std::views::iota(0U, tries)) {
53+
const uint8_t sequenceIndex = m_readersData[globalReaderIndex]->sequenceIndex++;
54+
const uint8_t queueIndex = m_readersData[globalReaderIndex]
55+
->queueJumpSequence[sequenceIndex % MAX_WRITERS_COUNT];
56+
ContainerWrapper* container = m_queues[queueIndex].tryRead(readerGroupIndex);
57+
if (container != nullptr) {
58+
return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
59+
getReferenceCounter(*container));
60+
}
61+
// std::this_thread::yield();
62+
backoff.backoff();
63+
}
64+
return std::nullopt;
65+
}
66+
67+
bool finished(const std::size_t readerGroupIndex) noexcept override
68+
{
69+
return m_readerGroupSizes[readerGroupIndex] > m_totalWritersCount
70+
|| (!writersPresent() && std::ranges::all_of(m_queues, [&](const Queue& queue) {
71+
return queue.finished();
72+
}));
73+
}
74+
75+
private:
76+
};
77+
78+
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/mqOutputStorage.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class MQOutputStorage : public OutputStorage {
102102
// WriteLockGuard lockGuard(m_locks[writeQueueIndex]);
103103

104104
if (!m_queues[writerId]
105-
.tryWrite(std::move(container), *m_allocationBuffer, m_readerGroupsCount)) {
105+
.tryWrite(std::move(container), *m_allocationBuffer, m_readerGroupsCount, 3)) {
106106
return false;
107107
}
108108
return true;
@@ -140,7 +140,7 @@ class MQOutputStorage : public OutputStorage {
140140
}));
141141
}
142142

143-
private:
143+
protected:
144144
struct QueueIndex {
145145
const uint16_t initialOffset;
146146
uint8_t loops;
@@ -169,12 +169,13 @@ class MQOutputStorage : public OutputStorage {
169169
bool tryWrite(
170170
ContainerWrapper container,
171171
AllocationBufferBase<ReferenceCounter<OutputContainer>>& origin,
172-
const uint8_t readerGroupCount) noexcept
172+
const uint8_t readerGroupCount,
173+
const std::size_t longBackoffTries) noexcept
173174
{
174175
State* currentState = &m_stateBuffer.getCurrentValue();
175176

176177
if (currentState->written == currentState->writeBuffer.size()) {
177-
BackoffScheme backoff(7, 5);
178+
BackoffScheme backoff(7, longBackoffTries);
178179
while (!allReadersFinished()) {
179180
if (!backoff.backoff()) {
180181
container.deallocate(origin);

tests/performance/outputStorage/testOutputStorage.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <outputStorage/lfnbOutputStorage.hpp>
1616
#include <outputStorage/mc2OutputStorage.hpp>
1717
#include <outputStorage/mcOutputStorage.hpp>
18+
#include <outputStorage/mq2OutputStorage.hpp>
1819
#include <outputStorage/mqOutputStorage.hpp>
1920
#include <outputStorage/ringOutputStorage.hpp>
2021
#include <outputStorage/serializedOutputStorage.hpp>
@@ -224,7 +225,7 @@ TEST(TestOutputStorage, Debug)
224225
for (const auto testIndex : std::views::iota(0, 100)) {
225226
std::cout << " Debug Loop Iteration " << testIndex << "\n";
226227
// makePerformanceTest<ipxp::output::MCOutputStorage>("MCOutputStorage");
227-
stressTest<ipxp::output::MC2OutputStorage>(false);
228+
stressTest<ipxp::output::MQ2OutputStorage>(false);
228229
}
229230
}
230231

0 commit comments

Comments
 (0)