Skip to content

Commit e09ccfc

Browse files
Add flush_threshold/flush_interval logic to GraphiteWriter
1 parent cf699b2 commit e09ccfc

4 files changed

Lines changed: 61 additions & 15 deletions

File tree

doc/09-object-types.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,6 +1350,8 @@ Configuration Attributes:
13501350
service\_name\_template | String | **Optional.** Metric prefix for service name. Defaults to `icinga2.$host.name$.services.$service.name$.$service.check_command$`.
13511351
enable\_send\_thresholds | Boolean | **Optional.** Send additional threshold metrics. Defaults to `false`.
13521352
enable\_send\_metadata | Boolean | **Optional.** Send additional metadata metrics. Defaults to `false`.
1353+
flush\_interval | Duration | **Optional.** How long to buffer data points before sending. Defaults to `15s`.
1354+
flush\_threshold | Number | **Optional.** How many bytes to buffer before forcing a flush to the backend. Defaults to `2MiB`.
13531355
enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
13541356

13551357
Additional usage examples can be found [here](14-features.md#graphite-carbon-cache-writer).

lib/perfdata/graphitewriter.cpp

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: GPL-2.0-or-later
33

44
#include "perfdata/graphitewriter.hpp"
5+
#include "base/defer.hpp"
56
#include "perfdata/graphitewriter-ti.cpp"
67
#include "icinga/service.hpp"
78
#include "icinga/checkcommand.hpp"
@@ -85,6 +86,13 @@ void GraphiteWriter::Resume()
8586
/* Register exception handler for WQ tasks. */
8687
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
8788

89+
/* Setup timer for periodically flushing m_DataBuffer */
90+
m_FlushTimer = Timer::Create();
91+
m_FlushTimer->SetInterval(GetFlushInterval());
92+
m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) { FlushTimeout(); });
93+
m_FlushTimer->Start();
94+
m_FlushTimer->Reschedule(0);
95+
8896
m_Connection = new PerfdataWriterConnection{this, GetHost(), GetPort()};
8997

9098
/* Register event handlers. */
@@ -101,6 +109,8 @@ void GraphiteWriter::Pause()
101109
{
102110
m_HandleCheckResults.disconnect();
103111

112+
m_FlushTimer->Stop(true);
113+
104114
std::promise<void> queueDonePromise;
105115

106116
m_WorkQueue.Enqueue([&]() {
@@ -199,10 +209,10 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
199209
CONTEXT("Processing check result for '" << checkable->GetName() << "'");
200210

201211
for (auto& [name, val] : metadata) {
202-
SendMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
212+
AddMetric(checkable, prefix + ".metadata", name, val, cr->GetExecutionEnd());
203213
}
204214

205-
SendPerfdata(checkable, prefix + ".perfdata", cr);
215+
AddPerfdata(checkable, prefix + ".perfdata", cr);
206216
});
207217
}
208218

@@ -213,7 +223,7 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
213223
* @param prefix Metric prefix string
214224
* @param cr Check result including performance data
215225
*/
216-
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
226+
void GraphiteWriter::AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr)
217227
{
218228
AssertOnWorkQueue();
219229

@@ -245,17 +255,17 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
245255
String escapedKey = EscapeMetricLabel(pdv->GetLabel());
246256
double ts = cr->GetExecutionEnd();
247257

248-
SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
258+
AddMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
249259

250260
if (GetEnableSendThresholds()) {
251261
if (!pdv->GetCrit().IsEmpty())
252-
SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
262+
AddMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
253263
if (!pdv->GetWarn().IsEmpty())
254-
SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
264+
AddMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
255265
if (!pdv->GetMin().IsEmpty())
256-
SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
266+
AddMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
257267
if (!pdv->GetMax().IsEmpty())
258-
SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
268+
AddMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
259269
}
260270
}
261271
}
@@ -269,7 +279,7 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
269279
* @param value Metric value
270280
* @param ts Timestamp when the check result was created
271281
*/
272-
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
282+
void GraphiteWriter::AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
273283
{
274284
AssertOnWorkQueue();
275285

@@ -284,11 +294,34 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
284294
// do not send \n to debug log
285295
msgbuf << "\n";
286296

297+
m_MsgBuf += std::move(msgbuf).str();
298+
299+
if (GetFlushThreshold() <= m_MsgBuf.GetLength()) {
300+
Flush();
301+
}
302+
}
303+
304+
/**
305+
* Queues a Flush on the work-queue if none is queued yet.
306+
*/
307+
void GraphiteWriter::FlushTimeout()
308+
{
309+
if (m_FlushTimerInQueue.exchange(true, std::memory_order_relaxed)) {
310+
return;
311+
}
312+
313+
m_WorkQueue.Enqueue([&]() {
314+
Defer resetFlushTimer{[&]() { m_FlushTimerInQueue.store(false, std::memory_order_relaxed); }};
315+
Flush();
316+
});
317+
}
318+
319+
void GraphiteWriter::Flush()
320+
{
287321
try {
288-
m_Connection->Send(asio::buffer(msgbuf.str()));
322+
m_Connection->Send(boost::asio::buffer(std::exchange(m_MsgBuf.GetData(), {})));
289323
} catch (const PerfdataWriterConnection::Stopped& ex) {
290324
Log(LogDebug, "GraphiteWriter") << ex.what();
291-
return;
292325
}
293326
}
294327

lib/perfdata/graphitewriter.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,18 @@ class GraphiteWriter final : public ObjectImpl<GraphiteWriter>
3636

3737
private:
3838
PerfdataWriterConnection::Ptr m_Connection;
39+
Timer::Ptr m_FlushTimer;
40+
std::atomic_bool m_FlushTimerInQueue{false};
41+
String m_MsgBuf;
3942
WorkQueue m_WorkQueue{10000000, 1};
4043

4144
boost::signals2::connection m_HandleCheckResults;
4245

4346
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
44-
void SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
45-
void SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
47+
void AddMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts);
48+
void AddPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr);
49+
void FlushTimeout();
50+
void Flush();
4651
static String EscapeMetric(const String& str);
4752
static String EscapeMetricLabel(const String& str);
4853
static Value EscapeMacroMetric(const Value& value);

lib/perfdata/graphitewriter.ti

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ class GraphiteWriter : ConfigObject
2424
[config] String service_name_template {
2525
default {{{ return "icinga2.$host.name$.services.$service.name$.$service.check_command$"; }}}
2626
};
27-
[config] bool enable_send_thresholds;
28-
[config] bool enable_send_metadata;
27+
[config] bool enable_send_thresholds;
28+
[config] bool enable_send_metadata;
29+
[config] int flush_interval {
30+
default {{{ return 15; }}}
31+
};
32+
[config] std::size_t flush_threshold {
33+
default {{{ return 2 * 1024 * 1024; }}}
34+
};
2935

3036
[config] double disconnect_timeout {
3137
default {{{ return 10; }}}

0 commit comments

Comments
 (0)