Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@

public interface CompressionEngine {
OutputStream createOutputStream(OutputStream outputStream) throws IOException;
default byte[] compress(byte[] payload) throws IOException {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can give an implementation here. Something like this.

default byte[] compress(byte[] payload) throws IOException {
    try (ByteArrayOutputStream compressedBytesStream = new ByteArrayOutputStream();
         OutputStream compressedOut = createOutputStream(payloadOutputStream)) {
        compressedOut.write(payload);
        compressedOut.close();
        return compressedBytesStream.toByteArray();
    }
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like Stream compression is different from block compression. This implementation won't work for Prometheus.

throw new RuntimeException("Unsupported Operation");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.mock;
import org.mockito.invocation.InvocationOnMock;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.nio.charset.StandardCharsets;

public class CompressionEngineTest {
@Test
void defaultCompressionEngineTest() {
CompressionEngine compressionEngine = mock(CompressionEngine.class, InvocationOnMock::callRealMethod);
byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);
assertThrows(RuntimeException.class, () -> compressionEngine.compress(bytes));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
package org.opensearch.dataprepper.common.sink;

import java.time.Instant;
Expand Down Expand Up @@ -45,7 +49,7 @@ public boolean isMaxEventsLimitReached() {

@Override
public boolean exceedsFlushTimeInterval() {
long curTime = Instant.now().toEpochMilli();
long curTime = System.currentTimeMillis();
return (curTime - lastFlushedTimeMs >= flushIntervalMs);
}

Expand All @@ -56,6 +60,9 @@ public boolean willExceedMaxRequestSizeBytes(final SinkBufferEntry sinkBufferEnt

@Override
public SinkFlushableBuffer getFlushableBuffer(final SinkFlushContext sinkFlushContext) {
numEvents = 0;
currentRequestSize = 0L;
lastFlushedTimeMs = System.currentTimeMillis();
return sinkBufferWriter.getBuffer(sinkFlushContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
public class DefaultSinkFlushResult implements SinkFlushResult {
private final List<Event> events;
private final Throwable exception;
private final int statusCode;

public DefaultSinkFlushResult(final List<Event> events, final Throwable exception) {
public DefaultSinkFlushResult(final List<Event> events, final int statusCode, final Throwable exception) {
this.events = events;
this.exception = exception;
this.statusCode = statusCode;
}

public List<Event> getEvents() {
Expand All @@ -25,5 +27,9 @@ public List<Event> getEvents() {
public Throwable getException() {
return exception;
}

public int getStatusCode() {
return statusCode;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,26 @@ public DefaultSinkOutputStrategy(final LockStrategy lockStrategy, final SinkBuff

public void flushBuffer() {
long startTime = System.nanoTime();
// getBuffer() should return the buffer contents
// getFlushableBuffer() should return the buffer contents
SinkFlushableBuffer flushableBuffer = sinkBuffer.getFlushableBuffer(sinkFlushContext);
List<Event> events = flushableBuffer.getEvents();
try {
SinkFlushResult flushResult = flushableBuffer.flush();
if (flushResult == null) { // success
sinkMetrics.recordRequestLatency((double)(System.nanoTime() - startTime));
List<Event> events = flushableBuffer.getEvents();
for (final Event event: events) {
event.getEventHandle().release(true);
}
} else {
// flush Result should contain the events that are
// failed to be delivered, so that these events can be forwarded to DLQ
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException());
addFailedEventsToDlq(flushResult.getEvents(), flushResult.getException(), flushResult.getStatusCode());
}
} catch (Exception e) {
// Add list of events to DLQ
addFailedEventsToDlq(flushableBuffer.getEvents(), e);
sinkMetrics.incrementRequestsFailedCounter(1);
sinkMetrics.incrementEventsFailedCounter(events.size());
addFailedEventsToDlq(events, e, 0);
}
}

Expand Down Expand Up @@ -91,7 +93,7 @@ public void execute(Collection<Record<Event>> records) {
}
} catch (Exception ex) {
LOG.warn(NOISY, "Failed process the event ", ex);
addFailedEventsToDlq(List.of(event), ex);
addFailedEventsToDlq(List.of(event), ex, 0);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@

public interface SinkDlqHandler {
void flushDlqList();
void addFailedEventsToDlq(final List<Event> events, final Throwable ex);
void addFailedEventsToDlq(final List<Event> events, final Throwable ex, final int statusCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
public interface SinkFlushResult {
List<Event> getEvents();
Throwable getException();
int getStatusCode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.dataprepper.model.codec.CompressionEngine;
import org.xerial.snappy.SnappyOutputStream;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -16,4 +17,9 @@ public class SnappyCompressionEngine implements CompressionEngine {
public OutputStream createOutputStream(final OutputStream outputStream) throws IOException {
return new SnappyOutputStream(outputStream);
}

@Override
public byte[] compress(byte[] payload) throws IOException {
return Snappy.compress(payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.mockito.Mock;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.List;
import java.util.Random;

public class DefaultSinkFlushResultTest {
private List<Event> events;
Expand All @@ -22,16 +24,21 @@ public class DefaultSinkFlushResultTest {

private DefaultSinkFlushResult defaultSinkFlushResult;

private int statusCode;

private DefaultSinkFlushResult createObjectUnderTest() {
return new DefaultSinkFlushResult(events, exception);
return new DefaultSinkFlushResult(events, statusCode, exception);
}

@Test
public void test_basic() {
exception = mock(Throwable.class);
events = List.of();
Random random = new Random();
statusCode = random.nextInt(500);
defaultSinkFlushResult = createObjectUnderTest();
assertThat(defaultSinkFlushResult.getEvents(), sameInstance(events));
assertThat(defaultSinkFlushResult.getException(), sameInstance(exception));
assertThat(defaultSinkFlushResult.getStatusCode(), equalTo(statusCode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void flushDlqList() {
dlqEvents.clear();
}

public void addFailedEventsToDlq(final List<Event> events, final Throwable ex) {
public void addFailedEventsToDlq(final List<Event> events, final Throwable ex, final int statusCode) {
dlqEvents.addAll(events);
}

Expand Down
52 changes: 48 additions & 4 deletions data-prepper-plugins/prometheus-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/


plugins {
id 'java'
id 'java-library'
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
}
}

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation 'com.arpnetworking.metrics:prometheus-remote-protocol:1.0.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
Expand All @@ -13,16 +40,26 @@ dependencies {
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'software.amazon.awssdk:auth'
implementation 'io.prometheus:client:0.0.10'
implementation libs.commons.lang3
implementation project(':data-prepper-plugins:failures-common')
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2'
implementation 'org.apache.httpcomponents.core5:httpcore5:5.3.3'
implementation 'io.github.acm19:aws-request-signing-apache-interceptor:3.0.0'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.4.2'
implementation 'org.xerial.snappy:snappy-java:1.1.10.1'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:apache-client'
implementation 'software.amazon.awssdk:sts'
implementation 'software.amazon.awssdk:acm'
implementation 'com.github.scribejava:scribejava-core:8.3.3'
implementation project(':data-prepper-plugin-framework')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test:test-common')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'

implementation libs.armeria.core
}

test {
Expand All @@ -45,16 +82,23 @@ configurations {
integrationTestRuntime.extendsFrom testRuntime
}

/*
* To run:
* ./gradlew data-prepper-plugins:prometheus-sink:integrationTest -Dtests.aws.region=<region> -Dtests.aws.role=<role> -Dtests.prometheus.url=<workspace-url>
* workspace-url looks like https://aps-workspaces.<region>.amazonaws.com/workspaces/<workspace-id>
* (do not include "api/v1/remote_write")
*/
task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.prometheus.sink.http.endpoint', System.getProperty('tests.prometheus.sink.http.endpoint')

systemProperty 'tests.prometheus.url', System.getProperty('tests.prometheus.url')

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does one run these integration tests locally? Please add instructions.

systemProperty 'tests.aws.region', System.getProperty('tests.aws.region')
systemProperty 'tests.aws.role', System.getProperty('tests.aws.role')
filter {
includeTestsMatching '*IT'
}
}
}
Loading
Loading