Skip to content

Commit 879dd9f

Browse files
kkondakasimonelbaz
authored andcommitted
Fixed PrometheusSinkBufferWriter getBuffer() to return non-duplicate and sorted time series (opensearch-project#6358)
* Fixed PrometheusSinkBufferWriter getBuffer() to return non-duplicate and sorted timeseries Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Fixed CheckStyle error Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
1 parent 546294b commit 879dd9f

3 files changed

Lines changed: 95 additions & 22 deletions

File tree

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkBufferWriter.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,45 @@
1515
import org.opensearch.dataprepper.common.sink.SinkFlushContext;
1616
import org.opensearch.dataprepper.common.sink.SinkMetrics;
1717

18-
import java.util.ArrayList;
18+
import java.util.HashMap;
1919
import java.util.List;
20+
import java.util.Map;
21+
import java.util.stream.Collectors;
2022

2123
public class PrometheusSinkBufferWriter implements SinkBufferWriter {
22-
23-
private final List<SinkBufferEntry> buffer;
24+
25+
// Each buffer entry is a metric.
26+
// Duplicate entries for the same metric at the same time is not allowed
27+
// If there are multiple entries in the buffer for the same metric (with different time stamps),
28+
// They must be sorted by time before sending to Prometheus
29+
private final Map<String, Map<Long, SinkBufferEntry>> buffer;
2430
private final SinkMetrics sinkMetrics;
2531

2632
public PrometheusSinkBufferWriter(SinkMetrics sinkMetrics) {
27-
buffer = new ArrayList<>();
33+
this.buffer = new HashMap<>();
2834
this.sinkMetrics = sinkMetrics;
2935
}
3036

3137
public boolean writeToBuffer(SinkBufferEntry bufferEntry) {
32-
buffer.add(bufferEntry);
38+
PrometheusTimeSeries timeSeries = ((PrometheusSinkBufferEntry)bufferEntry).getTimeSeries();
39+
if (timeSeries == null) {
40+
return false;
41+
}
42+
43+
buffer.computeIfAbsent(timeSeries.getMetricName(), k -> new HashMap<>())
44+
.put(timeSeries.getTimeStamp(), bufferEntry);
3345
return true;
3446
}
3547

48+
@Override
3649
public SinkFlushableBuffer getBuffer(final SinkFlushContext sinkFlushContext) {
37-
return new PrometheusSinkFlushableBuffer(buffer, sinkMetrics, sinkFlushContext);
50+
List<SinkBufferEntry> bufferList = buffer.values().stream()
51+
.flatMap(timeSeriesMap -> timeSeriesMap.entrySet().stream()
52+
.sorted(Map.Entry.comparingByKey())
53+
.map(Map.Entry::getValue))
54+
.collect(Collectors.toList());
55+
56+
buffer.clear();
57+
return new PrometheusSinkFlushableBuffer(bufferList, sinkMetrics, sinkFlushContext);
3858
}
39-
4059
}
41-
42-
43-

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusTimeSeries.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,14 @@ public PrometheusTimeSeries(Metric metric, final boolean sanitizeNames) throws E
102102
processResourceAndScopeAttributes(metric);
103103
}
104104

