Add Prometheus Sink #6229

Merged: kkondaka merged 7 commits into opensearch-project:main from kkondaka:amp-sink on Nov 13, 2025

kkondaka commented Nov 1, 2025

Collaborator

Description

Add Prometheus Sink.
Currently, only Amazon Managed Service for Prometheus is supported as the destination, using AWS credentials.

Issues Resolved

Resolves #3028

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
@KarstenSchnitter

Collaborator

How much effort would be required to make this compatible with a standard Prometheus instance, e.g. prom/prometheus?

kkondaka commented Nov 4, 2025

Collaborator Author

@KarstenSchnitter I think it should work with any Prometheus server once we support different auth mechanisms. The simplest one to add is probably username/password authentication.

Comment on lines +100 to +102
.with("statusCode", statusCode)
.with("pluginName", pluginSetting.getName())
.with("pipelineName", pluginSetting.getPipelineName());

Member

Should this metadata be made into variables? It seems like common metadata that all sinks may use.

Collaborator Author

We could. Is it OK to do this as a separate followup PR?

@KarstenSchnitter

Collaborator

@kkondaka can you add basic auth support? Other authentications can be added on request.

dlvenable left a comment

Member

Thanks @kkondaka ! This is a great improvement. I took an initial look and left some comments.

Comment thread data-prepper-plugins/prometheus-sink/build.gradle Outdated
Region region = (awsConfig != null) ? awsConfig.getAwsRegion() : awsCredentialsSupplier.getDefaultRegion().get();

sinkMetrics = new DefaultSinkMetrics(pluginMetrics, "metric");
httpSender = new PrometheusHttpSender(awsCredentialsSupplier, prometheusSinkConfiguration, sinkMetrics, prometheusSinkConfiguration.getConnectionTimeoutMs(), prometheusSinkConfiguration.getIdleTimeoutMs());

Member

Let's start moving toward dependency injection. This is supported now. You can see an example of this in PR #6190, where I added DI to the DynamoDB source coordinator.

Collaborator Author

I will do this in a fast-followup PR

@@ -29,7 +37,7 @@

public class PrometheusSinkTest {

Member

We should have at least one test that uses the test plugin framework to verify behavior. See GrokProcessorIT as an example. At the very least it will get the automatic tests that verify that the plugin can load.

Collaborator Author

I will do this in a fast-followup PR

graytaylor0 previously approved these changes Nov 10, 2025
}

public String toString() {
String result = "";

Member

Let's not manually build out JSON. Use Jackson.

Collaborator Author

Actually, this is not needed. Removed it.

.maxTotalAttempts(config.getMaxRetries() + 1);

final long httpTimeoutMs = Math.min(
Math.max(config.getThresholdConfig().getFlushInterval() * 2, 3_000), 10_000

Member

You have magic numbers here. Make these into constants and use a Duration to derive it for readability.

kkondaka commented Nov 10, 2025

Collaborator Author

They were copied from the XRay sink :-) and are not even used. Removed them.
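For reference, the reviewer's suggestion (named constants plus `Duration`) could look roughly like the sketch below. The timeout code was removed from the PR, so this is illustrative only. The class and constant names are hypothetical, and it assumes the flush interval is expressed as a `Duration`; the bounds mirror the `3_000` and `10_000` literals in the quoted diff.

```java
import java.time.Duration;

// Hypothetical helper; none of these names come from the PR.
final class HttpTimeouts {
    // Bounds taken from the literals 3_000 and 10_000 in the quoted diff (assumed to be milliseconds).
    static final Duration MIN_HTTP_TIMEOUT = Duration.ofSeconds(3);
    static final Duration MAX_HTTP_TIMEOUT = Duration.ofSeconds(10);
    static final int FLUSH_INTERVAL_MULTIPLIER = 2;

    private HttpTimeouts() {
    }

    /** Returns twice the flush interval, clamped to [MIN_HTTP_TIMEOUT, MAX_HTTP_TIMEOUT]. */
    static Duration httpTimeoutFor(final Duration flushInterval) {
        final Duration candidate = flushInterval.multipliedBy(FLUSH_INTERVAL_MULTIPLIER);
        if (candidate.compareTo(MIN_HTTP_TIMEOUT) < 0) {
            return MIN_HTTP_TIMEOUT;
        }
        if (candidate.compareTo(MAX_HTTP_TIMEOUT) > 0) {
            return MAX_HTTP_TIMEOUT;
        }
        return candidate;
    }
}
```

With these constants, a 500 ms flush interval yields the 3 s floor and a 30 s flush interval yields the 10 s ceiling.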


final RequestHeadersBuilder headersBuilder = RequestHeaders.builder()
.method(HttpMethod.POST)
.scheme(signedRequest.getUri().getScheme())

Member

The logic for the HTTP request should not be tied to the signing. Get the URI independently of AWS SDK signing.

Collaborator Author

Agreed. Again, this was copied from XRay. I will work on supporting the open-source Prometheus server, which doesn't use AWS signing, and will address this issue then.

.authority(signedRequest.getUri().getAuthority());

// Preserve all original headers from the signed request without modification
signedRequest.headers().forEach((k, vList) -> {

Member

Adding signing headers should be done by an AWS-specific class. Either the current signer or a new class.

Actually, isn't adding the headers the only thing that the AWS-specific signing class really needs to do?

Collaborator Author

Agreed. Please see above.
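One way to read the reviewer's point is that the HTTP sender only needs "something that returns auth headers", with AWS signing as one implementation. The sketch below is an assumption about how that separation might look, not the PR's design: `AuthHeaderProvider` and `BasicAuthHeaderProvider` are hypothetical names, and the basic-auth variant stands in for the username/password support discussed earlier in the thread. An AWS variant would sign the request and return the resulting signature headers; it is not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/** Hypothetical abstraction: returns the auth headers to attach to an outgoing request. */
interface AuthHeaderProvider {
    Map<String, String> headersFor(String url, byte[] body);
}

/** Basic-auth variant; it needs neither the AWS SDK nor the request body. */
final class BasicAuthHeaderProvider implements AuthHeaderProvider {
    private final String headerValue;

    BasicAuthHeaderProvider(final String username, final String password) {
        final String credentials = username + ":" + password;
        this.headerValue = "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public Map<String, String> headersFor(final String url, final byte[] body) {
        // The header is independent of the request, so the arguments are unused here.
        return Map.of("Authorization", headerValue);
    }
}
```

The sender would then build its request from the configured URL and merge in whatever headers the provider returns, so the URI no longer has to come from a signed request.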

*
* @param payload - batch the batch of spans to send
*/
public Pair<Boolean, Integer> pushToEndPoint(final byte[] payload) {

Member

Let's avoid Pair classes on public methods because left and right don't have much meaning outside the pair. Just make a simple POJO model.

Also, we should not require the AWS SDK for general HTTP methods.
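A minimal sketch of the kind of POJO the reviewer is asking for in place of `Pair<Boolean, Integer>`. The class name `PushResult` and its accessors are hypothetical; the two fields simply mirror the left and right values of the original pair.

```java
/** Hypothetical result type for pushToEndPoint; replaces Pair<Boolean, Integer>. */
final class PushResult {
    private final boolean success;
    private final int statusCode;

    PushResult(final boolean success, final int statusCode) {
        this.success = success;
        this.statusCode = statusCode;
    }

    /** True when the endpoint accepted the payload. */
    boolean isSuccess() {
        return success;
    }

    /** HTTP status code returned by the endpoint. */
    int getStatusCode() {
        return statusCode;
    }

    @Override
    public String toString() {
        return "PushResult{success=" + success + ", statusCode=" + statusCode + "}";
    }
}
```

Callers then read `result.isSuccess()` and `result.getStatusCode()` instead of `getLeft()` and `getRight()`, and the type does not depend on any particular pair implementation.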

}
String encodedQuery = URLEncoder.encode(query, StandardCharsets.UTF_8);
String getUrlQuery = "query=" + query + "&start="+testStartTime+"&end="+endTime+"&step=1s";
String getUrl = queryRangeUrl+"?query=" + encodedQuery + "&start="+testStartTime+"&end="+endTime+"&step=1s";

Member

Use a URL builder instead of direct string manipulation. Maybe Armeria has one. If not, Spring has one that Data Prepper uses in other locations.

Collaborator Author

For some reason, building the query string with URLBuilder() is not working. Since this is test code, I think it is OK to keep it this way for now.
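If a library URL builder does not cooperate, a small standard-library helper can still keep the encoding in one place instead of concatenating strings inline. This is only a sketch under that assumption: `QueryUrls.withQuery` is a hypothetical helper, not something from the PR, Armeria, or Spring, and it uses the same `URLEncoder` call the test already relies on.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical test helper; not part of the PR.
final class QueryUrls {
    private QueryUrls() {
    }

    /** Appends form-encoded query parameters to baseUrl, in the map's iteration order. */
    static String withQuery(final String baseUrl, final Map<String, String> params) {
        final String query = params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
        return query.isEmpty() ? baseUrl : baseUrl + "?" + query;
    }
}
```

Passing a `LinkedHashMap` keeps the parameter order stable, for example `query`, `start`, `end`, `step` for the range query in the test.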

assertThat(actualMetricValue, equalTo(10+j));
} else if (metric.get("quantile") != null) {
String q = (String)metric.get("quantile");
if (q.equals("0.5")) {

Member

What does this mean? How does one understand these conditions?

Collaborator Author

Quantile is a well-known term in OTEL: https://opentelemetry.io/docs/specs/otel/metrics/data-model/

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.prometheus.sink.http.endpoint', System.getProperty('tests.prometheus.sink.http.endpoint')

systemProperty 'tests.prometheus.url', System.getProperty('tests.prometheus.url')

Member

How does one run these integration tests locally? Please add instructions.

*
* @param payload - batch the batch of spans to send
*/
public Pair<Boolean, Integer> pushToEndPoint(final byte[] payload) {

Member

Rename to pushToEndpoint.


@NotNull
@JsonProperty("url")
private String url;

Member

It looks like you removed the insecure flag. We should require that this url start with https:// if you aren't going to keep that.

We should keep the insecure_skip_verify since this is helpful in many situations where there is not a root CA.

Collaborator Author

I will work on supporting "insecure" and some auth mechanism so that this works with the open-source Prometheus server.

Collaborator Author

Also, "root CA" is not applicable to AWS, right? Will support other auth mechanisms in a different PR
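Until an insecure option exists, the "must start with https://" requirement from the review could be enforced with a check along these lines. This is a sketch only: `EndpointValidator.isHttps` is a hypothetical helper, and how it would be wired into the sink's configuration validation is left open.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical validation helper; not part of the PR.
final class EndpointValidator {
    private EndpointValidator() {
    }

    /** Returns true only for syntactically valid URLs with an https scheme and a host. */
    static boolean isHttps(final String url) {
        if (url == null) {
            return false;
        }
        try {
            final URI uri = new URI(url);
            return "https".equalsIgnoreCase(uri.getScheme()) && uri.getHost() != null;
        } catch (final URISyntaxException e) {
            return false;
        }
    }
}
```

Parsing with `URI` rather than calling `startsWith("https://")` also rejects values that have the right prefix but no usable host.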

Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>

public interface CompressionEngine {
OutputStream createOutputStream(OutputStream outputStream) throws IOException;
default byte[] compress(byte[] payload) throws IOException {

Member

We can give an implementation here. Something like this.

default byte[] compress(byte[] payload) throws IOException {
    try (ByteArrayOutputStream compressedBytesStream = new ByteArrayOutputStream();
         OutputStream compressedOut = createOutputStream(compressedBytesStream)) {
        compressedOut.write(payload);
        // Close explicitly so the compressor flushes everything before the bytes are read.
        compressedOut.close();
        return compressedBytesStream.toByteArray();
    }
}

Collaborator Author

Looks like Stream compression is different from block compression. This implementation won't work for Prometheus.
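To make the distinction concrete, here is a self-contained sketch of where a stream-based default does work. It uses GZIP from the JDK purely as a stand-in for a stream codec; `StreamCompressionEngine` and `GzipEngine` are hypothetical names, not Data Prepper classes. As the reply notes, a block-format codec would still need its own `compress` override rather than this default.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical interface mirroring the shape quoted in the review. */
interface StreamCompressionEngine {
    OutputStream createOutputStream(OutputStream outputStream) throws IOException;

    /** Stream-based default: wraps an in-memory buffer and writes the payload through it. */
    default byte[] compress(final byte[] payload) throws IOException {
        final ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
        try (OutputStream compressedOut = createOutputStream(compressedBytes)) {
            compressedOut.write(payload);
        }
        // Read the buffer only after the compressing stream is closed and fully flushed.
        return compressedBytes.toByteArray();
    }
}

/** GZIP is a stream codec, so the default compress() is sufficient here. */
final class GzipEngine implements StreamCompressionEngine {
    @Override
    public OutputStream createOutputStream(final OutputStream outputStream) throws IOException {
        return new GZIPOutputStream(outputStream);
    }

    static byte[] decompress(final byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        }
    }

    /** Convenience for demos: compresses and decompresses, rethrowing I/O errors unchecked. */
    static byte[] roundTrip(final byte[] payload) {
        try {
            return decompress(new GzipEngine().compress(payload));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```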

Signed-off-by: Kondaka <krishkdk@amazon.com>
@dlvenable

Member

@kkondaka, thank you for this great new feature! Let's address some of the remaining comments in a follow-on PR.

kkondaka merged commit 3eaf52a into opensearch-project:main on Nov 13, 2025
78 of 80 checks passed
* @param awsCredentialsSupplier the AWS credentials supplier
* @param config The configuration for the Prometheus sink plugin.
*/
public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final PrometheusSinkConfiguration config, @Nonnull final SinkMetrics sinkMetrics, final long connectionTimeoutMillis, final long idleTimeoutMillis) {

Member

Is there a unit test for this class?

Collaborator Author

Will add it in the followup PR

private static WebClient buildWebClient(final PrometheusSinkConfiguration config) {
final RetryRuleWithContent<HttpResponse> retryRule = RetryRuleWithContent.<HttpResponse>builder()
.onStatus((ctx, status) -> RETRYABLE_STATUS_CODES.contains(status.code()))
.thenBackoff(Backoff.exponential(100, 10_000).withJitter(0.2));

Member

Perhaps these values could be moved to the sink configuration.

Collaborator Author

No need for configuring this for now. I will make them constants.

Development

Successfully merging this pull request may close these issues.

Support Prometheus as a Sink storage (remote-write model)

5 participants