Commit 5d6ed88

Add support for basic-auth and no-auth for prometheus sink
Signed-off-by: ps48 <pshenoy36@gmail.com>
1 parent 637eda0 commit 5d6ed88

6 files changed

Lines changed: 391 additions & 133 deletions

Lines changed: 109 additions & 120 deletions
@@ -1,170 +1,159 @@
 # Prometheus Sink

-This is the Data Prepper Prometheus sink plugin that sends records to http/https endpoints. You can use the sink to send data to arbitrary HTTP Endpoints which can be backed by prometheus.
-
+This is the Data Prepper Prometheus sink plugin that sends metric data to Prometheus Remote Write endpoints. It supports both open source Prometheus and AWS Managed Prometheus (AMP).

 ## Usages

-The Prometheus sink should be configured as part of Data Prepper pipeline yaml file.
+The Prometheus sink should be configured as part of a Data Prepper pipeline YAML file.

-### Response status
+### Open Source Prometheus (No Auth)

-* `200`: the request data has been successfully pushed to http endpoint.
-* `500`: internal server error while process the request data.
-* `400`: bad request error
-* `404`: the http endpoint is not reachable
-* `501`: the server does not recognize the request method and is incapable of supporting it for any resource
+To use with a vanilla Prometheus instance, provide an `http://` or `https://` URL. No `aws` block is needed.

-### HTTP Basic authentication
-```
+Prometheus must be started with the `--web.enable-remote-write-receiver` flag.
+
+```yaml
 pipeline:
   ...
   sink:
-    - prometheus:
-        authentication:
-          http_basic:
-            username: my-user
-            password: my_s3cr3t
+    - prometheus:
+        url: "http://localhost:9090/api/v1/write"
+        threshold:
+          max_events: 500
+          flush_interval: 5s
 ```

-### HTTP Bearer token authentication
-```
+### Open Source Prometheus with Basic Auth
+
+To authenticate with HTTP Basic credentials (e.g., when Prometheus is behind a reverse proxy with basic auth):
+
+```yaml
 pipeline:
   ...
   sink:
-    - prometheus:
-        authentication:
-          bearer_token:
-            client_id: 0oaafr4j79grYGC5d7
-            client_secret: fFel-3FutCXAOndezEsOVlght6D6DR4OIt7G5D1_oJ6YtgU17JdyXmGf0M
-            token_url: https://localhost/oauth2/default/v1/token
-            grant_type: client_credentials
-            scope: prometheusSink
+    - prometheus:
+        url: "http://localhost:9090/api/v1/write"
+        authentication:
+          http_basic:
+            username: "promuser"
+            password: "prompass"
 ```

-## Configuration
-
-- `url` The http/https endpoint url which can bee backed by prometheus.
-
-- `encoding` Default is snappy
+### AWS Managed Prometheus (AMP)

-- `content_type` Default is application/x-protobuf
+To use with AMP, provide the `aws` configuration block. An `https://` URL is required when using AWS authentication.

-- `remote_write_version` : Prometheus Remote.Writer version Version, Default is 0.1.0
-
-- `proxy`(optional): A String of the address of a forward HTTP proxy. The format is like "<host-name-or-ip>:\<port\>". Examples: "example.com:8100", "http://example.com:8100", "112.112.112.112:8100". Note: port number cannot be omitted.
+```yaml
+pipeline:
+  ...
+  sink:
+    - prometheus:
+        url: "https://aps-workspaces.us-east-2.amazonaws.com/workspaces/ws-xxxxxxxx-xxxx/api/v1/remote_write"
+        aws:
+          region: "us-east-2"
+          sts_role_arn: "arn:aws:iam::123456789012:role/data-prepper-prometheus-role"
+        threshold:
+          max_events: 500
+          flush_interval: 5s
+```

-- `http_method` (Optional) : HttpMethod to be used. Default is POST.
+### Response Status

-- `auth_type` (Optional): Authentication type configuration. By default, this runs an unauthenticated server.
+* `200`: The request data has been successfully pushed to the endpoint.
+* `400`: Bad request error.
+* `404`: The endpoint is not reachable.
+* `429`: Too many requests (retried automatically).
+* `500`: Internal server error.
+* `502`, `503`, `504`: Server errors (retried automatically).

-- `username`(optional): A string of username required for basic authentication
+## Configuration

-- `password`(optional): A string of password required for basic authentication
+### Required

-- `client_id`: It is the client id is the public identifier of your authorization server.
+| Option | Description |
+|--------|-------------|
+| `url` | The Prometheus Remote Write endpoint URL. Supports `http://` and `https://` schemes. When `aws` is configured, `https://` is required. |

-- `client_secret` : It is a secret known only to the application and the authorization server.
+### Optional

-- `token_url`: The End point URL of the OAuth server.(Eg: /oauth2/default/v1/token)
+| Option | Default | Description |
+|--------|---------|-------------|
+| `aws` | `null` | AWS configuration for SigV4 signing. When present, requests are signed with AWS credentials. See [AWS Configuration](#aws-configuration). |
+| `authentication` | `null` | HTTP Basic authentication credentials. See [Authentication](#authentication). Cannot be used with `aws`. |
+| `encoding` | `snappy` | Compression encoding. Currently only `snappy` is supported. |
+| `content_type` | `application/x-protobuf` | Content type of the request body. Currently only `application/x-protobuf` is supported. |
+| `remote_write_version` | `0.1.0` | Prometheus Remote Write protocol version. Currently only `0.1.0` is supported. |
+| `max_retries` | `5` | Maximum number of retry attempts for failed requests. Uses exponential backoff with jitter on retryable status codes (429, 502, 503, 504). |
+| `request_timeout` | `60s` | HTTP request timeout. Must be between 1s and 600s. |
+| `connection_timeout` | `60s` | TCP connection timeout. Must be between 1s and 600s. |
+| `idle_timeout` | `60s` | Connection idle timeout. Must be between 1s and 600s. |
+| `out_of_order_time_window` | `10s` | Time window for handling out-of-order metric samples. |
+| `sanitize_names` | `true` | Whether to sanitize metric names to be Prometheus-compliant. |
+| `threshold` | See below | Buffer threshold configuration. See [Threshold Configuration](#threshold-configuration). |

-- `grant_type` (Optional) : This grant type refers to the way an application gets an access token. Example: client_credentials/refresh_token
+### <a name="threshold-configuration">Threshold Configuration</a>

-- `scope` (Optional) : This scope limit an application's access to a user's account.
+| Option | Default | Description |
+|--------|---------|-------------|
+| `max_events` | `500` | Maximum number of events to buffer before flushing. |
+| `max_request_size` | `1048576` (1 MB) | Maximum request size in bytes before flushing. |
+| `flush_interval` | `10000` (ms) | Maximum time in milliseconds to wait before flushing the buffer. |

-- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. If this option is present, `aws_` options are not expected to be present. If any of `aws_` options are present along with this, error is thrown.
+### <a name="aws-configuration">AWS Configuration</a>

-- `custom_header` (Optional) : A Map<String, List<String> for custom headers such as AWS Sagemaker etc
+When the `aws` block is present, requests are automatically signed with AWS SigV4. An `https://` URL is required.

-- `dlq_file`(optional): A String of absolute file path for DLQ failed output records. Defaults to null.
-If not provided, failed records will be written into the default data-prepper log file (`logs/Data-Prepper.log`). If the `dlq` option is present along with this, an error is thrown.
+| Option | Required | Description |
+|--------|----------|-------------|
+| `region` | No | The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). |
+| `sts_role_arn` | No | The STS role to assume for requests to AWS. Defaults to null, which uses [standard SDK credential behavior](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). |
+| `sts_header_overrides` | No | A map of header overrides to make when assuming the IAM role. |
+| `sts_external_id` | No | An optional external ID to use when assuming the IAM role. |

-- `dlq` (optional): DLQ configurations. See [DLQ](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/failures-common/src/main/java/org/opensearch/dataprepper/plugins/dlq/README.md) for details. If the `dlq_file` option is present along with this, an error is thrown.
+### <a name="authentication">Authentication</a>

-- `max_retries`(optional): A number indicating the maximum number of times Prometheus Sink should try to push the data to the Http arbitrary endpoint before considering it as failure. Defaults to `Integer.MAX_VALUE`.
+The `authentication` block supports HTTP Basic authentication. It cannot be used together with `aws` (SigV4 signing).

-- `request_timout`(optional): A duration that represents the request timeout. Example: 1000ms, 5s etc
-### Prometheus Sink full pipeline
-```
-sink:
-  - prometheus:
-      url: http/https arbitrary endpoint url
-      encoding: snappy
-      content-type: application/x-protobuf
-      remote-write-version: 0.1.0
-      proxy: proxy url
-      http_method: "POST"
-      auth_type: "unauthenticated"
-      authentication:
-        http_basic:
-          username: "username"
-          password: "password"
-        bearer_token:
-          client_id: 0oaafr4j79segd7
-          client_secret: fFel-3FutCXAOndezEsOVlghoJ6w0wNoaYtgU17JdyXmGf0M
-          token_url: token url
-          grant_type: client_credentials
-          scope:
-      insecure: false
-      insecure_skip_verify: false
-      ssl_certificate_file: "/full/path/to/certfile.crt"
-      buffer_type: "in_memory"
-      use_acm_cert_for_ssl: false
-      acm_certificate_arn:
-      custom_header:
-        header: ["value"]
-      dlq_file : <dlq file with full path>
-      dlq:
-        s3:
-          bucket:
-          key_path_prefix:
-      aws:
-        region: "us-east-2"
-        sts_role_arn: "arn:aws:iam::1234567890:role/data-prepper-s3source-execution-role"
-        sigv4: false
-      max_retries: 5
-      request_timout: 20s
-```
+| Option | Description |
+|--------|-------------|
+| `http_basic.username` | The username for HTTP Basic authentication. |
+| `http_basic.password` | The password for HTTP Basic authentication. |

-### SSL
+### End-to-End Acknowledgements

-* insecure_skip_verify(Optional) => A `boolean` that enables mTLS/SSL. Default is ```false```.
-  * If set to false then the user has two options:
-    * Use default trust. This can allow for reaching many endpoints. The user does not need to provide any .crt/.key files.
-    * Allow the user to specify a .crt file for a certificate (no .key is required because this is the client). By the user providing the .crt file, the user is stating he trusts that certificate. We will still verify the signature match.
-  * If set to true, then skip any verification of the certificate. The user does not need to provide a .crt or .key file.
-* insecure (Optional) => A `boolean` that allows http/https endpoints. Default is ```false```.
-  * If set to false, then only https:// URLs are permitted. Throw an InvalidPluginConfigurationException if the URL is configured with an http:// scheme in the URL.
-  * If set to true, then the user can provide both http:// https:// as the scheme.
-* ssl_certificate_file(Optional) => A `String` that represents the SSL certificate chain file path or AWS S3 path. S3 path example `s3://<bucketName>/<path>`. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`.
-* ssl_key_file(Optional) => A `String` that represents the SSL key file path or AWS S3 path. S3 path example `s3://<bucketName>/<path>`. Only decrypted key file is supported. Required if `ssl` is set to `true` and `use_acm_certificate_for_ssl` is set to `false`.
-* use_acm_certificate_for_ssl(Optional) : A `boolean` that enables mTLS/SSL using certificate and private key from AWS Certificate Manager (ACM). Default is `false`.
-* acm_certificate_arn(Optional) : A `String` that represents the ACM certificate ARN. ACM certificate take preference over S3 or local file system certificate. Required if `use_acm_certificate_for_ssl` is set to `true`.
-* acm_private_key_password(Optional): A `String` that represents the ACM private key password which that will be used to decrypt the private key. If it's not provided, a random password will be generated.
-* acm_certificate_timeout_millis(Optional) : An `int` that represents the timeout in milliseconds for ACM to get certificates. Default value is `120000`.
+If the events received by the Prometheus sink have end-to-end acknowledgements enabled (tracked via EventHandle), then upon successful posting a positive acknowledgement is sent, otherwise a negative acknowledgement is sent.

-### <a name="aws_configuration">AWS Configuration</a>
+## Not Yet Implemented

-* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
-* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
-* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the sink plugin.
-* `sts_external_id` (Optional): An optional external ID to use when assuming an IAM role.
-* `sigv4`: A boolean flag to sign the HTTP request with AWS credentials. Default to `false`. For aws_sigv4, we don't need any auth_type or ssl
+The following features have configuration classes defined but are **not currently wired up**:

-### End-to-End acknowledgements
+- Bearer token / OAuth authentication
+- Proxy support
+- SSL/TLS certificate configuration
+- Custom headers
+- DLQ (Dead Letter Queue) file/plugin support

-If the events received by the Prometheus Sink have end-to-end acknowledgements enabled (which is tracked using the presence of EventHandle in the event received for processing), then upon successful posting to OpenSearch or upon successful write to DLQ, a positive acknowledgement is sent to the acknowledgementSetManager, otherwise a negative acknowledgement is sent.
+These are planned for future development.

 ## Developer Guide

-This plugin is compatible with Java 8. See
+This plugin is compatible with Java 11. See:

 - [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
-- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
-The integration tests for this plugin do not run as part of the Data Prepper build.
+- [Monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

-The following command runs the integration tests:
+### Build and Test

-```
+```bash
+# Run unit tests
+./gradlew :data-prepper-plugins:prometheus-sink:test
+
+# Run integration tests
 ./gradlew :data-prepper-plugins:prometheus-sink:integrationTest -Dtests.prometheus.sink.http.endpoint=<http-endpoint>
+
+# Code formatting
+./gradlew :data-prepper-plugins:prometheus-sink:spotlessApply
+
+# Checkstyle
+./gradlew :data-prepper-plugins:prometheus-sink:checkstyleMain :data-prepper-plugins:prometheus-sink:checkstyleTest
 ```
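
The README's `max_retries` row describes exponential backoff with jitter on status codes 429, 502, 503, and 504, but the retry loop itself is not part of this commit's visible diff. The following is a minimal sketch of one common variant ("full jitter"), not the plugin's actual implementation. The class name `RetryBackoffSketch`, the 100 ms base delay, and the 10 s cap are assumptions chosen for illustration; only the set of retryable codes and the default of 5 retries come from the README.

```java
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch only; this is not the plugin's retry code.
final class RetryBackoffSketch {
    // Status codes the README lists as retried automatically.
    static final Set<Integer> RETRYABLE = Set.of(429, 502, 503, 504);

    // Assumed values: the README does not state a base delay or a cap.
    static final long BASE_DELAY_MILLIS = 100;
    static final long MAX_DELAY_MILLIS = 10_000;

    // attempt is zero-based: 0 means the first request has just failed.
    static boolean shouldRetry(final int statusCode, final int attempt, final int maxRetries) {
        return RETRYABLE.contains(statusCode) && attempt < maxRetries;
    }

    // Upper bound for the delay: doubles with each attempt, then is capped.
    static long maxDelayMillis(final int attempt) {
        final long exponential = BASE_DELAY_MILLIS << Math.min(attempt, 20);
        return Math.min(exponential, MAX_DELAY_MILLIS);
    }

    // Full jitter: pick a uniformly random delay in [0, maxDelayMillis(attempt)].
    static long jitteredDelayMillis(final int attempt) {
        return ThreadLocalRandom.current().nextLong(maxDelayMillis(attempt) + 1);
    }

    public static void main(final String[] args) {
        final int maxRetries = 5; // README default for max_retries
        for (int attempt = 0; shouldRetry(503, attempt, maxRetries); attempt++) {
            System.out.println("attempt " + attempt + ": wait up to "
                    + maxDelayMillis(attempt) + " ms, chose " + jitteredDelayMillis(attempt) + " ms");
        }
    }
}
```

Randomizing the delay spreads out retries from many sink workers so they do not hit the endpoint in lockstep after a 429 or 503.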

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/PrometheusHttpSender.java

Lines changed: 21 additions & 6 deletions
@@ -39,6 +39,7 @@
 import javax.annotation.Nonnull;
 import java.nio.charset.StandardCharsets;
 import java.net.URI;
+import java.util.Base64;
 import java.util.Set;

 /**
@@ -59,6 +60,7 @@ public class PrometheusHttpSender {
     private final long idleTimeoutMillis;
     private final CompressionEngine compressionEngine;
     private final PrometheusSinkConfiguration config;
+    private final String authHeader;

     /**
      * Constructor for the PrometheusHttpSender.
@@ -80,7 +82,16 @@ public PrometheusHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentials
         this.config = config;
         this.connectionTimeoutMillis = config.getConnectionTimeout().toMillis();
         this.idleTimeoutMillis = config.getIdleTimeout().toMillis();
+        this.authHeader = buildAuthHeader(config);
+    }

+    private static String buildAuthHeader(final PrometheusSinkConfiguration config) {
+        if (config.getAuthentication() != null && config.getAuthentication().getHttpBasic() != null) {
+            final String credentials = config.getAuthentication().getHttpBasic().getUsername()
+                    + ":" + config.getAuthentication().getHttpBasic().getPassword();
+            return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+        }
+        return null;
     }

     /**
@@ -160,16 +171,20 @@ public PrometheusPushResult pushToEndpoint(final byte[] payload) {
     }

     private SdkHttpFullRequest createSdkHttpRequest(final String url, @Nonnull final byte[] payload) {
-        return SdkHttpFullRequest.builder()
+        final SdkHttpFullRequest.Builder builder = SdkHttpFullRequest.builder()
                 .method(SdkHttpMethod.POST)
                 .uri(URI.create(url))
-                .putHeader("Content-Encoding", config.getEncoding().toString())
+                .putHeader("Content-Encoding", config.getEncoding().name().toLowerCase())
                 .putHeader("Content-Type", config.getContentType())
                 .putHeader("X-Prometheus-Remote-Write-Version", config.getRemoteWriteVersion())
-                .putHeader("x-amz-content-sha256","required")
-                .contentStreamProvider(() -> SdkBytes.fromByteArray(payload).asInputStream())
-                .build();
-
+                .contentStreamProvider(() -> SdkBytes.fromByteArray(payload).asInputStream());
+        if (authHeader != null) {
+            builder.putHeader("Authorization", authHeader);
+        }
+        if (signer != null) {
+            builder.putHeader("x-amz-content-sha256", "required");
+        }
+        return builder.build();
     }

     private HttpRequest buildHttpRequest(final byte[] payload) {
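
The `buildAuthHeader` change above follows the usual HTTP Basic scheme: join username and password with a colon, Base64-encode the UTF-8 bytes, and prefix the result with `Basic `. As a standalone illustration, the sketch below does the same with plain strings and adds explicit null checks. The class `BasicAuthHeaderSketch` and its `decodeCredentials` helper are hypothetical names that are not part of the commit; the sample credentials are the ones from the README example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;

// Illustrative sketch only; names here are hypothetical, not from the plugin.
final class BasicAuthHeaderSketch {
    // Builds the value of the Authorization header for HTTP Basic auth.
    static String build(final String username, final String password) {
        // Fail fast instead of encoding the literal string "null".
        Objects.requireNonNull(username, "username");
        Objects.requireNonNull(password, "password");
        final String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Reverses build(): strips the "Basic " prefix and decodes the credentials.
    static String decodeCredentials(final String headerValue) {
        final String encoded = headerValue.substring("Basic ".length());
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        final String header = build("promuser", "prompass");
        System.out.println(header);
        System.out.println(decodeCredentials(header)); // prints "promuser:prompass"
    }
}
```

In the committed code, string concatenation would turn a null username or password into the text `null` before encoding. Whether `AuthenticationOptions` already rejects missing values is not visible in this diff, so the null checks in the sketch are a precaution rather than a confirmed fix.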

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/configuration/PrometheusSinkConfiguration.java

Lines changed: 24 additions & 3 deletions
@@ -37,10 +37,13 @@ public class PrometheusSinkConfiguration {
     private static final Duration DEFAULT_OUT_OF_ORDER_TIME_WINDOW = Duration.ofSeconds(10);

     @JsonProperty("aws")
-    @NotNull
     @Valid
     private AwsConfig awsConfig;

+    @JsonProperty("authentication")
+    @Valid
+    private AuthenticationOptions authentication;
+
     @NotNull
     @JsonProperty("url")
     private String url;
@@ -100,6 +103,10 @@ public AwsConfig getAwsConfig() {
         return awsConfig;
     }

+    public AuthenticationOptions getAuthentication() {
+        return authentication;
+    }
+
     public int getMaxRetries() {
         return maxRetries;
     }
@@ -132,11 +139,25 @@ public Duration getIdleTimeout() {
         return idleTimeout;
     }

-    @AssertTrue(message = "encoding or content_type or remote_write_version is incorrect.")
+    @AssertTrue(message = "Cannot use both AWS SigV4 and authentication options. Choose one.")
+    boolean isValidAuthConfig() {
+        return !(awsConfig != null && authentication != null);
+    }
+
+    @AssertTrue(message = "encoding or content_type or remote_write_version or url is incorrect.")
     boolean isValidConfig() {
-        return url.startsWith("https://") &&
+        final boolean validUrl = url.startsWith("https://") || url.startsWith("http://");
+        return validUrl &&
                 encoding == CompressionOption.SNAPPY &&
                 contentType.equals(X_PROTOBUF) &&
                 remoteWriteVersion.equals(DEFAULT_REMOTE_WRITE_VERSION);
     }
+
+    @AssertTrue(message = "AWS configuration requires an https:// URL.")
+    boolean isValidAwsConfig() {
+        if (awsConfig != null) {
+            return url != null && url.startsWith("https://");
+        }
+        return true;
+    }
 }
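
Taken together, the three `@AssertTrue` methods above encode three rules: `aws` and `authentication` are mutually exclusive, the URL must use `http://` or `https://`, and `aws` requires `https://`. The sketch below restates those rules as a plain function so the valid combinations are easy to check by hand. It is an illustration under assumptions, not the plugin's validation code: `SinkConfigRulesSketch` and `violations` are hypothetical names, the encoding, content type, and version checks are left out, and the second message is reworded.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; mirrors the URL and auth rules from the diff above.
final class SinkConfigRulesSketch {
    // Returns one message per violated rule; an empty list means the combination is valid.
    static List<String> violations(final String url, final boolean hasAws, final boolean hasAuthentication) {
        final List<String> problems = new ArrayList<>();
        // Rule 1: SigV4 signing and HTTP Basic authentication are mutually exclusive.
        if (hasAws && hasAuthentication) {
            problems.add("Cannot use both AWS SigV4 and authentication options. Choose one.");
        }
        // A missing URL is treated as invalid here rather than dereferenced.
        final boolean https = url != null && url.startsWith("https://");
        final boolean http = url != null && url.startsWith("http://");
        // Rule 2: only http:// and https:// URLs are accepted.
        if (!https && !http) {
            problems.add("url must start with http:// or https://");
        }
        // Rule 3: AWS configuration requires https://.
        if (hasAws && !https) {
            problems.add("AWS configuration requires an https:// URL.");
        }
        return problems;
    }

    public static void main(final String[] args) {
        // Open source Prometheus over plain HTTP with Basic auth: no violations.
        System.out.println(violations("http://localhost:9090/api/v1/write", false, true));
        // AMP-style configuration over plain HTTP: violates rule 3.
        System.out.println(violations("http://localhost:9090/api/v1/write", true, false));
    }
}
```

One difference from the committed code is deliberate: `isValidConfig()` calls `url.startsWith(...)` without the null check that `isValidAwsConfig()` has, whereas the sketch reports a null URL as an ordinary violation.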
