Skip to content

Commit bb61dbe

Browse files
TomasLongokaimst
andauthored
Otel metrics source http service (#6604)
* Add HTTP service to otel_metrics_source Integrates an HTTP (non-gRPC) service into the OTel metrics source plugin, mirroring the existing pattern from otel_trace_source and otel_logs_source. Both gRPC and HTTP services now run on the same Armeria server. Key changes: - Add ArmeriaHttpService for handling HTTP metric export requests - Add HttpExceptionHandler for HTTP-specific error handling - Support compression, authentication, throttling, and health checks for HTTP - Add configurable http_path option - Refactor OTelMetricsSource to directly configure the server - Remove ConvertConfiguration (inlined into source) - Split monolithic test into focused test classes (gRPC, HTTP, RetryInfo) - Add E2E tests for HTTP, gRPC, protobuf, and unframed requests Signed-off-by: Tomas Longo <tlongo@sternad.de> Signed-off-by: Kai Sternad <kai@sternad.de> * Kepp constant handling consistent with other sources Signed-off-by: Kai Sternad <kai@sternad.de> * Add guard to httpPath, deduplicate health check 1. createServer() — configureHttpService() is now guarded with if (getHttpPath() != null), preventing the NPE 2. HTTP health check — moved from configureHttpService() into createServer(), so it registers when either httpPath or enableUnframedRequests is set (matching the OTelLogsSource pattern) 3. configureHttpService() — removed the duplicate health check registration Signed-off-by: Kai Sternad <kai@sternad.de> * Move healthCheck back to configureHttpService Signed-off-by: Kai Sternad <kai@sternad.de> * Incorporate review findings Signed-off-by: Tomas Longo <tlongo@sternad.de> * Deduplicate output format, decompose createServer Signed-off-by: Kai Sternad <kai@sternad.de> --------- Signed-off-by: Tomas Longo <tlongo@sternad.de> Signed-off-by: Kai Sternad <kai@sternad.de> Co-authored-by: Kai Sternad <kai@sternad.de>
1 parent bfda518 commit bb61dbe

22 files changed

Lines changed: 1849 additions & 1464 deletions

File tree

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigFixture.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS;
1515
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_PASSWORD;
1616
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_USERNAME;
17+
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_HTTP_PATH;
1718

1819
import java.util.Map;
1920

@@ -41,6 +42,7 @@ public static OTelLogsSourceConfig createDefaultConfig() {
4142
public static OTelLogsSourceConfig.OTelLogsSourceConfigBuilder createDefaultConfigBuilder() {
4243
return OTelLogsSourceConfig.builder()
4344
.healthCheck(true)
45+
.httpPath(CONFIG_HTTP_PATH)
4446
.port(DEFAULT_PORT)
4547
.enableUnframedRequests(false)
4648
.ssl(false)

data-prepper-plugins/otel-metrics-source/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ dependencies {
1515
implementation project(':data-prepper-plugins:armeria-common')
1616
implementation project(':data-prepper-plugins:otel-proto-common')
1717
implementation project(':data-prepper-plugins:http-common')
18+
implementation project(':data-prepper-plugins:http-source-common' )
1819
testImplementation project(':data-prepper-api').sourceSets.test.output
1920
implementation libs.opentelemetry.proto
2021
implementation libs.commons.io
@@ -34,6 +35,8 @@ dependencies {
3435
testImplementation 'org.assertj:assertj-core:3.27.7'
3536
testImplementation libs.commons.io
3637
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
38+
implementation 'org.projectlombok:lombok:1.18.26'
39+
annotationProcessor 'org.projectlombok:lombok:1.18.26'
3740
}
3841

3942
jacocoTestCoverageVerification {

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/ConvertConfiguration.java

Lines changed: 0 additions & 27 deletions
This file was deleted.

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugins.source.otelmetrics;

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java

Lines changed: 242 additions & 42 deletions
Large diffs are not rendered by default.

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java

Lines changed: 25 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugins.source.otelmetrics;
712

813
import com.fasterxml.jackson.annotation.JsonProperty;
914
import jakarta.validation.constraints.AssertTrue;
1015
import jakarta.validation.constraints.Size;
16+
import lombok.AllArgsConstructor;
17+
import lombok.Builder;
18+
import lombok.Getter;
19+
import lombok.NoArgsConstructor;
20+
1121
import org.apache.commons.lang3.StringUtils;
1222
import org.opensearch.dataprepper.model.types.ByteCount;
1323
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
@@ -17,6 +27,11 @@
1727

1828
import java.util.Set;
1929

30+
31+
@Builder
32+
@AllArgsConstructor
33+
@NoArgsConstructor
34+
@Getter
2035
public class OTelMetricsSourceConfig {
2136
static final String REQUEST_TIMEOUT = "request_timeout";
2237
static final String PORT = "port";
@@ -52,6 +67,9 @@ public class OTelMetricsSourceConfig {
5267
static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check";
5368
private static final String NAME_KEY = "name";
5469
private static final String SERVICE_NAME_KEY = "service_name";
70+
static final String HTTP_PATH = "http_path";
71+
static final String AUTHENTICATION = "authentication";
72+
static final String MAX_REQUEST_LENGTH = "max_request_length";
5573

5674
@JsonProperty(REQUEST_TIMEOUT)
5775
private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS;
@@ -111,7 +129,7 @@ public class OTelMetricsSourceConfig {
111129
@JsonProperty(MAX_CONNECTION_COUNT)
112130
private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT;
113131

114-
@JsonProperty("authentication")
132+
@JsonProperty(AUTHENTICATION)
115133
private PluginModel authentication;
116134

117135
@JsonProperty(UNAUTHENTICATED_HEALTH_CHECK)
@@ -120,12 +138,16 @@ public class OTelMetricsSourceConfig {
120138
@JsonProperty(COMPRESSION)
121139
private CompressionOption compression = CompressionOption.NONE;
122140

123-
@JsonProperty("max_request_length")
141+
@JsonProperty(MAX_REQUEST_LENGTH)
124142
private ByteCount maxRequestLength;
125143

126144
@JsonProperty(RETRY_INFO)
127145
private RetryInfoConfig retryInfo;
128146

147+
@JsonProperty(HTTP_PATH)
148+
@Size(min = 1, message = "path length should be at least 1")
149+
private String httpPath;
150+
129151
@AssertTrue(message = "buffer_partition_keys only supports 'name' and 'service_name'. 'name' is mandatory")
130152
boolean isBufferKeysValid() {
131153
if (bufferPartitionKeys == null) {
@@ -173,106 +195,8 @@ private boolean isSSLCertificateLocatedInS3() {
173195
sslKeyFile.toLowerCase().startsWith(S3_PREFIX);
174196
}
175197

176-
public int getRequestTimeoutInMillis() {
177-
return requestTimeoutInMillis;
178-
}
179-
180-
public int getPort() {
181-
return port;
182-
}
183-
184-
public String getPath() {
185-
return path;
186-
}
187-
188-
public boolean hasHealthCheck() {
189-
return healthCheck;
190-
}
191-
192-
public Set<String> getBufferPartitionKeys() {
193-
return bufferPartitionKeys;
194-
}
195-
196198
public boolean enableHttpHealthCheck() {
197-
return enableUnframedRequests() && hasHealthCheck();
198-
}
199-
200-
public boolean hasProtoReflectionService() {
201-
return protoReflectionService;
202-
}
203-
204-
public boolean enableUnframedRequests() {
205-
return enableUnframedRequests;
206-
}
207-
208-
public boolean isSsl() {
209-
return ssl;
210-
}
211-
212-
public OTelOutputFormat getOutputFormat() {
213-
return outputFormat;
214-
}
215-
216-
public boolean useAcmCertForSSL() {
217-
return useAcmCertForSSL;
218-
}
219-
220-
public long getAcmCertIssueTimeOutMillis() {
221-
return acmCertIssueTimeOutMillis;
222-
}
223-
224-
public String getSslKeyCertChainFile() {
225-
return sslKeyCertChainFile;
226-
}
227-
228-
public String getSslKeyFile() {
229-
return sslKeyFile;
230-
}
231-
232-
public String getAcmCertificateArn() {
233-
return acmCertificateArn;
234-
}
235-
236-
public String getAcmPrivateKeyPassword() {
237-
return acmPrivateKeyPassword;
238-
}
239-
240-
public boolean isSslCertAndKeyFileInS3() {
241-
return sslCertAndKeyFileInS3;
242-
}
243-
244-
public String getAwsRegion() {
245-
return awsRegion;
246-
}
247-
248-
public int getThreadCount() {
249-
return threadCount;
250-
}
251-
252-
public int getMaxConnectionCount() {
253-
return maxConnectionCount;
254-
}
255-
256-
public PluginModel getAuthentication() { return authentication; }
257-
258-
public boolean isUnauthenticatedHealthCheck() {
259-
return unauthenticatedHealthCheck;
260-
}
261-
262-
public CompressionOption getCompression() {
263-
return compression;
264-
}
265-
266-
public ByteCount getMaxRequestLength() {
267-
return maxRequestLength;
268-
}
269-
270-
public RetryInfoConfig getRetryInfo() {
271-
return retryInfo;
272-
}
273-
274-
public void setRetryInfo(RetryInfoConfig retryInfo) {
275-
this.retryInfo = retryInfo;
199+
return isEnableUnframedRequests() && isHealthCheck();
276200
}
277201
}
278202

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/certificate/CertificateProviderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public CertificateProviderFactory(final OTelMetricsSourceConfig oTelMetricsSourc
4242

4343
public CertificateProvider getCertificateProvider() {
4444
// ACM Cert for SSL takes preference
45-
if (oTelMetricsSourceConfig.useAcmCertForSSL()) {
45+
if (oTelMetricsSourceConfig.isUseAcmCertForSSL()) {
4646
LOG.info("Using ACM certificate and private key for SSL/TLS.");
4747
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
4848
.addCredentialsProvider(DefaultCredentialsProvider.create()).build();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.otelmetrics.http;
12+
13+
import java.time.Instant;
14+
import java.util.Collection;
15+
16+
import org.opensearch.dataprepper.exceptions.BadRequestException;
17+
import org.opensearch.dataprepper.exceptions.BufferWriteException;
18+
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
19+
import org.opensearch.dataprepper.metrics.PluginMetrics;
20+
import org.opensearch.dataprepper.model.buffer.Buffer;
21+
import org.opensearch.dataprepper.model.metric.Metric;
22+
import org.opensearch.dataprepper.model.record.Record;
23+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import com.linecorp.armeria.server.ServiceRequestContext;
28+
import com.linecorp.armeria.server.annotation.ConsumesJson;
29+
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
30+
import com.linecorp.armeria.server.annotation.Post;
31+
32+
import io.micrometer.core.instrument.Counter;
33+
import io.micrometer.core.instrument.DistributionSummary;
34+
import io.micrometer.core.instrument.Timer;
35+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
36+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
37+
38+
public class ArmeriaHttpService {
39+
private static final Logger LOG = LoggerFactory.getLogger(ArmeriaHttpService.class);
40+
41+
public static final String REQUESTS_RECEIVED = "requestsReceived";
42+
public static final String SUCCESS_REQUESTS = "successRequests";
43+
public static final String PAYLOAD_SIZE = "payloadSize";
44+
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";
45+
46+
private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder;
47+
private final Buffer<Record<? extends Metric>> buffer;
48+
49+
private final int bufferWriteTimeoutInMillis;
50+
51+
private final Counter requestsReceivedCounter;
52+
private final Counter successRequestsCounter;
53+
private final DistributionSummary payloadSizeSummary;
54+
private final Timer requestProcessDuration;
55+
56+
public ArmeriaHttpService(Buffer<Record<? extends Metric>> buffer,
57+
final PluginMetrics pluginMetrics,
58+
final int bufferWriteTimeoutInMillis,
59+
final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder ) {
60+
this.buffer = buffer;
61+
this.oTelProtoDecoder = oTelProtoDecoder;
62+
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
63+
64+
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
65+
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
66+
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
67+
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
68+
}
69+
70+
// no path provided. Will be set by config.
71+
@Post("")
72+
@ConsumesJson
73+
@ConsumesProtobuf
74+
public ExportMetricsServiceResponse exportMetrics(ExportMetricsServiceRequest request) {
75+
requestsReceivedCounter.increment();
76+
payloadSizeSummary.record(request.getSerializedSize());
77+
78+
requestProcessDuration.record(() -> processRequest(request));
79+
80+
return ExportMetricsServiceResponse.newBuilder().build();
81+
}
82+
83+
private void processRequest(final ExportMetricsServiceRequest request) {
84+
final Collection<Record<? extends Metric>> metrics;
85+
86+
try {
87+
metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, Instant.now());
88+
} catch (Exception e) {
89+
LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse the request with error {}. Request body: {}", e, request);
90+
throw new BadRequestException(e.getMessage(), e);
91+
}
92+
93+
try {
94+
if (buffer.isByteBuffer()) {
95+
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
96+
} else {
97+
buffer.writeAll(metrics, bufferWriteTimeoutInMillis);
98+
}
99+
} catch (Exception e) {
100+
if (ServiceRequestContext.current().isTimedOut()) {
101+
LOG.warn("Exception writing to buffer but request already timed out.", e);
102+
return;
103+
}
104+
105+
LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e);
106+
throw new BufferWriteException(e.getMessage(), e);
107+
}
108+
109+
if (ServiceRequestContext.current().isTimedOut()) {
110+
LOG.warn("Buffer write completed successfully but request already timed out.");
111+
return;
112+
}
113+
114+
successRequestsCounter.increment();
115+
}
116+
}

0 commit comments

Comments
 (0)