Skip to content

Commit 3eaf52a

Browse files
authored
Add Prometheus Sink (#6229)
* Add Prometheus Sink Signed-off-by: Kondaka <krishkdk@amazon.com> * Remove debug statements Signed-off-by: Kondaka <krishkdk@amazon.com> * Make Prometheus sink Experimental Signed-off-by: Kondaka <krishkdk@amazon.com> * Add sanitize_names config option to sanitize metric/label names Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Cleaned up HTTP sender and Sigv4Signer Signed-off-by: Kondaka <krishkdk@amazon.com> * Added check for https in valid config Signed-off-by: Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent d2184d8 commit 3eaf52a

53 files changed

Lines changed: 2845 additions & 2176 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/CompressionEngine.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,7 @@
1010

1111
public interface CompressionEngine {
1212
OutputStream createOutputStream(OutputStream outputStream) throws IOException;
13+
default byte[] compress(byte[] payload) throws IOException {
14+
throw new RuntimeException("Unsupported Operation");
15+
}
1316
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.codec;
7+
8+
import org.junit.jupiter.api.Test;
9+
import static org.mockito.Mockito.mock;
10+
import org.mockito.invocation.InvocationOnMock;
11+
import static org.junit.jupiter.api.Assertions.assertThrows;
12+
13+
import java.nio.charset.StandardCharsets;
14+
15+
public class CompressionEngineTest {
16+
@Test
17+
void defaultCompressionEngineTest() {
18+
CompressionEngine compressionEngine = mock(CompressionEngine.class, InvocationOnMock::callRealMethod);
19+
byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);
20+
assertThrows(RuntimeException.class, () -> compressionEngine.compress(bytes));
21+
}
22+
}

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/DefaultSinkBuffer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
/*
2-
* Copyright OpenSearch Contributors
3-
* SPDX-License-Identifier: Apache-2.0
4-
*/
5-
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
610
package org.opensearch.dataprepper.common.sink;
711

812
import java.time.Instant;
@@ -45,7 +49,7 @@ public boolean isMaxEventsLimitReached() {
4549

4650
@Override
4751
public boolean exceedsFlushTimeInterval() {
48-
long curTime = Instant.now().toEpochMilli();
52+
long curTime = System.currentTimeMillis();
4953
return (curTime - lastFlushedTimeMs >= flushIntervalMs);
5054
}
5155

@@ -56,6 +60,9 @@ public boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry sinkBufferEnt
5660

5761
@Override
5862
public SinkFlushableBuffer getFlushableBuffer(final SinkFlushContext sinkFlushContext) {
63+
numEvents = 0;
64+
currentRequestSize = 0L;
65+
lastFlushedTimeMs = System.currentTimeMillis();
5966
return sinkBufferWriter.getBuffer(sinkFlushContext);
6067
}
6168

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/DefaultSinkFlushResult.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
public class DefaultSinkFlushResult implements SinkFlushResult {
1313
private final List<Event> events;
1414
private final Throwable exception;
15+
private final int statusCode;
1516

16-
public DefaultSinkFlushResult(final List<Event> events, final Throwable exception) {
17+
public DefaultSinkFlushResult(final List<Event> events, final int statusCode, final Throwable exception) {
1718
this.events = events;
1819
this.exception = exception;
20+
this.statusCode = statusCode;
1921
}
2022

2123
public List<Event> getEvents() {
@@ -25,5 +27,9 @@ public List<Event> getEvents() {
2527
public Throwable getException() {
2628
return exception;
2729
}
30+
31+
public int getStatusCode() {
32+
return statusCode;
33+
}
2834
}
2935

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/DefaultSinkOutputStrategy.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,26 @@ public DefaultSinkOutputStrategy(final LockStrategy lockStrategy, final SinkBuff
3131

3232
public void flushBuffer() {
3333
long startTime = System.nanoTime();
34-
// getBuffer() should return the buffer contents
34+
// getFlushableBuffer() should return the buffer contents
3535
SinkFlushableBuffer flushableBuffer = sinkBuffer.getFlushableBuffer(sinkFlushContext);
36+
List<Event> events = flushableBuffer.getEvents();
3637
try {
3738
SinkFlushResult flushResult = flushableBuffer.flush();
3839
if (flushResult == null) { // success
3940
sinkMetrics.recordRequestLatency((double)(System.nanoTime() - startTime));
40-
List<Event> events = flushableBuffer.getEvents();
4141
for (final Event event: events) {
4242
event.getEventHandle().release(true);
4343
}
4444
} else {
4545
// flush Result should contain the events that are
4646
// failed to be delivered, so that these events can be forwarded to DLQ
47-
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException());
47+
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException(), flushResult.getStatusCode());
4848
}
4949
} catch (Exception e) {
5050
// Add list of events to DLQ
51-
addFailedEventsToDlq(flushableBuffer.getEvents(), e);
51+
sinkMetrics.incrementRequestsFailedCounter(1);
52+
sinkMetrics.incrementEventsFailedCounter(events.size());
53+
addFailedEventsToDlq(events, e, 0);
5254
}
5355
}
5456

@@ -91,7 +93,7 @@ public void execute(Collection<Record<Event>> records) {
9193
}
9294
} catch (Exception ex) {
9395
LOG.warn(NOISY, "Failed process the event ", ex);
94-
addFailedEventsToDlq(List.of(event), ex);
96+
addFailedEventsToDlq(List.of(event), ex, 0);
9597
}
9698
}
9799
} finally {

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/SinkDlqHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111

1212
public interface SinkDlqHandler {
1313
void flushDlqList();
14-
void addFailedEventsToDlq(final List<Event> events, final Throwable ex);
14+
void addFailedEventsToDlq(final List<Event> events, final Throwable ex, final int statusCode);
1515

1616
}

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/SinkFlushResult.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@
1212
public interface SinkFlushResult {
1313
List<Event> getEvents();
1414
Throwable getException();
15+
int getStatusCode();
1516
}

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/codec/SnappyCompressionEngine.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.opensearch.dataprepper.model.codec.CompressionEngine;
99
import org.xerial.snappy.SnappyOutputStream;
10+
import org.xerial.snappy.Snappy;
1011

1112
import java.io.IOException;
1213
import java.io.OutputStream;
@@ -16,4 +17,9 @@ public class SnappyCompressionEngine implements CompressionEngine {
1617
public OutputStream createOutputStream(final OutputStream outputStream) throws IOException {
1718
return new SnappyOutputStream(outputStream);
1819
}
20+
21+
@Override
22+
public byte[] compress(byte[] payload) throws IOException {
23+
return Snappy.compress(payload);
24+
}
1925
}

data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/common/sink/DefaultSinkFlushResultTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import org.mockito.Mock;
1313
import static org.hamcrest.CoreMatchers.sameInstance;
1414
import static org.hamcrest.MatcherAssert.assertThat;
15+
import static org.hamcrest.CoreMatchers.equalTo;
1516

1617
import java.util.List;
18+
import java.util.Random;
1719

1820
public class DefaultSinkFlushResultTest {
1921
private List<Event> events;
@@ -22,16 +24,21 @@ public class DefaultSinkFlushResultTest {
2224

2325
private DefaultSinkFlushResult defaultSinkFlushResult;
2426

27+
private int statusCode;
28+
2529
private DefaultSinkFlushResult createObjectUnderTest() {
26-
return new DefaultSinkFlushResult(events, exception);
30+
return new DefaultSinkFlushResult(events, statusCode, exception);
2731
}
2832

2933
@Test
3034
public void test_basic() {
3135
exception = mock(Throwable.class);
3236
events = List.of();
37+
Random random = new Random();
38+
statusCode = random.nextInt(500);
3339
defaultSinkFlushResult = createObjectUnderTest();
3440
assertThat(defaultSinkFlushResult.getEvents(), sameInstance(events));
3541
assertThat(defaultSinkFlushResult.getException(), sameInstance(exception));
42+
assertThat(defaultSinkFlushResult.getStatusCode(), equalTo(statusCode));
3643
}
3744
}

data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/common/sink/DefaultSinkOutputStrategyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public void flushDlqList() {
299299
dlqEvents.clear();
300300
}
301301

302-
public void addFailedEventsToDlq(final List<Event> events, final Throwable ex) {
302+
public void addFailedEventsToDlq(final List<Event> events, final Throwable ex, final int statusCode) {
303303
dlqEvents.addAll(events);
304304
}
305305

0 commit comments

Comments
 (0)