Add Prometheus Sink#6229
Conversation
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
|
How much effort would be required to make this compatible with a standard Prometheus instance, e.g. |
|
@KarstenSchnitter I think it should work with any prometheus server once we support different auth mechanisms. May be the simplest one maybe username/password auth support |
| .with("statusCode", statusCode) | ||
| .with("pluginName", pluginSetting.getName()) | ||
| .with("pipelineName", pluginSetting.getPipelineName()); |
There was a problem hiding this comment.
Should this metadata be made into variables? Seems like common metadata all sinks may use
There was a problem hiding this comment.
We could. Is it OK to do this as a separate followup PR?
|
@kkondaka can you add basic auth support? Other authentications can be added on request. |
| Region region = (awsConfig != null) ? awsConfig.getAwsRegion() : awsCredentialsSupplier.getDefaultRegion().get(); | ||
|
|
||
| sinkMetrics = new DefaultSinkMetrics(pluginMetrics, "metric"); | ||
| httpSender = new PrometheusHttpSender(awsCredentialsSupplier, prometheusSinkConfiguration, sinkMetrics, prometheusSinkConfiguration.getConnectionTimeoutMs(), prometheusSinkConfiguration.getIdleTimeoutMs()); |
There was a problem hiding this comment.
Let's start moving toward dependency injection. This is support now. You can see an example of this in PR #6190. I added DI to the DynamoDB source coordinator.
There was a problem hiding this comment.
I will do this in a fast-followup PR
| @@ -29,7 +37,7 @@ | |||
|
|
|||
| public class PrometheusSinkTest { | |||
There was a problem hiding this comment.
We should have at least one test that uses the test plugin framework to verify behavior. See GrokProcessorIT as an example. At the very least it will get the automatic tests that verify that the plugin can load.
There was a problem hiding this comment.
I will do this in a fast-followup PR
| } | ||
|
|
||
| public String toString() { | ||
| String result = ""; |
There was a problem hiding this comment.
Let's not manually build out JSON. Use Jackson.
There was a problem hiding this comment.
Actually, this is not needed. Removed it.
| .maxTotalAttempts(config.getMaxRetries() + 1); | ||
|
|
||
| final long httpTimeoutMs = Math.min( | ||
| Math.max(config.getThresholdConfig().getFlushInterval() * 2, 3_000), 10_000 |
There was a problem hiding this comment.
You have magic numbers here. Make these into constants and use a Duration to derive it for readability.
There was a problem hiding this comment.
They are copied from XRay Sink :-) And not even used. Removed it.
|
|
||
| final RequestHeadersBuilder headersBuilder = RequestHeaders.builder() | ||
| .method(HttpMethod.POST) | ||
| .scheme(signedRequest.getUri().getScheme()) |
There was a problem hiding this comment.
The logic for the HTTP request should not be tied to the signing. Get the URI independently of AWS SDK signing.
There was a problem hiding this comment.
Agreed. Again copied from XRay. And I will work on supporting opensource prometheus server which doesn't use AWS signing. Will address this issue then.
| .authority(signedRequest.getUri().getAuthority()); | ||
|
|
||
| // Preserve all original headers from the signed request without modification | ||
| signedRequest.headers().forEach((k, vList) -> { |
There was a problem hiding this comment.
Adding signing headers should be done by an AWS-specific class. Either the current signer or a new class.
Actually, isn't adding the headers the only thing that the AWS-specific signing class really needs to do?
There was a problem hiding this comment.
Agreed. Please see above.
| * | ||
| * @param payload - batch the batch of spans to send | ||
| */ | ||
| public Pair<Boolean, Integer> pushToEndPoint(final byte[] payload) { |
There was a problem hiding this comment.
Let's avoid Pair classes on public methods because left and right don't have much meaning outside the pair. Just make a simple POJO model.
Also, we should not require the AWS SDK for general HTTP methods.
| } | ||
| String encodedQuery = URLEncoder.encode(query, StandardCharsets.UTF_8); | ||
| String getUrlQuery = "query=" + query + "&start="+testStartTime+"&end="+endTime+"&step=1s"; | ||
| String getUrl = queryRangeUrl+"?query=" + encodedQuery + "&start="+testStartTime+"&end="+endTime+"&step=1s"; |
There was a problem hiding this comment.
Use a URL builder instead of direct string manipulation. Maybe Armeria has one. If not, Spring has one that Data Prepper uses in other locations.
There was a problem hiding this comment.
For some reason query string URLBuilder() is not working. Since this is test code, I think it is OK to keep it this way for now.
| assertThat(actualMetricValue, equalTo(10+j)); | ||
| } else if (metric.get("quantile") != null) { | ||
| String q = (String)metric.get("quantile"); | ||
| if (q.equals("0.5")) { |
There was a problem hiding this comment.
What does this mean? How does one understand these conditions?
There was a problem hiding this comment.
Quantile is a well known term in OTEL. - https://opentelemetry.io/docs/specs/otel/metrics/data-model/
| classpath = sourceSets.integrationTest.runtimeClasspath | ||
| systemProperty 'tests.prometheus.sink.http.endpoint', System.getProperty('tests.prometheus.sink.http.endpoint') | ||
|
|
||
| systemProperty 'tests.prometheus.url', System.getProperty('tests.prometheus.url') |
There was a problem hiding this comment.
How does one run these integration tests locally? Please add instructions.
| * | ||
| * @param payload - batch the batch of spans to send | ||
| */ | ||
| public Pair<Boolean, Integer> pushToEndPoint(final byte[] payload) { |
|
|
||
| @NotNull | ||
| @JsonProperty("url") | ||
| private String url; |
There was a problem hiding this comment.
It looks like you removed the insecure flag. We should require that this url start with https:// if you aren't going to keep that.
We should keep the insecure_skip_verify since this is helpful in many situations where there is not a root CA.
There was a problem hiding this comment.
I will work on supporting "insecure" and some auth mechanism to work with opensource prometheus server.
There was a problem hiding this comment.
Also, "root CA" is not applicable to AWS, right? Will support other auth mechanisms in a different PR
Signed-off-by: Kondaka <krishkdk@amazon.com>
Signed-off-by: Kondaka <krishkdk@amazon.com>
|
|
||
| public interface CompressionEngine { | ||
| OutputStream createOutputStream(OutputStream outputStream) throws IOException; | ||
| default byte[] compress(byte[] payload) throws IOException { |
There was a problem hiding this comment.
We can give an implementation here. Something like this.
default byte[] compress(byte[] payload) throws IOException {
try (ByteArrayOutputStream compressedBytesStream = new ByteArrayOutputStream();
OutputStream compressedOut = createOutputStream(payloadOutputStream)) {
compressedOut.write(payload);
compressedOut.close();
return compressedBytesStream.toByteArray();
}
}
There was a problem hiding this comment.
Looks like Stream compression is different from block compression. This implementation won't work for Prometheus.
Signed-off-by: Kondaka <krishkdk@amazon.com>
|
@kkondaka , Thank you for this great new feature! Let's address some of the remaining comments in a follow-on PR. |
| * @param awsCredentialsSupplier the AWS credentials supplier | ||
| * @param config The configuration for the Prometheus sink plugin. | ||
| */ | ||
| public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final PrometheusSinkConfiguration config, @Nonnull final SinkMetrics sinkMetrics, final long connectionTimeoutMillis, final long idleTimeoutMillis) { |
There was a problem hiding this comment.
Is there a unit test for this class?
There was a problem hiding this comment.
Will add it in the followup PR
| private static WebClient buildWebClient(final PrometheusSinkConfiguration config) { | ||
| final RetryRuleWithContent<HttpResponse> retryRule = RetryRuleWithContent.<HttpResponse>builder() | ||
| .onStatus((ctx, status) -> RETRYABLE_STATUS_CODES.contains(status.code())) | ||
| .thenBackoff(Backoff.exponential(100, 10_000).withJitter(0.2)); |
There was a problem hiding this comment.
Perhaps, these values could be moved to the Sink configuration.
There was a problem hiding this comment.
No need for configuring this for now. I will make them constants.
Description
Add Prometheus Sink.
Only supports Amazon Promethus Sink as the destination with AWS credentials.
Issues Resolved
Resolves #3028
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.