Skip to content

Commit e725164

Browse files
Add flush_threshold/flush_interval logic to OpenTsdbWriter
1 parent f15f99d commit e725164

3 files changed

Lines changed: 38 additions & 4 deletions

File tree

lib/perfdata/opentsdbwriter.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: GPL-2.0-or-later
33

44
#include "perfdata/opentsdbwriter.hpp"
5+
#include "base/defer.hpp"
56
#include "perfdata/opentsdbwriter-ti.cpp"
67
#include "icinga/service.hpp"
78
#include "icinga/checkcommand.hpp"
@@ -88,6 +89,13 @@ void OpenTsdbWriter::Resume()
8889
<< "Exception during OpenTsdb operation: " << DiagnosticInformation(exp);
8990
});
9091

92+
/* Setup timer for periodically flushing m_DataBuffer */
93+
m_FlushTimer = Timer::Create();
94+
m_FlushTimer->SetInterval(GetFlushInterval());
95+
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
96+
m_FlushTimer->Start();
97+
m_FlushTimer->Reschedule(0);
98+
9199
ReadConfigTemplate();
92100

93101
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
@@ -282,7 +290,9 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
282290
AddMetric(checkable, metric + ".latency", tags, cr->CalculateLatency(), ts);
283291
AddMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
284292

285-
SendMsgBuffer();
293+
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
294+
SendMsgBuffer();
295+
}
286296
}
287297
);
288298
}
@@ -387,7 +397,22 @@ void OpenTsdbWriter::AddMetric(const Checkable::Ptr& checkable, const String& me
387397

388398
/* do not send \n to debug log */
389399
msgbuf << "\n";
390-
m_MsgBuf.append(msgbuf.str());
400+
m_MsgBuf += msgbuf.str();
401+
}
402+
403+
/**
404+
* Queues a Flush on the work-queue if none is queued yet.
405+
*/
406+
void OpenTsdbWriter::FlushTimeout()
407+
{
408+
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
409+
return;
410+
}
411+
412+
m_WorkQueue.Enqueue([&]() {
413+
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
414+
SendMsgBuffer();
415+
});
391416
}
392417

393418
void OpenTsdbWriter::SendMsgBuffer()
@@ -398,7 +423,7 @@ void OpenTsdbWriter::SendMsgBuffer()
398423
<< "Flushing data buffer to OpenTsdb.";
399424

400425
try {
401-
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf, std::string{})));
426+
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
402427
} catch (const PerfdataWriterConnection::Stopped& ex) {
403428
Log(LogDebug, "OpenTsdbWriter") << ex.what();
404429
return;

lib/perfdata/opentsdbwriter.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter>
3535

3636
private:
3737
WorkQueue m_WorkQueue{10000000, 1};
38-
std::string m_MsgBuf;
38+
Timer::Ptr m_FlushTimer;
39+
std::atomic_bool m_FlushTimerInQueue{false};
40+
String m_MsgBuf;
3941
PerfdataWriterConnection::Ptr m_Connection;
4042

4143
boost::signals2::connection m_HandleCheckResults;
@@ -46,6 +48,7 @@ class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter>
4648
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
4749
void AddMetric(const Checkable::Ptr& checkable, const String& metric,
4850
const std::map<String, String>& tags, double value, double ts);
51+
void FlushTimeout();
4952
void SendMsgBuffer();
5053
void AddPerfdata(const Checkable::Ptr& checkable, const String& metric,
5154
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts);

lib/perfdata/opentsdbwriter.ti

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ class OpenTsdbWriter : ConfigObject
3131
[config] bool enable_generic_metrics {
3232
default {{{ return false; }}}
3333
};
34+
[config] int flush_interval {
35+
default {{{ return 15; }}}
36+
};
37+
[config] std::size_t flush_threshold {
38+
default {{{ return 2 * 1024 * 1024; }}}
39+
};
3440
[config] double disconnect_timeout {
3541
default {{{ return 10; }}}
3642
};

0 commit comments

Comments
 (0)