105+
public long getTimeStamp() {
106+
return timestamp;
107+
}
108+
109+
public String getMetricName() {
110+
return metricName;
111+
}
112+
105113
private void processAttributes(Map<String, Object> attributesMap, String prefix) {
106114
if (attributesMap == null) return;
107115

data-prepper-plugins/prometheus-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkBufferWriterTest.java

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212

1313
import org.opensearch.dataprepper.model.metric.JacksonGauge;
1414
import org.opensearch.dataprepper.common.sink.SinkMetrics;
15+
import org.opensearch.dataprepper.model.metric.Gauge;
16+
import org.opensearch.dataprepper.model.event.Event;
1517

1618
import org.mockito.Mock;
1719
import static org.mockito.Mockito.mock;
1820
import org.junit.jupiter.api.Test;
1921
import org.junit.jupiter.api.BeforeEach;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
import static org.hamcrest.Matchers.equalTo;
2024
import static org.hamcrest.CoreMatchers.sameInstance;
2125
import static org.hamcrest.MatcherAssert.assertThat;
2226

@@ -28,17 +32,17 @@ public class PrometheusSinkBufferWriterTest {
2832
@Mock
2933
private PrometheusSinkFlushContext sinkFlushContext;
3034

31-
private JacksonGauge gauge;
35+
private JacksonGauge gauge1;
3236
private PrometheusSinkBufferEntry prometheusSinkBufferEntry;
3337
private PrometheusSinkBufferWriter prometheusSinkBufferWriter;
34-
35-
38+
39+
3640
@BeforeEach
3741
void setUp() throws Exception {
3842
sinkMetrics = mock(SinkMetrics.class);
3943
sinkFlushContext = mock(PrometheusSinkFlushContext.class);
40-
gauge = createGaugeMetric();
41-
prometheusSinkBufferEntry = new PrometheusSinkBufferEntry(gauge, true);
44+
gauge1 = createGaugeMetric("gauge1", Instant.now(), 1.0d);
45+
prometheusSinkBufferEntry = new PrometheusSinkBufferEntry(gauge1, true);
4246
}
4347

4448
PrometheusSinkBufferWriter createObjectUnderTest() {
@@ -50,18 +54,63 @@ public void testPrometheusSinkBufferWriter() throws Exception {
5054
prometheusSinkBufferWriter = createObjectUnderTest();
5155
prometheusSinkBufferWriter.writeToBuffer(prometheusSinkBufferEntry);
5256
PrometheusSinkFlushableBuffer prometheusSinkFlushableBuffer = (PrometheusSinkFlushableBuffer)prometheusSinkBufferWriter.getBuffer(sinkFlushContext);
53-
assertThat(prometheusSinkFlushableBuffer.getEvents().get(0), sameInstance(gauge));
57+
assertThat(prometheusSinkFlushableBuffer.getEvents().get(0), sameInstance(gauge1));
58+
}
59+
60+
@Test
61+
public void testPrometheusSinkBufferWriterWithDuplicateTimeEntries() throws Exception {
62+
prometheusSinkBufferWriter = createObjectUnderTest();
63+
Instant t2 = Instant.now().plusSeconds(5);
64+
// Same metric with same name but different value, only most recent one is kept
65+
Gauge gauge2 = createGaugeMetric("gauge2", t2, 10.0d);
66+
Gauge gauge3 = createGaugeMetric("gauge2", t2, 20.0d);
67+
PrometheusSinkBufferEntry entry2 = new PrometheusSinkBufferEntry(gauge2, true);
68+
PrometheusSinkBufferEntry entry3 = new PrometheusSinkBufferEntry(gauge3, true);
69+
prometheusSinkBufferWriter.writeToBuffer(prometheusSinkBufferEntry);
70+
prometheusSinkBufferWriter.writeToBuffer(entry2);
71+
prometheusSinkBufferWriter.writeToBuffer(entry3);
72+
PrometheusSinkFlushContext sinkFlushContext = mock(PrometheusSinkFlushContext.class);
73+
PrometheusSinkFlushableBuffer prometheusSinkFlushableBuffer = (PrometheusSinkFlushableBuffer)prometheusSinkBufferWriter.getBuffer(sinkFlushContext);
74+
assertThat(prometheusSinkFlushableBuffer.getEvents().size(), equalTo(2));
75+
Event ev1 = prometheusSinkFlushableBuffer.getEvents().get(0);
76+
Event ev2 = prometheusSinkFlushableBuffer.getEvents().get(1);
77+
assertTrue(ev1 == gauge1 || ev1 == gauge3);
78+
assertTrue(ev2 == gauge1 || ev2 == gauge3);
5479
}
5580

56-
private JacksonGauge createGaugeMetric() {
81+
@Test
82+
public void testPrometheusSinkBufferWriterWithOutOfOrderEntries() throws Exception {
83+
prometheusSinkBufferWriter = createObjectUnderTest();
84+
// Same metric with same name but different value and different times but out of order times
85+
// Expected result is sorted by time
86+
Instant t2 = Instant.now().plusSeconds(50);
87+
Gauge gauge2 = createGaugeMetric("gauge1", t2, 10.0d);
88+
Instant t3 = t2.minusSeconds(150);
89+
Gauge gauge3 = createGaugeMetric("gauge1", t3, 20.0d);
90+
PrometheusSinkBufferEntry entry2 = new PrometheusSinkBufferEntry(gauge2, true);
91+
PrometheusSinkBufferEntry entry3 = new PrometheusSinkBufferEntry(gauge3, true);
92+
prometheusSinkBufferWriter.writeToBuffer(prometheusSinkBufferEntry);
93+
prometheusSinkBufferWriter.writeToBuffer(entry2);
94+
prometheusSinkBufferWriter.writeToBuffer(entry3);
95+
PrometheusSinkFlushContext sinkFlushContext = mock(PrometheusSinkFlushContext.class);
96+
PrometheusSinkFlushableBuffer prometheusSinkFlushableBuffer = (PrometheusSinkFlushableBuffer)prometheusSinkBufferWriter.getBuffer(sinkFlushContext);
97+
assertThat(prometheusSinkFlushableBuffer.getEvents().size(), equalTo(3));
98+
assertThat(prometheusSinkFlushableBuffer.getEvents().get(0), sameInstance(gauge3));
99+
assertThat(prometheusSinkFlushableBuffer.getEvents().get(1), sameInstance(gauge1));
100+
assertThat(prometheusSinkFlushableBuffer.getEvents().get(2), sameInstance(gauge2));
101+
102+
}
103+
104+
105+
private JacksonGauge createGaugeMetric(final String name, final Instant time, final double value) {
57106
return JacksonGauge.builder()
58-
.withName("gauge")
107+
.withName(name)
59108
.withDescription("Test Gauge Metric")
60-
.withTimeReceived(Instant.now())
61-
.withTime(Instant.now().plusSeconds(10).toString())
62-
.withStartTime(Instant.now().plusSeconds(5).toString())
109+
.withTimeReceived(time)
110+
.withTime(time.plusSeconds(10).toString())
111+
.withStartTime(time.plusSeconds(5).toString())
63112
.withUnit("1")
64-
.withValue(1.0d)
113+
.withValue(value)
65114
.build(false);
66115
}
67116

0 commit comments

Comments
 (0)