Skip to content

Commit 92aa5fd

Browse files
authored
Rebased to latest to resolve conflicts (#6365)
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 6ef4e48 commit 92aa5fd

16 files changed

Lines changed: 775 additions & 142 deletions

File tree

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/DefaultSinkBuffer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import java.time.Instant;
1313

1414
public class DefaultSinkBuffer implements SinkBuffer {
15-
private final SinkBufferWriter sinkBufferWriter;
16-
private final long maxEvents;
17-
private final long maxRequestSize;
15+
protected final SinkBufferWriter sinkBufferWriter;
16+
protected final long maxEvents;
17+
protected final long maxRequestSize;
1818
private final long flushIntervalMs;
1919
private long lastFlushedTimeMs;
2020
private long numEvents;

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/DefaultSinkOutputStrategy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public void flushBuffer() {
3333
long startTime = System.nanoTime();
3434
// getFlushableBuffer() should return the buffer contents
3535
SinkFlushableBuffer flushableBuffer = sinkBuffer.getFlushableBuffer(sinkFlushContext);
36+
if (flushableBuffer == null) {
37+
return;
38+
}
3639
List<Event> events = flushableBuffer.getEvents();
3740
try {
3841
SinkFlushResult flushResult = flushableBuffer.flush();

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/SinkBufferWriter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,11 @@
88
public interface SinkBufferWriter {
99
public boolean writeToBuffer(SinkBufferEntry sinkBufferEntry);
1010
public SinkFlushableBuffer getBuffer(final SinkFlushContext sinkFlushContext);
11+
default boolean isMaxEventsLimitReached(final long maxEvents) {
12+
return false;
13+
}
14+
15+
default boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry sinkBufferEntry, final long maxRequestSize) {
16+
return false;
17+
}
1118
}

data-prepper-plugins/prometheus-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusSinkAMPIT.java

Lines changed: 76 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262

6363
import org.junit.jupiter.api.BeforeEach;
6464
import org.junit.jupiter.api.Test;
65+
import org.junit.jupiter.params.ParameterizedTest;
66+
import org.junit.jupiter.params.provider.ValueSource;
6567
import static org.awaitility.Awaitility.await;
6668
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6769
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -216,6 +218,7 @@ void setUp() {
216218
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(60L);
217219
prometheusSinkConfig = mock(PrometheusSinkConfiguration.class);
218220
when(prometheusSinkConfig.getMaxRetries()).thenReturn(5);
221+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(0));
219222
when(prometheusSinkConfig.getSanitizeNames()).thenReturn(false);
220223
when(prometheusSinkConfig.getUrl()).thenReturn(remoteWriteUrl);
221224
when(prometheusSinkConfig.getContentType()).thenReturn("application/x-protobuf");
@@ -292,16 +295,23 @@ private void getMetricsFromAMP(final String metricName, final String qs) throws
292295

293296
}
294297

295-
@Test
296-
void TestSumMetrics() throws Exception {
298+
@ParameterizedTest
299+
@ValueSource(ints = {0, 2, 5})
300+
void TestSumMetrics(final int window) throws Exception {
301+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
302+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
297303
PrometheusSink sink = createObjectUnderTest();
298304
long startTimeSeconds = testStartTime.getEpochSecond();
299305
Instant time = Instant.now();
300306
Collection<Record<Event>> records = getSumRecordList(NUM_RECORDS, sumMetricName, 0);
301307
sink.doOutput(records);
308+
Thread.sleep(window*1000);
302309

303310
await().atMost(Duration.ofSeconds(60))
304311
.untilAsserted(() -> {
312+
if (window > 0) {
313+
sink.doOutput(Collections.emptyList());
314+
}
305315
metricsInAMP = 0;
306316
Set<Double> expectedMetrics = new HashSet<>();
307317
for (Record record: records) {
@@ -393,18 +403,25 @@ void TestSumMetricsFailuresWithDLQ() throws Exception {
393403
verify(eventHandle, times(NUM_RECORDS)).release(eq(true));
394404
}
395405

396-
@Test
397-
void TestSumMetricsFailuresWithoutDLQ() throws Exception {
406+
@ParameterizedTest
407+
@ValueSource(ints = {0, 2, 5})
408+
void TestSumMetricsFailuresWithoutDLQ(final int window) throws Exception {
409+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
410+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
398411
when(thresholdConfig.getMaxEvents()).thenReturn(1);
399412
PrometheusSink sink = createObjectUnderTest();
400413

401414
long startTimeSeconds = testStartTime.getEpochSecond();
402415
Instant time = Instant.now();
403416
Collection<Record<Event>> records = getSumRecordList(NUM_RECORDS-1, sumMetricName, 1);
404417
sink.doOutput(records);
418+
Thread.sleep(window*1000);
405419

406420
await().atMost(Duration.ofSeconds(60))
407421
.untilAsserted(() -> {
422+
if (window > 0) {
423+
sink.doOutput(Collections.emptyList());
424+
}
408425
metricsInAMP = 0;
409426
Set<Double> expectedMetrics = new HashSet<>();
410427
for (Record record: records) {
@@ -467,16 +484,23 @@ private Collection<Record<Event>> getSumRecordList(int numberOfRecords, final St
467484
return records;
468485
}
469486

470-
@Test
471-
void TestGaugeMetrics() throws Exception {
487+
@ParameterizedTest
488+
@ValueSource(ints = {0, 2, 5})
489+
void TestGaugeMetrics(final int window) throws Exception {
490+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
491+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
472492

473493
PrometheusSink sink = createObjectUnderTest();
474494
Collection<Record<Event>> records = getGaugeRecordList(NUM_RECORDS);
475495
sink.doOutput(records);
496+
Thread.sleep(window*1000);
476497

477498
long startTimeSeconds = testStartTime.getEpochSecond();
478499
await().atMost(Duration.ofSeconds(60))
479500
.untilAsserted(() -> {
501+
if (window > 0) {
502+
sink.doOutput(Collections.emptyList());
503+
}
480504
metricsInAMP = 0;
481505
long endTimeSeconds = Instant.now().getEpochSecond();
482506
getMetricsFromAMP(gaugeMetricName, "");
@@ -504,18 +528,25 @@ void TestGaugeMetrics() throws Exception {
504528
verify(eventHandle, times(NUM_RECORDS)).release(eq(true));
505529
}
506530

507-
@Test
508-
void TestGaugeMetricsWithMaxRequestSizeLimitAndFlushTimeout() throws Exception {
531+
@ParameterizedTest
532+
@ValueSource(ints = {0, 2, 5})
533+
void TestGaugeMetricsWithMaxRequestSizeLimitAndFlushTimeout(final int window) throws Exception {
534+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
535+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
509536

510537
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(220L);
511538
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(20L);
512539
PrometheusSink sink = createObjectUnderTest();
513540
Collection<Record<Event>> records = getGaugeRecordList(NUM_RECORDS);
514541
sink.doOutput(records);
542+
Thread.sleep(window*1000);
515543

516544
long startTimeSeconds = testStartTime.getEpochSecond();
517545
await().atMost(Duration.ofSeconds(60))
518546
.untilAsserted(() -> {
547+
if (window > 0) {
548+
sink.doOutput(Collections.emptyList());
549+
}
519550
metricsInAMP = 0;
520551
sink.doOutput(Collections.emptyList());
521552
long endTimeSeconds = Instant.now().getEpochSecond();
@@ -569,16 +600,23 @@ private Collection<Record<Event>> getGaugeRecordList(int numberOfRecords) {
569600
return records;
570601
}
571602

572-
@Test
573-
void TestSummaryMetrics() throws Exception {
603+
@ParameterizedTest
604+
@ValueSource(ints = {0, 2, 5})
605+
void TestSummaryMetrics(final int window) throws Exception {
606+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
607+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
574608

575609
PrometheusSink sink = createObjectUnderTest();
576610
Collection<Record<Event>> records = getSummaryRecordList(NUM_RECORDS);
577611
sink.doOutput(records);
612+
Thread.sleep(window*1000);
578613

579614
long startTimeSeconds = testStartTime.getEpochSecond();
580615
await().atMost(Duration.ofSeconds(60))
581616
.untilAsserted(() -> {
617+
if (window > 0) {
618+
sink.doOutput(Collections.emptyList());
619+
}
582620
long endTimeSeconds = Instant.now().getEpochSecond()+10;
583621
metricsInAMP = 0;
584622
getMetricsFromAMP(summaryMetricName, "summary");
@@ -674,16 +712,23 @@ private Collection<Record<Event>> getSummaryRecordList(int numberOfRecords) {
674712
return records;
675713
}
676714

677-
@Test
678-
void TestHistogramMetrics() throws Exception {
715+
@ParameterizedTest
716+
@ValueSource(ints = {0, 2, 5})
717+
void TestHistogramMetrics(final int window) throws Exception {
718+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
719+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
679720

680721
PrometheusSink sink = createObjectUnderTest();
681722
Collection<Record<Event>> records = getHistogramRecordList(NUM_RECORDS);
682723
sink.doOutput(records);
724+
Thread.sleep(window*1000);
683725

684726
long startTimeSeconds = testStartTime.getEpochSecond();
685727
await().atMost(Duration.ofSeconds(60))
686728
.untilAsserted(() -> {
729+
if (window > 0) {
730+
sink.doOutput(Collections.emptyList());
731+
}
687732
metricsInAMP = 0;
688733
long endTimeSeconds = Instant.now().getEpochSecond()+10;
689734
getMetricsFromAMP(histogramMetricName, "histogram");
@@ -734,16 +779,23 @@ void TestHistogramMetrics() throws Exception {
734779
}
735780

736781

737-
@Test
738-
void TestExponentialHistogramMetrics() throws Exception {
782+
@ParameterizedTest
783+
@ValueSource(ints = {0, 2, 5})
784+
void TestExponentialHistogramMetrics(final int window) throws Exception {
785+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
786+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
739787

740788
PrometheusSink sink = createObjectUnderTest();
741789
Collection<Record<Event>> records = getExponentialHistogramRecordList(NUM_RECORDS);
742790
sink.doOutput(records);
791+
Thread.sleep(window*1000);
743792

744793
long startTimeSeconds = testStartTime.getEpochSecond();
745794
await().atMost(Duration.ofSeconds(60))
746795
.untilAsserted(() -> {
796+
if (window > 0) {
797+
sink.doOutput(Collections.emptyList());
798+
}
747799
metricsInAMP = 0;
748800
long endTimeSeconds = Instant.now().getEpochSecond()+10;
749801
getMetricsFromAMP(exponentialHistogramMetricName, "exphistogram");
@@ -797,8 +849,11 @@ void TestExponentialHistogramMetrics() throws Exception {
797849
verify(eventHandle, times(NUM_RECORDS)).release(eq(true));
798850
}
799851

800-
@Test
801-
public void TestMultipleMetrics() throws Exception {
852+
@ParameterizedTest
853+
@ValueSource(ints = {0, 2, 5})
854+
public void TestMultipleMetrics(final int window) throws Exception {
855+
lenient().when(thresholdConfig.getFlushInterval()).thenReturn(6L);
856+
when(prometheusSinkConfig.getOutOfOrderWindow()).thenReturn(Duration.ofSeconds(window));
802857
when(thresholdConfig.getMaxEvents()).thenReturn(1);
803858
long startTimeSeconds = testStartTime.getEpochSecond();
804859
PrometheusSink sink = createObjectUnderTest();
@@ -808,9 +863,13 @@ public void TestMultipleMetrics() throws Exception {
808863
records.addAll(getGaugeRecordList(NUM_RECORDS/5));
809864
records.addAll(getSumRecordList(NUM_RECORDS/5, sumMetricName, 0));
810865
sink.doOutput(records);
866+
Thread.sleep(window*1000);
811867

812868
await().atMost(Duration.ofSeconds(60))
813869
.untilAsserted(() -> {
870+
if (window > 0) {
871+
sink.doOutput(Collections.emptyList());
872+
}
814873

815874
int totalMetrics = 0;
816875
metricsInAMP = 0;
@@ -836,9 +895,8 @@ public void TestMultipleMetrics() throws Exception {
836895
assertThat(metricsInAMP, greaterThanOrEqualTo(1));
837896
totalMetrics += metricsInAMP;
838897

839-
assertThat(totalMetrics, greaterThanOrEqualTo(NUM_RECORDS));
898+
verify(metricsSuccessCounter, times(10)).increment(1);
840899
});
841-
verify(metricsSuccessCounter, times(10)).increment(1);
842900
}
843901

844902
private Collection<Record<Event>> getHistogramRecordList(int numberOfRecords) {

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/configuration/PrometheusSinkConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class PrometheusSinkConfiguration {
3434
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(60);
3535
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
3636
private static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofSeconds(60);
37+
private static final Duration DEFAULT_OUT_OF_ORDER_WINDOW = Duration.ofSeconds(5);
3738

3839
@JsonProperty("aws")
3940
@NotNull
@@ -44,6 +45,9 @@ public class PrometheusSinkConfiguration {
4445
@JsonProperty("url")
4546
private String url;
4647

48+
@JsonProperty("out_of_order_window")
49+
private Duration outOfOrderWindow = DEFAULT_OUT_OF_ORDER_WINDOW;
50+
4751
@JsonProperty("max_retries")
4852
private int maxRetries = DEFAULT_MAX_RETRIES;
4953

@@ -108,6 +112,10 @@ public String getContentType() {
108112
return contentType;
109113
}
110114

115+
public Duration getOutOfOrderWindow() {
116+
return outOfOrderWindow;
117+
}
118+
111119
public String getRemoteWriteVersion() {
112120
return remoteWriteVersion;
113121
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.sink.prometheus.service;
11+
12+
import org.opensearch.dataprepper.common.sink.DefaultSinkBuffer;
13+
import org.opensearch.dataprepper.common.sink.SinkBufferWriter;
14+
import org.opensearch.dataprepper.common.sink.SinkBufferEntry;
15+
16+
public class PrometheusSinkBuffer extends DefaultSinkBuffer {
17+
18+
public PrometheusSinkBuffer(final long maxEvents, final long maxRequestSize,
19+
final long flushIntervalMs, final SinkBufferWriter sinkBufferWriter) {
20+
21+
super(maxEvents, maxRequestSize, flushIntervalMs, sinkBufferWriter);
22+
}
23+
24+
@Override
25+
public boolean isMaxEventsLimitReached() {
26+
return sinkBufferWriter.isMaxEventsLimitReached(maxEvents);
27+
}
28+
29+
@Override
30+
public boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry sinkBufferEntry) {
31+
return sinkBufferWriter.willExceedMaxRequestSizeBytes(sinkBufferEntry, maxRequestSize);
32+
}
33+
34+
}

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkBufferEntry.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@
1313
import org.opensearch.dataprepper.common.sink.SinkBufferEntry;
1414
import org.opensearch.dataprepper.model.event.Event;
1515
import org.opensearch.dataprepper.model.event.EventType;
16-
import org.opensearch.dataprepper.model.metric.ExponentialHistogram;
17-
import org.opensearch.dataprepper.model.metric.Gauge;
1816
import org.opensearch.dataprepper.model.metric.Metric;
19-
import org.opensearch.dataprepper.model.metric.Histogram;
20-
import org.opensearch.dataprepper.model.metric.Sum;
21-
import org.opensearch.dataprepper.model.metric.Summary;
2217

2318
public class PrometheusSinkBufferEntry implements SinkBufferEntry {
2419

@@ -51,30 +46,7 @@ public Event getEvent() {
5146

5247
private PrometheusTimeSeries getTimeSeriesForEvent(final boolean sanitizeNames) throws Exception {
5348
if (event.getMetadata().getEventType().equals(EventType.METRIC.toString())) {
54-
try {
55-
PrometheusTimeSeries timeSeries = new PrometheusTimeSeries((Metric)event, sanitizeNames);
56-
if (event instanceof Gauge) {
57-
final Gauge gauge = (Gauge) event;
58-
timeSeries.addGaugeMetric(gauge);
59-
} else if (event instanceof Sum) {
60-
final Sum sum = (Sum) event;
61-
timeSeries.addSumMetric(sum);
62-
} else if (event instanceof Summary) {
63-
final Summary summary = (Summary) event;
64-
timeSeries.addSummaryMetric(summary);
65-
} else if (event instanceof Histogram) {
66-
final Histogram histogram = (Histogram) event;
67-
timeSeries.addHistogramMetric(histogram);
68-
} else if (event instanceof ExponentialHistogram) {
69-
final ExponentialHistogram exponentialHistogram = (ExponentialHistogram) event;
70-
timeSeries.addExponentialHistogramMetric(exponentialHistogram);
71-
} else {
72-
throw new RuntimeException("Unknown metric type");
73-
}
74-
return timeSeries;
75-
} catch (Exception e) {
76-
throw e;
77-
}
49+
return new PrometheusTimeSeries((Metric)event, sanitizeNames);
7850
}
7951
throw new RuntimeException("Not metric type");
8052
}

0 commit comments

Comments
 (0)