Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/prometheus-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
implementation project(':data-prepper-plugin-framework')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation project(':data-prepper-test:test-common')
testImplementation project(':data-prepper-test:plugin-test-framework')
testImplementation project(':data-prepper-core')
testImplementation project(':data-prepper-plugin-framework')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.opensearch.dataprepper.plugins.sink.prometheus;

import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.metric.JacksonSum;
import org.opensearch.dataprepper.model.metric.JacksonSummary;
import org.opensearch.dataprepper.model.metric.JacksonGauge;
Expand Down Expand Up @@ -94,6 +95,7 @@

@ExtendWith(MockitoExtension.class)
public class PrometheusSinkAMPIT {
private static final String TEST_PIPELINE_NAME = "testPipeline";
private static final long NANO_MULTIPLIER = 1_000_000_000L;
private static final int TEST_READ_BATCH_TIMEOUT = 500;
private static final int TEST_PROCESSOR_THREADS = 1;
Expand Down Expand Up @@ -130,6 +132,8 @@ public class PrometheusSinkAMPIT {
private EventHandle eventHandle;
@Mock
private Pipeline dlqPipeline;
@Mock
private PipelineDescription pipelineDescription;

private String awsRegion;
private String awsRole;
Expand All @@ -146,6 +150,7 @@ public class PrometheusSinkAMPIT {
private WebClient webClient;
private PrometheusSinkThresholdConfig thresholdConfig;


@BeforeEach
void setUp() {
webClient = WebClient.builder()
Expand All @@ -157,6 +162,7 @@ void setUp() {
.build();

eventHandle = mock(EventHandle.class);
pipelineDescription = mock(PipelineDescription.class);
awsCredentialsProvider = DefaultCredentialsProvider.create();
metricsInAMP = 0;
testStartTime = Instant.now();
Expand All @@ -174,13 +180,11 @@ void setUp() {
awsConfig = mock(AwsConfig.class);
when(pluginSetting.getPipelineName()).thenReturn("pipeline");
when(pluginSetting.getName()).thenReturn("name");


metricsSuccessCounter = mock(Counter.class);
metricsFailedCounter = mock(Counter.class);
requestsSuccessCounter = mock(Counter.class);
requestsFailedCounter = mock(Counter.class);

summary = mock(DistributionSummary.class);

when(pluginMetrics.counter(eq("sinkRequestsSucceeded"))).thenReturn(requestsSuccessCounter);
Expand Down Expand Up @@ -222,7 +226,7 @@ void setUp() {
}

private PrometheusSink createObjectUnderTest() {
return new PrometheusSink(pluginSetting, pluginMetrics, pluginFactory, prometheusSinkConfig, awsCredentialsSupplier);
return new PrometheusSink(pluginSetting, pluginMetrics, pipelineDescription, prometheusSinkConfig, awsCredentialsSupplier);
}

private void getMetricsFromAMP(final String metricName, final String qs) throws Exception {
Expand All @@ -242,7 +246,7 @@ private void getMetricsFromAMP(final String metricName, final String qs) throws
queryStr = metricName+"|"+metricName+"_sum|"+metricName+"_count|attrKey1|attrKey2";
query = "{__name__=~\""+queryStr+"\"}";
} else if (qs.equals("sum")) {
queryStr = metricName+"|"+metricName+"_sum";
queryStr = metricName+"|"+metricName+"_total";
query = "{__name__=~\""+queryStr+"\"}";
} else {
query = metricName;
Expand All @@ -263,6 +267,7 @@ private void getMetricsFromAMP(final String metricName, final String qs) throws
headersBuilder.add(k, v);
});
});

HttpRequest request = HttpRequest.of(headersBuilder.build(), HttpData.ofAscii(getUrlQuery));
webClient.execute(request).aggregate()
.thenAccept(response -> {
Expand Down Expand Up @@ -328,6 +333,7 @@ void TestSumMetrics() throws Exception {

@Test
void TestSumMetricsFailuresWithDLQ() throws Exception {
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
dlqPipeline = mock(Pipeline.class);
doAnswer(a -> {
Collection<Record<Event>> records = (Collection<Record<Event>>)a.getArgument(0);
Expand Down Expand Up @@ -443,7 +449,7 @@ private Collection<Record<Event>> getSumRecordList(int numberOfRecords, final St
.withStartTime(convertUnixNanosToISO8601(startTimeNanos))
.withIsMonotonic(true)
.withUnit("1")
.withAggregationTemporality("delta")
.withAggregationTemporality("AGGREGATION_TEMPORALITY_CUMULATIVE")
.withResource(Map.of("attributes", Map.of("attrKey1", 1, "attrKey2", Map.of("attrKey3", "attrValue3"))))
.withValue((double)metricValue+i)
.withEventHandle(eventHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.dataprepper.model.codec.CompressionEngine;
import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.ClientOptions;
import com.google.common.annotations.VisibleForTesting;

import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
Expand All @@ -46,6 +47,9 @@ public class PrometheusHttpSender {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusHttpSender.class);
private static final int DEFAULT_MAX_REQUEST_SIZE = 1024*1024; // 1MB
private static final Set<Integer> RETRYABLE_STATUS_CODES = Set.of(429, 502, 503, 504);
private static final int BACKOFF_INITIAL_DELAY_MS = 100;
private static final int BACKOFF_MAX_DELAY_MS = 10_000;
private static final double BACKOFF_DEFAULT_JITTER = 0.2;

private final PrometheusSigV4Signer signer;
private final WebClient webClient;
Expand All @@ -61,14 +65,20 @@ public class PrometheusHttpSender {
* @param awsCredentialsSupplier the AWS credentials supplier
* @param config The configuration for the Prometheus sink plugin.
*/
public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final PrometheusSinkConfiguration config, @Nonnull final SinkMetrics sinkMetrics, final long connectionTimeoutMillis, final long idleTimeoutMillis) {
public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final PrometheusSinkConfiguration config, @Nonnull final SinkMetrics sinkMetrics) {
this(awsCredentialsSupplier, buildWebClient(config), config, sinkMetrics);
}

@VisibleForTesting
public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull WebClient webClient, @Nonnull final PrometheusSinkConfiguration config, @Nonnull final SinkMetrics sinkMetrics) {
this.signer = config.getAwsConfig() != null ? new PrometheusSigV4Signer(awsCredentialsSupplier, config) : null;
this.webClient = buildWebClient(config);
this.webClient = webClient;
this.compressionEngine = config.getEncoding().getCompressionEngine();
this.sinkMetrics = sinkMetrics;
this.config = config;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.idleTimeoutMillis = idleTimeoutMillis;
this.connectionTimeoutMillis = config.getConnectionTimeout().toMillis();
this.idleTimeoutMillis = config.getIdleTimeout().toMillis();

}

/**
Expand All @@ -86,7 +96,7 @@ public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentials
private static WebClient buildWebClient(final PrometheusSinkConfiguration config) {
final RetryRuleWithContent<HttpResponse> retryRule = RetryRuleWithContent.<HttpResponse>builder()
.onStatus((ctx, status) -> RETRYABLE_STATUS_CODES.contains(status.code()))
.thenBackoff(Backoff.exponential(100, 10_000).withJitter(0.2));
.thenBackoff(Backoff.exponential(BACKOFF_INITIAL_DELAY_MS, BACKOFF_MAX_DELAY_MS).withJitter(BACKOFF_DEFAULT_JITTER));

final long estimatedContentLimit = Math.max(1, DEFAULT_MAX_REQUEST_SIZE) * (config.getMaxRetries() + 1);
final int safeContentLimit = (int) Math.min(estimatedContentLimit, Integer.MAX_VALUE);
Expand Down Expand Up @@ -125,7 +135,7 @@ public PrometheusPushResult pushToEndpoint(final byte[] payload) {
.thenApply(response -> {
final long latency = System.currentTimeMillis() - startTime;
sinkMetrics.recordRequestSize(compressedBufferData.length);
LOG.error("Response received in {}ms. Status: {}", latency, response.status());
LOG.debug("Response received in {}ms. Status: {}", latency, response.status());

int statusCode = response.status().code();
final byte[] responseBytes = response.content().array();
Expand Down Expand Up @@ -168,6 +178,7 @@ private HttpRequest buildHttpRequest(final byte[] payload) {
.path(sdkHttpRequest.getUri().getRawPath())
.authority(sdkHttpRequest.getUri().getAuthority());


// Preserve all original headers from the signed request without modification
sdkHttpRequest.headers().forEach((k, vList) -> {
// Add each header value individually to preserve exact format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package org.opensearch.dataprepper.plugins.sink.prometheus;

import org.opensearch.dataprepper.aws.api.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.prometheus.configuration.PrometheusSinkConfiguration;
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
Expand Down Expand Up @@ -41,15 +42,24 @@ class PrometheusSigV4Signer {
this.region = config.getAwsConfig().getAwsRegion();

this.config = config;
this.credentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder()
.withRegion(region)
.withStsRoleArn(config.getAwsConfig().getAwsStsRoleArn())
.withStsExternalId(config.getAwsConfig().getAwsStsExternalId())
.build());
this.credentialsProvider = awsCredentialsSupplier.getProvider(convertToCredentialOptions(config.getAwsConfig()));

this.endpointUri = URI.create(url);
}

private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig awsConfig) {
if (awsConfig == null) {
return AwsCredentialsOptions.builder().build();
}
return AwsCredentialsOptions.builder()
.withRegion(awsConfig.getAwsRegion())
.withStsRoleArn(awsConfig.getAwsStsRoleArn())
.withStsExternalId(awsConfig.getAwsStsExternalId())
.withStsHeaderOverrides(awsConfig.getAwsStsHeaderOverrides())
.build();
}


/**
* Constructs a SigV4 signer helper.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
Expand All @@ -48,7 +48,7 @@ public class PrometheusSink extends AbstractSink<Record<Event>> {
@DataPrepperPluginConstructor
public PrometheusSink(final PluginSetting pluginSetting,
final PluginMetrics pluginMetrics,
final PluginFactory pluginFactory,
final PipelineDescription pipelineDescription,
final PrometheusSinkConfiguration prometheusSinkConfiguration,
final AwsCredentialsSupplier awsCredentialsSupplier) {
super(pluginSetting);
Expand All @@ -58,9 +58,7 @@ public PrometheusSink(final PluginSetting pluginSetting,
Region region = (awsConfig != null) ? awsConfig.getAwsRegion() : awsCredentialsSupplier.getDefaultRegion().get();

sinkMetrics = new DefaultSinkMetrics(pluginMetrics, "Metric");
httpSender = new PrometheusHttpSender(awsCredentialsSupplier, prometheusSinkConfiguration, sinkMetrics,
prometheusSinkConfiguration.getConnectionTimeout().toMillis(),
prometheusSinkConfiguration.getIdleTimeout().toMillis());
httpSender = new PrometheusHttpSender(awsCredentialsSupplier, prometheusSinkConfiguration, sinkMetrics);

PrometheusSinkThresholdConfig thresholdConfig = prometheusSinkConfiguration.getThresholdConfig();

Expand All @@ -69,8 +67,7 @@ public PrometheusSink(final PluginSetting pluginSetting,
sinkMetrics,
httpSender,
getFailurePipeline(),
pluginMetrics,
pluginSetting);
pipelineDescription);
}

private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig awsConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class PrometheusSinkThresholdConfig {
private int maxEvents = DEFAULT_MAX_EVENTS;

@JsonProperty("max_request_size")
private String maxRequestSize = DEFAULT_MAX_REQUEST_SIZE;
private ByteCount maxRequestSize = ByteCount.parse(DEFAULT_MAX_REQUEST_SIZE);

@JsonProperty("flush_interval")
@DurationMin(seconds = 1)
Expand All @@ -44,7 +44,7 @@ public int getMaxEvents() {
}

public long getMaxRequestSizeBytes() {
return ByteCount.parse(maxRequestSize).getBytes();
return maxRequestSize.getBytes();
}

public long getFlushInterval() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
import org.opensearch.dataprepper.common.sink.SinkMetrics;
import org.opensearch.dataprepper.common.sink.SinkBufferEntry;
import org.opensearch.dataprepper.common.sink.ReentrantLockStrategy;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
Expand All @@ -33,24 +32,24 @@


public class PrometheusSinkService extends DefaultSinkOutputStrategy {

static final String PLUGIN_NAME = "prometheus";
private static final Logger LOG = LoggerFactory.getLogger(PrometheusSinkService.class);
public static final String PROMETHEUS_SINK_RECORDS_SUCCESS_COUNTER = "prometheusSinkRecordsNumberOfSuccessful";

public static final String PROMETHEUS_SINK_RECORDS_FAILED_COUNTER = "prometheusSinkRecordsNumberOfFailed";
private final PrometheusHttpSender httpSender;
private final PluginSetting pluginSetting;
private final PipelineDescription pipelineDescription;
private final List<Record<Event>> dlqRecords;
private final boolean sanitizeNames;
private HeadlessPipeline dlqPipeline;
private boolean dropIfNoDLQConfigured;
private String pluginName;

public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkConfiguration,
final SinkMetrics sinkMetrics,
final PrometheusHttpSender httpSender,
final HeadlessPipeline dlqPipeline,
final PluginMetrics pluginMetrics,
final PluginSetting pluginSetting) {
final PipelineDescription pipelineDescription) {
super(new ReentrantLockStrategy(),
new DefaultSinkBuffer(prometheusSinkConfiguration.getThresholdConfig().getMaxEvents(),
prometheusSinkConfiguration.getThresholdConfig().getMaxRequestSizeBytes(),
Expand All @@ -63,7 +62,7 @@ public PrometheusSinkService(final PrometheusSinkConfiguration prometheusSinkCon
this.dlqPipeline = dlqPipeline;
this.dlqRecords = new ArrayList<>();
this.httpSender = httpSender;
this.pluginSetting = pluginSetting;
this.pipelineDescription = pipelineDescription;
}

public void addFailedEventsToDLQ(final List<Event> events, final Throwable ex) {
Expand Down Expand Up @@ -104,8 +103,8 @@ public void addFailedEventsToDlq(final List<Event> failedEvents, final Throwable
}
event.updateFailureMetadata()
.with("statusCode", statusCode)
.with("pluginName", pluginSetting.getName())
.with("pipelineName", pluginSetting.getPipelineName());
.with("pluginName", PLUGIN_NAME)
.with("pipelineName", pipelineDescription.getPipelineName());
if (ex != null) {
event.updateFailureMetadata()
.with("message", ex.getMessage());
Expand Down
Loading
Loading