Skip to content

Commit f15f99d

Browse files
Add flush_threshold/flush_interval logic to GelfWriter
1 parent 1806bbb commit f15f99d

4 files changed

Lines changed: 49 additions & 9 deletions

File tree

doc/09-object-types.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,6 +1317,8 @@ Configuration Attributes:
13171317
diconnect\_timeout | Duration | **Optional.** Timeout to wait for any outstanding data to be flushed to GELF before disconnecting. Defaults to `10s`.
13181318
source | String | **Optional.** Source name for this instance. Defaults to `icinga2`.
13191319
enable\_send\_perfdata | Boolean | **Optional.** Enable performance data for 'CHECK RESULT' events.
1320+
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
1321+
flush\_threshold | Number | **Optional.** How many bytes to buffer
13201322
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
13211323
enable\_tls | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
13221324
insecure\_noverify | Boolean | **Optional.** Disable TLS peer verification.

lib/perfdata/gelfwriter.cpp

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: GPL-2.0-or-later
33

44
#include "perfdata/gelfwriter.hpp"
5+
#include "base/defer.hpp"
56
#include "perfdata/gelfwriter-ti.cpp"
67
#include "icinga/service.hpp"
78
#include "icinga/notification.hpp"
@@ -90,6 +91,13 @@ void GelfWriter::Resume()
9091
/* Register exception handler for WQ tasks. */
9192
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
9293

94+
/* Setup timer for periodically flushing m_DataBuffer */
95+
m_FlushTimer = Timer::Create();
96+
m_FlushTimer->SetInterval(GetFlushInterval());
97+
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
98+
m_FlushTimer->Start();
99+
m_FlushTimer->Reschedule(0);
100+
93101
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort(), m_SslContext, !GetInsecureNoverify()};
94102

95103
/* Register event handlers. */
@@ -360,19 +368,38 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
360368
{
361369
AssertOnWorkQueue();
362370

363-
std::ostringstream msgbuf;
364-
msgbuf << gelfMessage;
365-
msgbuf << '\0';
371+
Log(LogDebug, "GelfWriter")
372+
<< "Checkable '" << checkable->GetName() << "' sending message '" << gelfMessage << "'.";
366373

367-
auto log = msgbuf.str();
374+
m_MsgBuf.GetData().reserve(m_MsgBuf.GetLength() + gelfMessage.GetLength() + 1);
375+
m_MsgBuf += gelfMessage;
376+
m_MsgBuf += '\0';
368377

369-
try {
370-
Log(LogDebug, "GelfWriter")
371-
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
378+
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
379+
Flush();
380+
}
381+
}
382+
383+
/**
384+
* Queues a Flush on the work-queue if none is queued yet.
385+
*/
386+
void GelfWriter::FlushTimeout()
387+
{
388+
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
389+
return;
390+
}
391+
392+
m_WorkQueue.Enqueue([&]() {
393+
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
394+
Flush();
395+
});
396+
}
372397

373-
m_Connection->Send(boost::asio::const_buffer{log.data(), log.length()});
398+
void GelfWriter::Flush()
399+
{
400+
try {
401+
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
374402
} catch (const PerfdataWriterConnection::Stopped& ex) {
375403
Log(LogDebug, "GelfWriter") << ex.what();
376-
return;
377404
}
378405
}

lib/perfdata/gelfwriter.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ class GelfWriter final : public ObjectImpl<GelfWriter>
3434

3535
private:
3636
PerfdataWriterConnection::Ptr m_Connection;
37+
Timer::Ptr m_FlushTimer;
38+
std::atomic_bool m_FlushTimerInQueue{false};
39+
String m_MsgBuf;
3740
WorkQueue m_WorkQueue{10000000, 1};
3841
Shared<boost::asio::ssl::context>::Ptr m_SslContext;
3942

@@ -46,6 +49,8 @@ class GelfWriter final : public ObjectImpl<GelfWriter>
4649

4750
String ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts);
4851
void SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage);
52+
void FlushTimeout();
53+
void Flush();
4954

5055
void AssertOnWorkQueue();
5156

lib/perfdata/gelfwriter.ti

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ class GelfWriter : ConfigObject
2424
[config] bool enable_send_perfdata {
2525
default {{{ return false; }}}
2626
};
27+
[config] int flush_interval {
28+
default {{{ return 15; }}}
29+
};
30+
[config] std::size_t flush_threshold {
31+
default {{{ return 2 * 1024 * 1024; }}}
32+
};
2733

2834
[config] double disconnect_timeout {
2935
default {{{ return 10; }}}

0 commit comments

Comments
 (0)