diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java new file mode 100644 index 0000000000..8da071b996 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.testcustomauth; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestCustomAuthenticationConfig { + private final String customToken; + private final String header; + + @JsonCreator + public TestCustomAuthenticationConfig( + @JsonProperty("custom_token") String customToken, + @JsonProperty("header") String header) { + this.customToken = customToken; + this.header = header != null ? header : "authentication"; + } + + public String customToken() { + return customToken; + } + + public String header() { + return header; + } +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java new file mode 100644 index 0000000000..5e4acda4c4 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.testcustomauth; + +import com.linecorp.armeria.server.HttpService; +import io.grpc.ServerInterceptor; + +import java.util.Optional; +import java.util.function.Function; + +public interface TestCustomAuthenticationProvider { + + String UNAUTHENTICATED_PLUGIN_NAME = "unauthenticated"; + + + ServerInterceptor getAuthenticationInterceptor(); + + default Optional> getHttpAuthenticationService() { + return Optional.empty(); + } +} + diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProviderTest.java new file mode 100644 index 0000000000..4ab88dbe06 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomAuthenticationProviderTest.java @@ -0,0 +1,43 @@ +package org.opensearch.dataprepper.plugins.testcustomauth; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestCustomAuthenticationProviderTest { + + private static final String TOKEN = "test-token"; + private static final String HEADER = "authentication"; + + @Mock + private TestCustomAuthenticationConfig config; + + private TestCustomGrpcAuthenticationProvider provider; + + @BeforeEach + void setUp() { + when(config.customToken()).thenReturn(TOKEN); + when(config.header()).thenReturn(HEADER); + + provider = new TestCustomGrpcAuthenticationProvider(config); + } + + @Test + void testGetHttpAuthenticationService_shouldReturnValidOptional() { + var optionalService = provider.getHttpAuthenticationService(); + Assertions.assertTrue(optionalService.isPresent()); + } + + @Test + void testGetAuthenticationInterceptor_shouldReturnNonNull() { + var interceptor = provider.getAuthenticationInterceptor(); + Assertions.assertNotNull(interceptor); + } +} + diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomBasicAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomBasicAuthenticationProviderTest.java new file mode 100644 index 0000000000..ea431971f5 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomBasicAuthenticationProviderTest.java @@ -0,0 +1,138 @@ +package org.opensearch.dataprepper.plugins.testcustomauth; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.grpc.ServerInterceptors; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; + +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCustomBasicAuthenticationProviderTest { + private static final String TOKEN = UUID.randomUUID().toString(); + private static final String HEADER_NAME = "x-" + UUID.randomUUID(); + private static GrpcAuthenticationProvider grpcAuthenticationProvider; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + TestCustomAuthenticationConfig config = mock(TestCustomAuthenticationConfig.class); + when(config.customToken()).thenReturn(TOKEN); + when(config.header()).thenReturn(HEADER_NAME); + + grpcAuthenticationProvider = new TestCustomGrpcAuthenticationProvider(config); + + GrpcServiceBuilder grpcServiceBuilder = GrpcService.builder() + .enableUnframedRequests(true) + .addService(ServerInterceptors.intercept( + new SampleHealthGrpcService(), + Collections.singletonList(grpcAuthenticationProvider.getAuthenticationInterceptor()))); + + sb.service(grpcServiceBuilder.build()); + } + }; + + private static class SampleHealthGrpcService extends HealthGrpc.HealthImplBase { + @Override + public void check(HealthCheckRequest request, StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(HealthCheckResponse.ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + + @Nested + class ConstructorTests { + TestCustomAuthenticationConfig config; + + @BeforeEach + void setUp() { + config = mock(TestCustomAuthenticationConfig.class); + } + + @Test + void constructor_with_null_config_throws() { + assertThrows(NullPointerException.class, () -> new TestCustomGrpcAuthenticationProvider(null)); + } + } + + @Nested + class WithServer { + @Test + void request_without_token_responds_Unauthorized() { + WebClient client = WebClient.of(server.httpUri()); + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build()); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.UNAUTHORIZED)); + } + + @Test + void request_with_invalid_token_responds_Unauthorized() { + WebClient client = WebClient.builder(server.httpUri()) + .addHeader(HEADER_NAME, "invalid-token") + .build(); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build()); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.UNAUTHORIZED)); + } + + @Test + void request_with_valid_token_responds_OK() { + WebClient client = WebClient.builder(server.httpUri()) + .addHeader(HEADER_NAME, TOKEN) + .build(); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.of(Charset.defaultCharset(), "{\"healthCheckConfig\":{\"serviceName\": \"test\"} }")); + + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } + } +} + diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java new file mode 100644 index 0000000000..c35c013471 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestCustomGrpcAuthenticationProvider.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.testcustomauth; + +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.server.HttpService; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; + +import java.util.Optional; +import java.util.function.Function; + +@DataPrepperPlugin( + name = "test_custom_auth", + pluginType = GrpcAuthenticationProvider.class, + pluginConfigurationType = TestCustomAuthenticationConfig.class +) +public class TestCustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { + private final String token; + private final String header; + + @DataPrepperPluginConstructor + public TestCustomGrpcAuthenticationProvider(final TestCustomAuthenticationConfig config) { + this.token = config.customToken(); + this.header = config.header(); + } + + @Override + public ServerInterceptor getAuthenticationInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + + String auth = headers.get(Metadata.Key.of(header, Metadata.ASCII_STRING_MARSHALLER)); + + if (!isValid(auth)) { + call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), new Metadata()); + return new ServerCall.Listener<>() {}; + } + + return next.startCall(call, headers); + } + }; + } + + @Override + public Optional> getHttpAuthenticationService() { + return Optional.of(delegate -> (ctx, req) -> { + final String auth = req.headers().get(header); + if (!isValid(auth)) { + return HttpResponse.of( + HttpStatus.UNAUTHORIZED, + MediaType.PLAIN_TEXT_UTF_8, + "Unauthorized: Invalid or missing token" + ); + } + return delegate.serve(ctx, req); + }); + } + + /** + * Checks if the provided authentication token is valid. + * + * @param authHeader the value of the authentication header + * @return true if valid, false otherwise + */ + private boolean isValid(final String authHeader) { + return authHeader != null && authHeader.equals(token); + } +} + + diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomAuthenticationProviderTest.java new file mode 100644 index 0000000000..ee69d6255f --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomAuthenticationProviderTest.java @@ -0,0 +1,90 @@ +package org.opensearch.dataprepper.plugins.testcustomauth; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import io.grpc.ServerInterceptors; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.plugins.UnauthenticatedGrpcAuthenticationProvider; + +import java.nio.charset.Charset; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class TestUnauthenticatedCustomAuthenticationProviderTest { + private static GrpcAuthenticationProvider grpcAuthenticationProvider; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(final ServerBuilder sb) { + grpcAuthenticationProvider = new UnauthenticatedGrpcAuthenticationProvider(); + + final GrpcServiceBuilder grpcServiceBuilder = GrpcService + .builder() + .enableUnframedRequests(true) + .addService(ServerInterceptors.intercept(new SampleHealthGrpcService(), grpcAuthenticationProvider.getAuthenticationInterceptor())); + sb.service(grpcServiceBuilder.build()); + } + }; + + private static class SampleHealthGrpcService extends HealthGrpc.HealthImplBase { + @Override + public void check(final HealthCheckRequest request, final StreamObserver responseObserver) { + responseObserver.onNext( + HealthCheckResponse.newBuilder().setStatus(HealthCheckResponse.ServingStatus.SERVING).build()); + responseObserver.onCompleted(); + } + } + + @Test + void httpRequest_without_authentication_responds_OK() { + final WebClient client = WebClient.of(server.httpUri()); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.of(Charset.defaultCharset(), "{\"healthCheckConfig\":{\"serviceName\": \"test\"} }")); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } + + @Test + void httpRequest_with_random_authentication_responds_OK() { + final WebClient client = WebClient.builder(server.httpUri()) + .addHeader("authorization", UUID.randomUUID().toString()) + .build(); + + HttpRequest request = HttpRequest.of(RequestHeaders.builder() + .method(HttpMethod.POST) + .path("/grpc.health.v1.Health/Check") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.of(Charset.defaultCharset(), "{\"healthCheckConfig\":{\"serviceName\": \"test\"} }")); + + final AggregatedHttpResponse httpResponse = client.execute(request).aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomGrpcAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomGrpcAuthenticationProvider.java new file mode 100644 index 0000000000..42a09cdb34 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/plugins/testcustomauth/TestUnauthenticatedCustomGrpcAuthenticationProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.testcustomauth; + +import io.grpc.ServerInterceptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.Metadata; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; + + +/** + * Plugin that allows unauthenticated gRPC access. + */ +@DataPrepperPlugin( + name = GrpcAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, + pluginType = GrpcAuthenticationProvider.class +) +public class TestUnauthenticatedCustomGrpcAuthenticationProvider implements GrpcAuthenticationProvider { + + @Override + public ServerInterceptor getAuthenticationInterceptor() { + return new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + // No authentication is performed; allow the request to continue + return next.startCall(call, headers); + } + }; + } +} diff --git a/data-prepper-plugins/http-common/build.gradle b/data-prepper-plugins/http-common/build.gradle index 54fa5d346d..f9242b135b 100644 --- a/data-prepper-plugins/http-common/build.gradle +++ b/data-prepper-plugins/http-common/build.gradle @@ -5,7 +5,21 @@ dependencies { implementation 'org.apache.httpcomponents:httpcore:4.4.16' + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:blocking-buffer') + implementation project(':data-prepper-plugins:armeria-common') + implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:http-source-common' ) + implementation project(':data-prepper-plugins:otel-proto-common') + implementation libs.opentelemetry.proto + implementation libs.armeria.core + implementation libs.armeria.grpc + implementation libs.grpc.inprocess + implementation libs.protobuf.util testImplementation testLibs.bundles.junit + } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java new file mode 100644 index 0000000000..60bf6fba97 --- /dev/null +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java @@ -0,0 +1,231 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.server; + + +import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; +import com.linecorp.armeria.common.util.BlockingTaskExecutor; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.encoding.DecodingService; +import com.linecorp.armeria.server.grpc.GrpcService; +import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; +import com.linecorp.armeria.server.throttling.ThrottlingService; +import io.grpc.BindableService; +import io.grpc.MethodDescriptor; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.protobuf.services.ProtoReflectionService; +import org.opensearch.dataprepper.GrpcRequestExceptionHandler; +import org.opensearch.dataprepper.HttpRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; +import org.opensearch.dataprepper.http.LogThrottlingStrategy; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.slf4j.Logger; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.function.Function; + + +public class CreateServer { + private final ServerConfiguration serverConfiguration; + private final Logger LOG; + private final PluginMetrics pluginMetrics; + private String sourceName; + private String pipelineName; + + private static final String HTTP_HEALTH_CHECK_PATH = "/health"; + private static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; + private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; + + private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); + + public CreateServer(final ServerConfiguration serverConfiguration, final Logger LOG, final PluginMetrics pluginMetrics, final String sourceName, final String pipelineName) { + this.serverConfiguration = serverConfiguration; + this.LOG = LOG; + this.pluginMetrics = pluginMetrics; + this.sourceName = sourceName; + this.pipelineName = pipelineName; + } + + public Server createGRPCServer(final GrpcAuthenticationProvider authenticationProvider, final BindableService grpcService, final CertificateProvider certificateProvider, final MethodDescriptor methodDescriptor) { + final List serverInterceptors = getAuthenticationInterceptor(authenticationProvider); + + final GrpcServiceBuilder grpcServiceBuilder = GrpcService + .builder() + .useClientTimeoutHeader(false) + .useBlockingTaskExecutor(true) + .exceptionHandler(createGrpExceptionHandler()); + + final String sourcePath = serverConfiguration.getPath(); + if (sourcePath != null) { + final String transformedSourcePath = sourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); + grpcServiceBuilder.addService(transformedSourcePath, + ServerInterceptors.intercept(grpcService, serverInterceptors), methodDescriptor); + } else { + grpcServiceBuilder.addService(ServerInterceptors.intercept(grpcService, serverInterceptors)); + } + + if (serverConfiguration.hasHealthCheck()) { + LOG.info("Health check is enabled"); + grpcServiceBuilder.addService(new HealthGrpcService()); + } + + if (serverConfiguration.hasProtoReflectionService()) { + LOG.info("Proto reflection service is enabled"); + grpcServiceBuilder.addService(ProtoReflectionService.newInstance()); + } + + grpcServiceBuilder.enableUnframedRequests(serverConfiguration.enableUnframedRequests()); + + final ServerBuilder sb = Server.builder(); + sb.disableServerHeader(); + if (CompressionOption.NONE.equals(serverConfiguration.getCompression())) { + sb.service(grpcServiceBuilder.build()); + } else { + sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); + } + + if (serverConfiguration.enableHttpHealthCheck()) { + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); + } + + if(serverConfiguration.getAuthentication() != null) { + final Optional> optionalHttpAuthenticationService = + authenticationProvider.getHttpAuthenticationService(); + + if(serverConfiguration.isUnauthenticatedHealthCheck()) { + optionalHttpAuthenticationService.ifPresent(httpAuthenticationService -> + sb.decorator(REGEX_HEALTH, httpAuthenticationService)); + } else { + optionalHttpAuthenticationService.ifPresent(sb::decorator); + } + } + + sb.requestTimeoutMillis(serverConfiguration.getRequestTimeoutInMillis()); + if(serverConfiguration.getMaxRequestLength() != null) { + sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes()); + } + + // ACM Cert for SSL takes preference + if (serverConfiguration.isSsl() || serverConfiguration.useAcmCertForSSL()) { + LOG.info("SSL/TLS is enabled."); + final Certificate certificate = certificateProvider.getCertificate(); + sb.https(serverConfiguration.getPort()).tls( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) + ) + ); + } else { + LOG.warn("Creating " + sourceName + " without SSL/TLS. This is not secure."); + LOG.warn("In order to set up TLS for the " + sourceName + ", go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#ssl"); + sb.http(serverConfiguration.getPort()); + } + + final BlockingTaskExecutor blockingTaskExecutor = BlockingTaskExecutor.builder() + .numThreads(serverConfiguration.getThreadCount()) + .threadNamePrefix(pipelineName + sourceName) + .build(); + sb.blockingTaskExecutor(blockingTaskExecutor, true); + + return sb.build(); + } + + public Server createHTTPServer(final Buffer> buffer, final CertificateProviderFactory certificateProviderFactory, final ArmeriaHttpAuthenticationProvider authenticationProvider, final HttpRequestExceptionHandler httpRequestExceptionHandler, final Object logService) { + final ServerBuilder sb = Server.builder(); + + sb.disableServerHeader(); + + if (serverConfiguration.isSsl()) { + LOG.info("Creating http source with SSL/TLS enabled."); + final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); + final Certificate certificate = certificateProvider.getCertificate(); + // TODO: enable encrypted key with password + sb.https(serverConfiguration.getPort()).tls( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) + ) + ); + } else { + LOG.warn("Creating http source without SSL/TLS. This is not secure."); + LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl"); + sb.http(serverConfiguration.getPort()); + } + + if(serverConfiguration.getAuthentication() != null) { + final Optional> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator(); + + if (serverConfiguration.isUnauthenticatedHealthCheck()) { + optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator)); + } else { + optionalAuthDecorator.ifPresent(sb::decorator); + } + } + + sb.maxNumConnections(serverConfiguration.getMaxConnectionCount()); + sb.requestTimeout(Duration.ofMillis(serverConfiguration.getRequestTimeoutInMillis())); + if(serverConfiguration.getMaxRequestLength() != null) { + sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes()); + } + final int threads = serverConfiguration.getThreadCount(); + final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); + sb.blockingTaskExecutor(blockingTaskExecutor, true); + final int maxPendingRequests = serverConfiguration.getMaxPendingRequests(); + final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( + maxPendingRequests, blockingTaskExecutor.getQueue()); + final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); + + final String httpSourcePath = serverConfiguration.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); + sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); + + if (CompressionOption.NONE.equals(serverConfiguration.getCompression())) { + sb.annotatedService(httpSourcePath, logService, httpRequestExceptionHandler); + } else { + sb.annotatedService(httpSourcePath, logService, DecodingService.newDecorator(), httpRequestExceptionHandler); + } + + if (serverConfiguration.hasHealthCheck()) { + LOG.info("HTTP source health check is enabled"); + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); + } + + return sb.build(); + } + + private GrpcExceptionHandlerFunction createGrpExceptionHandler() { + RetryInfoConfig retryInfo = serverConfiguration.getRetryInfo() != null + ? serverConfiguration.getRetryInfo() + : DEFAULT_RETRY_INFO; + + return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); + } + + private List getAuthenticationInterceptor(final GrpcAuthenticationProvider authenticationProvider) { + final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); + if (authenticationInterceptor == null) { + return Collections.emptyList(); + } + return Collections.singletonList(authenticationInterceptor); + } +} +// \ No newline at end of file diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/HealthGrpcService.java similarity index 92% rename from data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java rename to data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/HealthGrpcService.java index adce5be0b9..6642501936 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/HealthGrpcService.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.health; +package org.opensearch.dataprepper.plugins.server; import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfoConfig.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/RetryInfoConfig.java similarity index 66% rename from data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfoConfig.java rename to data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/RetryInfoConfig.java index 7418a909f3..9482abb884 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfoConfig.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/RetryInfoConfig.java @@ -1,10 +1,17 @@ -package org.opensearch.dataprepper.plugins.source.oteltrace; +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.server; import java.time.Duration; import com.fasterxml.jackson.annotation.JsonProperty; public class RetryInfoConfig { + final Duration DEFAULT_MIN_DELAY = Duration.ofMillis(100); + final Duration DEFAULT_MAX_DELAY = Duration.ofMillis(2000); @JsonProperty(value = "min_delay", defaultValue = "100ms") private Duration minDelay; @@ -12,8 +19,10 @@ public class RetryInfoConfig { @JsonProperty(value = "max_delay", defaultValue = "2s") private Duration maxDelay; - // Jackson needs this constructor - public RetryInfoConfig() {} + public RetryInfoConfig() { + this.minDelay = DEFAULT_MIN_DELAY; + this.maxDelay = DEFAULT_MAX_DELAY; + } public RetryInfoConfig(Duration minDelay, Duration maxDelay) { this.minDelay = minDelay; diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java new file mode 100644 index 0000000000..ffb9788059 --- /dev/null +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/ServerConfiguration.java @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.server; + +import lombok.Getter; +import lombok.Setter; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; + +public class ServerConfiguration { + static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; + static final boolean DEFAULT_ENABLED_UNFRAMED_REQUESTS = false; + static final boolean DEFAULT_HEALTH_CHECK = false; + static final boolean DEFAULT_PROTO_REFLECTION_SERVICE = false; + static final boolean DEFAULT_SSL = true; + static final boolean DEFAULT_USE_ACM_CERT_FOR_SSL = false; + static final int DEFAULT_THREAD_COUNT = 200; + static final int DEFAULT_MAX_PENDING_REQUESTS = 1024; + static final int DEFAULT_MAX_CONNECTION_COUNT = 500; + static final double BUFFER_TIMEOUT_FRACTION = 0.8; + + @Setter + @Getter + private String path; + + @Setter + private boolean healthCheck = DEFAULT_HEALTH_CHECK; + + @Setter + private boolean protoReflectionService = DEFAULT_PROTO_REFLECTION_SERVICE; + + @Getter + @Setter + private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS; + + @Setter + private boolean enableUnframedRequests = DEFAULT_ENABLED_UNFRAMED_REQUESTS; + + @Setter + @Getter + private CompressionOption compression = CompressionOption.NONE; + + @Setter + @Getter + private PluginModel authentication; + + @Setter + @Getter + private boolean ssl = DEFAULT_SSL; + + @Setter + @Getter + private boolean unauthenticatedHealthCheck = false; + + @Getter + @Setter + private boolean useAcmCertForSSL = DEFAULT_USE_ACM_CERT_FOR_SSL; + + @Getter + @Setter + private ByteCount maxRequestLength; + + @Getter + @Setter + private Integer port; + + @Getter + @Setter + private RetryInfoConfig retryInfo; + + @Getter + @Setter + private int threadCount = DEFAULT_THREAD_COUNT; + + @Getter + @Setter + private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT; + + @Getter + @Setter + private int maxPendingRequests = DEFAULT_MAX_PENDING_REQUESTS; + + @Getter + @Setter + private int bufferTimeoutInMillis; + + public boolean hasHealthCheck() { + return healthCheck; + } + + public boolean enableHttpHealthCheck() { + return enableUnframedRequests() && hasHealthCheck(); + } + + public boolean hasProtoReflectionService() { + return protoReflectionService; + } + + public boolean enableUnframedRequests() { + return enableUnframedRequests; + } + + public boolean useAcmCertForSSL() { + return useAcmCertForSSL; + } +} diff --git a/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/CreateServerTest.java b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/CreateServerTest.java new file mode 100644 index 0000000000..eaaae6a0df --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/CreateServerTest.java @@ -0,0 +1,218 @@ +package org.opensearch.dataprepper.plugins.server; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServiceRequestContext; +import io.grpc.ServerInterceptor; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.HttpRequestExceptionHandler; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class CreateServerTest { + private Logger LOG = LoggerFactory.getLogger(CreateServer.class); + ObjectMapper objectMapper; + private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); + private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); + private String TEST_PIPELINE_NAME = "test-pipeline"; + private String TEST_SOURCE_NAME = "test-source"; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private CertificateProvider certificateProvider; + + @Mock + private CertificateProviderFactory certificateProviderFactory; + + @Mock + private ArmeriaHttpAuthenticationProvider armeriaAuthenticationProvider; + + @Mock + private HttpRequestExceptionHandler httpRequestExceptionHandler; + + @Mock + private GrpcAuthenticationProvider authenticationProvider; + + @Mock + ServerInterceptor authenticationInterceptor; + + @Mock + private Certificate certificate; + + @Test + void createGrpcServerTest() throws JsonProcessingException { + when(authenticationProvider.getAuthenticationInterceptor()).thenReturn(authenticationInterceptor); + final Map metadata = createGrpcMetadata(21890, false, 10000, 10, 5, CompressionOption.NONE, null); + final ServerConfiguration serverConfiguration = createServerConfig(metadata); + final CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, TEST_SOURCE_NAME, TEST_PIPELINE_NAME); + Buffer> buffer = new BlockingBuffer>(TEST_PIPELINE_NAME); + TestService testService = getTestService(buffer); + + Server server = createServer.createGRPCServer(authenticationProvider, testService, certificateProvider, null); + + assertNotNull(server); + assertDoesNotThrow(() -> server.start()); + assertDoesNotThrow(() -> server.stop()); + } + + @Test + void testCustomAuthModule() throws JsonProcessingException{ + when(authenticationProvider.getAuthenticationInterceptor()).thenReturn(authenticationInterceptor); + final Map metadata = createGrpcMetadata(21890, false, 10000, 10, 5, CompressionOption.NONE, null); + + Map authConfig = new HashMap<>(); + authConfig.put("plugin", "test-auth"); + authConfig.put("settings", Collections.singletonMap("key", "value")); + metadata.put("authentication", authConfig); + final ServerConfiguration serverConfiguration = createServerConfig(metadata); + + SimpleAuthDecorator customAuth = new SimpleAuthDecorator(); + Logger mockLogger = mock(Logger.class); + customAuth.setLogger(mockLogger); + when(authenticationProvider.getHttpAuthenticationService()).thenReturn(Optional.of(customAuth)); + + final CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, TEST_SOURCE_NAME, TEST_PIPELINE_NAME); + Buffer> buffer = new BlockingBuffer>(TEST_PIPELINE_NAME); + TestService testService = getTestService(buffer); + Server server = createServer.createGRPCServer(authenticationProvider, testService, certificateProvider, null); + + assertNotNull(server); + assertDoesNotThrow(() -> server.start()); + + WebClient webClient = WebClient.builder( + String.format("http://127.0.0.1:%d", serverConfiguration.getPort()) + ).build(); + webClient.get("/").aggregate().join(); + + verify(mockLogger).info("Ensure Custom Auth Decorator is working"); + + assertDoesNotThrow(() -> server.stop()); + } + + @Test + void createHttpServerTest() throws IOException { + final Path certFilePath = new File(TEST_SSL_CERTIFICATE_FILE).toPath(); + final Path keyFilePath = new File(TEST_SSL_KEY_FILE).toPath(); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + + when(certificate.getCertificate()).thenReturn(certAsString); + when(certificate.getPrivateKey()).thenReturn(keyAsString); + when(certificateProvider.getCertificate()).thenReturn(certificate); + when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider); + final Map metadata = createHttpMetadata(2021, "/log/ingest", 10_000, 200, 500, 1024, true, CompressionOption.NONE); + final ServerConfiguration serverConfiguration = createServerConfig(metadata); + final CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, TEST_SOURCE_NAME, TEST_PIPELINE_NAME); + Buffer> buffer = new BlockingBuffer>(TEST_PIPELINE_NAME); + String logService = "placeholder"; + Server server = createServer.createHTTPServer(buffer, certificateProviderFactory, armeriaAuthenticationProvider, httpRequestExceptionHandler, logService); + assertNotNull(server); + assertDoesNotThrow(() -> server.start()); + assertDoesNotThrow(() -> server.stop()); + } + + + + private Map createGrpcMetadata (Integer port, Boolean ssl, Integer reqeustTimeoutInMillis, Integer maxConnectionCount, Integer threadCount, CompressionOption compression, RetryInfoConfig retryInfo){ + final Map metadata = new HashMap<>(); + metadata.put("port", port); + metadata.put("ssl", ssl); + metadata.put("requestTimeoutInMillis", reqeustTimeoutInMillis); + metadata.put("maxConnectionCount", maxConnectionCount); + metadata.put("threadCount", threadCount); + metadata.put("compression", compression); + metadata.put("retryInfo", retryInfo); + return metadata; + } + + private Map createHttpMetadata (Integer port, String path, Integer requestTimeoutInMillis, Integer threadCount, Integer maxConnectionCount, Integer maxPendingRequests, Boolean hasHealthCheckService, CompressionOption compressionOption){ + final Map metadata = new HashMap<>(); + metadata.put("port", port); + metadata.put("path", path); + metadata.put("requestTimeoutInMillis", requestTimeoutInMillis); + metadata.put("threadCount", threadCount); + metadata.put("maxConnectionCount", maxConnectionCount); + metadata.put("maxPendingRequests", maxPendingRequests); + metadata.put("healthCheck", hasHealthCheckService); + metadata.put("compression", compressionOption); + return metadata; + } + + private ServerConfiguration createServerConfig(final Map metadata) throws JsonProcessingException { + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + String json = new ObjectMapper().writeValueAsString(metadata); + return objectMapper.readValue(json, ServerConfiguration.class); + } + + private TestService getTestService(Buffer> buffer){ + TestService testService = new TestService( + 80, + new OTelProtoStandardCodec.OTelProtoDecoder(), + buffer, + pluginMetrics + ); + return testService; + } + + public class SimpleAuthDecorator implements Function { + private Logger authLog = LoggerFactory.getLogger(SimpleAuthDecorator.class); + + void setLogger(Logger logger) { + this.authLog = logger; + } + + @Override + public HttpService apply(HttpService delegate) { + return new HttpService() { + @Override + public com.linecorp.armeria.common.HttpResponse serve(ServiceRequestContext ctx, HttpRequest request) throws Exception { + authLog.info("Ensure Custom Auth Decorator is working"); + return delegate.serve(ctx, request); + } + + }; + } + } + +} diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/HealthGrpcServiceTest.java similarity index 97% rename from data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java rename to data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/HealthGrpcServiceTest.java index 49c9cb2df3..3a671561d4 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java +++ b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/HealthGrpcServiceTest.java @@ -3,7 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.health; +package org.opensearch.dataprepper.plugins.server; + import io.grpc.ManagedChannel; import io.grpc.Server; diff --git a/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/TestService.java b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/TestService.java new file mode 100644 index 0000000000..73fc9d8b33 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/java/org/opensearch/dataprepper/plugins/server/TestService.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.server; + +import com.linecorp.armeria.server.ServiceRequestContext; +import io.grpc.Context; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import org.opensearch.dataprepper.exceptions.BufferWriteException; +import org.opensearch.dataprepper.exceptions.RequestCancelledException; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec.DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE; + +public class TestService extends MetricsServiceGrpc.MetricsServiceImplBase { + private static final Logger LOG = LoggerFactory.getLogger(TestService.class); + + private final int bufferWriteTimeoutInMillis; + private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder; + private final Buffer> buffer; + + + + public TestService(int bufferWriteTimeoutInMillis, + final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder, + Buffer> buffer, + final PluginMetrics pluginMetrics) { + this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; + this.buffer = buffer; + + this.oTelProtoDecoder = oTelProtoDecoder; + } + + @Override + public void export(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { + + if (ServiceRequestContext.current().isTimedOut()) { + return; + } + + if (Context.current().isCancelled()) { + throw new RequestCancelledException("Cancelled by client"); + } + + processRequest(request, responseObserver); + } + + private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver responseObserver) { + try { + if (buffer.isByteBuffer()) { + buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis); + } else { + Collection> metrics; + + AtomicInteger droppedCounter = new AtomicInteger(0); + metrics = oTelProtoDecoder.parseExportMetricsServiceRequest(request, droppedCounter, DEFAULT_EXPONENTIAL_HISTOGRAM_MAX_ALLOWED_SCALE, Instant.now(), true, true, true); + buffer.writeAll(metrics, bufferWriteTimeoutInMillis); + } + } catch (Exception e) { + if (ServiceRequestContext.current().isTimedOut()) { + LOG.warn("Exception writing to buffer but request already timed out.", e); + return; + } + + LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e); + throw new BufferWriteException(e.getMessage(), e); + } + + if (ServiceRequestContext.current().isTimedOut()) { + LOG.warn("Buffer write completed successfully but request already timed out."); + return; + } + + responseObserver.onNext(ExportMetricsServiceResponse.newBuilder().build()); + responseObserver.onCompleted(); + } +} diff --git a/data-prepper-plugins/http-common/src/test/resources/test_cert.crt b/data-prepper-plugins/http-common/src/test/resources/test_cert.crt new file mode 100644 index 0000000000..26c78d1411 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/resources/test_cert.crt @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHTCCAYYCCQD4hqYeYDQZADANBgkqhkiG9w0BAQUFADBSMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCVFgxDzANBgNVBAcMBkF1c3RpbjEPMA0GA1UECgwGQW1hem9u +MRQwEgYDVQQLDAtEYXRhcHJlcHBlcjAgFw0yMTA2MjUxOTIzMTBaGA8yMTIxMDYw +MTE5MjMxMFowUjELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAlRYMQ8wDQYDVQQHDAZB +dXN0aW4xDzANBgNVBAoMBkFtYXpvbjEUMBIGA1UECwwLRGF0YXByZXBwZXIwgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAKrb3YhdKbQ5PtLHall10iLZC9ZdDVrq +HOvqVSM8NHlL8f82gJ3l0n9k7hYc5eKisutaS9eDTmJ+Dnn8xn/qPSKTIq9Wh+OZ +O+e9YEEpI/G4F9KpGULgMyRg9sJK0GlZdEt9o5GJNJIJUkptJU5eiLuE0IV+jyJo +Nvm8OE6EJPqxAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAjgnX5n/Tt7eo9uakIGAb +uBhvYdR8JqKXqF9rjFJ/MIK7FdQSF/gCdjnvBhzLlZFK/Nb6MGKoSKm5Lcr75LgC +FyhIwp3WlqQksiMFnOypYVY71vqDgj6UKdMaOBgthsYhngj8lC+wsVzWqQvkJ2Qg +/GAIzJwiZfXiaevQHRk79qI= +-----END CERTIFICATE----- diff --git a/data-prepper-plugins/http-common/src/test/resources/test_decrypted_key.key b/data-prepper-plugins/http-common/src/test/resources/test_decrypted_key.key new file mode 100644 index 0000000000..479b877131 --- /dev/null +++ b/data-prepper-plugins/http-common/src/test/resources/test_decrypted_key.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCq292IXSm0OT7Sx2pZddIi2QvWXQ1a6hzr6lUjPDR5S/H/NoCd +5dJ/ZO4WHOXiorLrWkvXg05ifg55/MZ/6j0ikyKvVofjmTvnvWBBKSPxuBfSqRlC +4DMkYPbCStBpWXRLfaORiTSSCVJKbSVOXoi7hNCFfo8iaDb5vDhOhCT6sQIDAQAB +AoGANrrhFqpJDpr7vcb1ER0Fp/YArbT27zVo+EUC6puBb41dQlQyFOImcHpjLaAq +H1PgnjU5cBp2hGQ+vOK0rwrYc/HNl6vfh6N3NbDptMiuoBafRJA9JzYourAM09BU +zmXyr61Yn3KHzx1PRwWe37icX93oXP3P0qHb3dI1ZF4jG0ECQQDU5N/a7ogoz2zn +ZssD6FvUOUQDsdBWdXmhUvg+YdZrV44e4xk+FVzwEONoRktEYKz9MFXlsgNHr445 +KRguHWcJAkEAzXQkwOkN8WID1wrwoobUIMbZSGAZzofwkKXgTTnllnT1qOQXuRbS +aCMejFEymBBef4aXP6N4+va2FKW/MF34aQJAO2oMl1sOoOUSrZngepy0VAwPUUCk +thxe74jqQu6nGpn6zd/vQYZQw6bS8Fz90H1yic6dilcd1znFZWp0lxoZkQJBALeI +xoBycRsuFQIYasi1q3AwUtBd0Q/3zkZZeBtk2hzjFMUwJaUZpxKSNOrialD/ZnuD +jz+xWBTRKe0d98JMX+kCQCmsJEj/HYQAC1GamZ7JQWogRSRF2KTgTWRaDXDxy0d4 +yUQgwHB+HZLFcbi1JEK6eIixCsX8iifrrkteh+1npJ0= +-----END RSA PRIVATE KEY----- diff --git a/data-prepper-plugins/http-source/README.md b/data-prepper-plugins/http-source/README.md index 19d9a5a543..46141db03b 100644 --- a/data-prepper-plugins/http-source/README.md +++ b/data-prepper-plugins/http-source/README.md @@ -96,7 +96,7 @@ Make sure to replace the paths for the `ssl_certificate_file` and `ssl_key_file` Send a sample log with the following https curl command ``` -curl -k -XPOST -H "Content-Type: application/json" -d '[{"log": "sample log"}]' https://localhost:2021/log/ingest +curl -k -X POST -H "Content-Type: application/json" -d '[{"log": "sample log"}]' https://localhost:2021/log/ingest ``` # Metrics diff --git a/data-prepper-plugins/http-source/build.gradle b/data-prepper-plugins/http-source/build.gradle index fcfb4ad67f..5c6173b0ab 100644 --- a/data-prepper-plugins/http-source/build.gradle +++ b/data-prepper-plugins/http-source/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation project(':data-prepper-plugins:http-source-common') implementation project(':data-prepper-plugins:common') implementation project(':data-prepper-plugins:armeria-common') + implementation project(':data-prepper-plugins:http-common') implementation libs.armeria.core implementation libs.commons.io implementation 'software.amazon.awssdk:acm' diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/ConvertConfiguration.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/ConvertConfiguration.java new file mode 100644 index 0000000000..a985b3e6e1 --- /dev/null +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/ConvertConfiguration.java @@ -0,0 +1,25 @@ +package org.opensearch.dataprepper.plugins.source.loghttp; + +import org.opensearch.dataprepper.http.HttpServerConfig; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; + +public class ConvertConfiguration { + + public static ServerConfiguration convertConfiguration(final HttpServerConfig sourceConfig) { + ServerConfiguration serverConfiguration = new ServerConfiguration(); + serverConfiguration.setPath(sourceConfig.getPath()); + serverConfiguration.setHealthCheck(sourceConfig.hasHealthCheckService()); + serverConfiguration.setRequestTimeoutInMillis(sourceConfig.getRequestTimeoutInMillis()); + serverConfiguration.setCompression(sourceConfig.getCompression()); + serverConfiguration.setAuthentication(sourceConfig.getAuthentication()); + serverConfiguration.setSsl(sourceConfig.isSsl()); + serverConfiguration.setUnauthenticatedHealthCheck(sourceConfig.isUnauthenticatedHealthCheck()); + serverConfiguration.setMaxRequestLength(sourceConfig.getMaxRequestLength()); + serverConfiguration.setPort(sourceConfig.getPort()); + serverConfiguration.setThreadCount(sourceConfig.getThreadCount()); + serverConfiguration.setMaxPendingRequests(sourceConfig.getMaxPendingRequests()); + serverConfiguration.setMaxConnectionCount(sourceConfig.getMaxConnectionCount()); + serverConfiguration.setBufferTimeoutInMillis(sourceConfig.getBufferTimeoutInMillis()); + return serverConfiguration; + } +} diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 1906a3e40f..737a8046d7 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -5,17 +5,10 @@ package org.opensearch.dataprepper.plugins.source.loghttp; -import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.encoding.DecodingService; -import com.linecorp.armeria.server.healthcheck.HealthCheckService; -import com.linecorp.armeria.server.throttling.ThrottlingService; import org.opensearch.dataprepper.HttpRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.http.HttpServerConfig; -import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; -import org.opensearch.dataprepper.http.LogThrottlingStrategy; import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; @@ -30,23 +23,17 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.server.CreateServer; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Collections; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.function.Function; @DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class) public class HTTPSource implements Source> { + private static final String PLUGIN_NAME = "http"; private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class); private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; public static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; @@ -96,65 +83,10 @@ public void start(final Buffer> buffer) { throw new IllegalStateException("Buffer provided is null"); } if (server == null) { - final ServerBuilder sb = Server.builder(); - - sb.disableServerHeader(); - - if (sourceConfig.isSsl()) { - LOG.info("Creating http source with SSL/TLS enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - // TODO: enable encrypted key with password - sb.https(sourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating http source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl"); - sb.http(sourceConfig.getPort()); - } - - if(sourceConfig.getAuthentication() != null) { - final Optional> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator(); - - if (sourceConfig.isUnauthenticatedHealthCheck()) { - optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator)); - } else { - optionalAuthDecorator.ifPresent(sb::decorator); - } - } - - sb.maxNumConnections(sourceConfig.getMaxConnectionCount()); - sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis())); - if(sourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes()); - } - final int threads = sourceConfig.getThreadCount(); - final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); - sb.blockingTaskExecutor(blockingTaskExecutor, true); - final int maxPendingRequests = sourceConfig.getMaxPendingRequests(); - final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( - maxPendingRequests, blockingTaskExecutor.getQueue()); - final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); - - final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); - final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics); - - if (CompressionOption.NONE.equals(sourceConfig.getCompression())) { - sb.annotatedService(httpSourcePath, logHTTPService, httpRequestExceptionHandler); - } else { - sb.annotatedService(httpSourcePath, logHTTPService, DecodingService.newDecorator(), httpRequestExceptionHandler); - } - - if (sourceConfig.hasHealthCheckService()) { - LOG.info("HTTP source health check is enabled"); - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); - } - - server = sb.build(); + ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig); + CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName); + final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics); + server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } diff --git a/data-prepper-plugins/otel-logs-source/build.gradle b/data-prepper-plugins/otel-logs-source/build.gradle index ac76ddec5d..5ba4631ade 100644 --- a/data-prepper-plugins/otel-logs-source/build.gradle +++ b/data-prepper-plugins/otel-logs-source/build.gradle @@ -12,6 +12,7 @@ dependencies { implementation project(':data-prepper-plugins:common') implementation project(':data-prepper-plugins:blocking-buffer') implementation project(':data-prepper-plugins:otel-proto-common') + implementation project(':data-prepper-plugins:http-common') implementation libs.commons.codec implementation project(':data-prepper-plugins:armeria-common') testImplementation project(':data-prepper-api').sourceSets.test.output diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/ConvertConfiguration.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/ConvertConfiguration.java new file mode 100644 index 0000000000..ccb422340b --- /dev/null +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/ConvertConfiguration.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.plugins.source.otellogs; + +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; + +public class ConvertConfiguration { + + public static ServerConfiguration convertConfiguration(final OTelLogsSourceConfig oTelLogsSourceConfig) { + ServerConfiguration serverConfiguration = new ServerConfiguration(); + serverConfiguration.setPath(oTelLogsSourceConfig.getPath()); + serverConfiguration.setHealthCheck(oTelLogsSourceConfig.hasHealthCheck()); + serverConfiguration.setProtoReflectionService(oTelLogsSourceConfig.hasProtoReflectionService()); + serverConfiguration.setRequestTimeoutInMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis()); + serverConfiguration.setEnableUnframedRequests(oTelLogsSourceConfig.enableUnframedRequests()); + serverConfiguration.setCompression(oTelLogsSourceConfig.getCompression()); + serverConfiguration.setAuthentication(oTelLogsSourceConfig.getAuthentication()); + serverConfiguration.setSsl(oTelLogsSourceConfig.isSsl()); + serverConfiguration.setUseAcmCertForSSL(oTelLogsSourceConfig.useAcmCertForSSL()); + serverConfiguration.setMaxRequestLength(oTelLogsSourceConfig.getMaxRequestLength()); + serverConfiguration.setPort(oTelLogsSourceConfig.getPort()); + serverConfiguration.setRetryInfo(oTelLogsSourceConfig.getRetryInfo()); + serverConfiguration.setThreadCount(oTelLogsSourceConfig.getThreadCount()); + serverConfiguration.setMaxConnectionCount(oTelLogsSourceConfig.getMaxConnectionCount()); + + return serverConfiguration; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 1d96aac89a..f65f795d10 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -5,22 +5,8 @@ package org.opensearch.dataprepper.plugins.source.otellogs; -import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; -import com.linecorp.armeria.server.HttpService; -import com.linecorp.armeria.server.encoding.DecodingService; -import org.opensearch.dataprepper.GrpcRequestExceptionHandler; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; -import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory; import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.grpc.GrpcService; -import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; import io.grpc.MethodDescriptor; -import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; -import io.grpc.protobuf.services.ProtoReflectionService; - import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; @@ -29,6 +15,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -36,34 +23,25 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; +import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; -import org.opensearch.dataprepper.model.codec.ByteDecoder; -import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; +import org.opensearch.dataprepper.plugins.server.CreateServer; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; +import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Collections; -import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.function.Function; @DataPrepperPlugin(name = "otel_logs_source", pluginType = Source.class, pluginConfigurationType = OTelLogsSourceConfig.class) public class OTelLogsSource implements Source> { + private static final String PLUGIN_NAME = "otel_logs_source"; private static final Logger LOG = LoggerFactory.getLogger(OTelLogsSource.class); - private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; static final String SERVER_CONNECTIONS = "serverConnections"; - // Default RetryInfo with minimum 100ms and maximum 2s - private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); - private final OTelLogsSourceConfig oTelLogsSourceConfig; private final String pipelineName; private final PluginMetrics pluginMetrics; @@ -112,77 +90,14 @@ public void start(Buffer> buffer) { pluginMetrics ); - final List serverInterceptors = getAuthenticationInterceptor(); - - final GrpcServiceBuilder grpcServiceBuilder = GrpcService - .builder() - .useClientTimeoutHeader(false) - .useBlockingTaskExecutor(true) - .exceptionHandler(createGrpExceptionHandler()); - - final MethodDescriptor methodDescriptor = LogsServiceGrpc.getExportMethod(); - final String oTelLogsSourcePath = oTelLogsSourceConfig.getPath(); - if (oTelLogsSourcePath != null) { - final String transformedOTelLogsSourcePath = oTelLogsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - grpcServiceBuilder.addService(transformedOTelLogsSourcePath, - ServerInterceptors.intercept(oTelLogsGrpcService, serverInterceptors), methodDescriptor); - } else { - grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelLogsGrpcService, serverInterceptors)); - } - - if (oTelLogsSourceConfig.hasHealthCheck()) { - LOG.info("Health check is enabled"); - grpcServiceBuilder.addService(new HealthGrpcService()); - } - - if (oTelLogsSourceConfig.hasProtoReflectionService()) { - LOG.info("Proto reflection service is enabled"); - grpcServiceBuilder.addService(ProtoReflectionService.newInstance()); - } - - grpcServiceBuilder.enableUnframedRequests(oTelLogsSourceConfig.enableUnframedRequests()); - - final ServerBuilder sb = Server.builder(); - sb.disableServerHeader(); - if (CompressionOption.NONE.equals(oTelLogsSourceConfig.getCompression())) { - sb.service(grpcServiceBuilder.build()); - } else { - sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); - } - - if (oTelLogsSourceConfig.getAuthentication() != null) { - final Optional> optionalHttpAuthenticationService = - authenticationProvider.getHttpAuthenticationService(); - optionalHttpAuthenticationService.ifPresent(sb::decorator); - } - - sb.requestTimeoutMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis()); - if(oTelLogsSourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes()); - } - - // ACM Cert for SSL takes preference + ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(oTelLogsSourceConfig); + CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName); + CertificateProvider certificateProvider = null; if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) { - LOG.info("SSL/TLS is enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - sb.https(oTelLogsSourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating otel_logs_source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the otel_logs_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-logs-source#ssl"); - sb.http(oTelLogsSourceConfig.getPort()); + certificateProvider = certificateProviderFactory.getCertificateProvider(); } - - sb.maxNumConnections(oTelLogsSourceConfig.getMaxConnectionCount()); - sb.blockingTaskExecutor( - Executors.newScheduledThreadPool(oTelLogsSourceConfig.getThreadCount()), - true); - - server = sb.build(); + final MethodDescriptor methodDescriptor = LogsServiceGrpc.getExportMethod(); + server = createServer.createGRPCServer(authenticationProvider, oTelLogsGrpcService, certificateProvider, methodDescriptor); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } @@ -220,22 +135,6 @@ public void stop() { LOG.info("Stopped otel_logs_source."); } - private GrpcExceptionHandlerFunction createGrpExceptionHandler() { - RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo() != null - ? oTelLogsSourceConfig.getRetryInfo() - : DEFAULT_RETRY_INFO; - - return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); - } - - private List getAuthenticationInterceptor() { - final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); - if (authenticationInterceptor == null) { - return Collections.emptyList(); - } - return Collections.singletonList(authenticationInterceptor); - } - private GrpcAuthenticationProvider createAuthenticationProvider(final PluginFactory pluginFactory) { final PluginModel authenticationConfiguration = oTelLogsSourceConfig.getAuthentication(); diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java index 501d8259db..9babfdafc1 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; public class OTelLogsSourceConfig { static final String REQUEST_TIMEOUT = "request_timeout"; diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/RetryInfoConfig.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/RetryInfoConfig.java deleted file mode 100644 index 1ae026202e..0000000000 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/RetryInfoConfig.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.opensearch.dataprepper.plugins.source.otellogs; - -import java.time.Duration; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class RetryInfoConfig { - - @JsonProperty(value = "min_delay", defaultValue = "100ms") - private Duration minDelay; - - @JsonProperty(value = "max_delay", defaultValue = "2s") - private Duration maxDelay; - - // Jackson needs this constructor - public RetryInfoConfig() {} - - public RetryInfoConfig(Duration minDelay, Duration maxDelay) { - this.minDelay = minDelay; - this.maxDelay = maxDelay; - } - - public Duration getMinDelay() { - return minDelay; - } - - public void setMinDelay(Duration minDelay) { - this.minDelay = minDelay; - } - - public Duration getMaxDelay() { - return maxDelay; - } - - public void setMaxDelay(Duration maxDelay) { - this.maxDelay = maxDelay; - } -} diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java deleted file mode 100644 index 49c9cb2df3..0000000000 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.health; - -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertEquals; - - -public class HealthGrpcServiceTest { - - private Server server; - private ManagedChannel channel; - - @BeforeEach - public void setup() throws Exception { - String serverName = InProcessServerBuilder.generateName(); - - server = InProcessServerBuilder.forName(serverName) - .directExecutor() - .addService(new HealthGrpcService()) - .build() - .start(); - - channel = InProcessChannelBuilder.forName(serverName) - .directExecutor() - .build(); - } - - @AfterEach - public void teardown() throws Exception { - try { - channel.shutdown(); - server.shutdown(); - - assert channel.awaitTermination(10, TimeUnit.SECONDS) : "channel cannot be gracefully shutdown"; - assert server.awaitTermination(10, TimeUnit.SECONDS) : "server cannot be gracefully shutdown"; - } finally { - channel.shutdownNow(); - server.shutdownNow(); - } - } - - @Test - void testHealthCheckResponse() { - HealthGrpc.HealthBlockingStub blockingStub = HealthGrpc.newBlockingStub(channel); - HealthCheckResponse response = - blockingStub.check(HealthCheckRequest.newBuilder().build()); - - assertEquals(HealthCheckResponse.ServingStatus.SERVING, response.getStatus()); - } -} diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java index 4bcacdaa99..cc228eb28f 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java @@ -73,7 +73,7 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; +import org.opensearch.dataprepper.plugins.server.HealthGrpcService; import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; import org.opensearch.dataprepper.plugins.source.otellogs.certificate.CertificateProviderFactory; diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java index abf259da33..f8b1ef8c0d 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSourceConfigTests.java @@ -43,6 +43,7 @@ import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.SSL_KEY_FILE; import static org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig.THREAD_COUNT; import static org.hamcrest.MatcherAssert.assertThat; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; import static org.hamcrest.Matchers.equalTo; class OtelLogsSourceConfigTests { diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java index 40cdaa1091..33295666f0 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java @@ -53,6 +53,7 @@ import io.opentelemetry.proto.logs.v1.ResourceLogs; import io.opentelemetry.proto.logs.v1.ScopeLogs; import io.opentelemetry.proto.resource.v1.Resource; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; @ExtendWith(MockitoExtension.class) class OtelLogsSource_RetryInfoTest { diff --git a/data-prepper-plugins/otel-metrics-source/build.gradle b/data-prepper-plugins/otel-metrics-source/build.gradle index 7e994cf282..79d5baf22c 100644 --- a/data-prepper-plugins/otel-metrics-source/build.gradle +++ b/data-prepper-plugins/otel-metrics-source/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation libs.commons.codec implementation project(':data-prepper-plugins:armeria-common') implementation project(':data-prepper-plugins:otel-proto-common') + implementation project(':data-prepper-plugins:http-common') testImplementation project(':data-prepper-api').sourceSets.test.output implementation libs.opentelemetry.proto implementation libs.commons.io diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java deleted file mode 100644 index adce5be0b9..0000000000 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.health; - -import io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; -import io.grpc.stub.StreamObserver; - -public class HealthGrpcService extends HealthGrpc.HealthImplBase { - - @Override - public void check(final HealthCheckRequest request, final StreamObserver responseObserver) { - responseObserver.onNext( - HealthCheckResponse.newBuilder().setStatus(HealthCheckResponse.ServingStatus.SERVING).build()); - responseObserver.onCompleted(); - } -} diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/ConvertConfiguration.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/ConvertConfiguration.java new file mode 100644 index 0000000000..3922a10572 --- /dev/null +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/ConvertConfiguration.java @@ -0,0 +1,27 @@ +package org.opensearch.dataprepper.plugins.source.otelmetrics; + +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; + +public class ConvertConfiguration { + + public static ServerConfiguration convertConfiguration(final OTelMetricsSourceConfig oTelMetricsSourceConfig) { + ServerConfiguration serverConfiguration = new ServerConfiguration(); + serverConfiguration.setPath(oTelMetricsSourceConfig.getPath()); + serverConfiguration.setHealthCheck(oTelMetricsSourceConfig.hasHealthCheck()); + serverConfiguration.setProtoReflectionService(oTelMetricsSourceConfig.hasProtoReflectionService()); + serverConfiguration.setRequestTimeoutInMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis()); + serverConfiguration.setEnableUnframedRequests(oTelMetricsSourceConfig.enableUnframedRequests()); + serverConfiguration.setCompression(oTelMetricsSourceConfig.getCompression()); + serverConfiguration.setAuthentication(oTelMetricsSourceConfig.getAuthentication()); + serverConfiguration.setSsl(oTelMetricsSourceConfig.isSsl()); + serverConfiguration.setUnauthenticatedHealthCheck(oTelMetricsSourceConfig.isUnauthenticatedHealthCheck()); + serverConfiguration.setUseAcmCertForSSL(oTelMetricsSourceConfig.useAcmCertForSSL()); + serverConfiguration.setMaxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength()); + serverConfiguration.setPort(oTelMetricsSourceConfig.getPort()); + serverConfiguration.setRetryInfo(oTelMetricsSourceConfig.getRetryInfo()); + serverConfiguration.setThreadCount(oTelMetricsSourceConfig.getThreadCount()); + serverConfiguration.setMaxConnectionCount(oTelMetricsSourceConfig.getMaxConnectionCount()); + + return serverConfiguration; + } +} diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index e3fbea33a4..0e0cb8c343 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -5,68 +5,44 @@ package org.opensearch.dataprepper.plugins.source.otelmetrics; -import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; -import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.encoding.DecodingService; -import com.linecorp.armeria.server.grpc.GrpcService; -import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; -import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.grpc.MethodDescriptor; -import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; -import io.grpc.protobuf.services.ProtoReflectionService; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; -import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.metric.Metric; import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; +import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; -import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; -import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; +import org.opensearch.dataprepper.plugins.server.CreateServer; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Collections; -import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.function.Function; @DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class) public class OTelMetricsSource implements Source> { + private static final String PLUGIN_NAME = "otel_metrics_source"; private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class); - private static final String HTTP_HEALTH_CHECK_PATH = "/health"; - private static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; - private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; static final String SERVER_CONNECTIONS = "serverConnections"; - // Default RetryInfo with minimum 100ms and maximum 2s - private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); - private final OTelMetricsSourceConfig oTelMetricsSourceConfig; private final String pipelineName; private final PluginMetrics pluginMetrics; @@ -112,87 +88,14 @@ public void start(Buffer> buffer) { pluginMetrics ); - final List serverInterceptors = getAuthenticationInterceptor(); - - final GrpcServiceBuilder grpcServiceBuilder = GrpcService - .builder() - .useClientTimeoutHeader(false) - .useBlockingTaskExecutor(true) - .exceptionHandler(createGrpExceptionHandler()); - - final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); - final String oTelMetricsSourcePath = oTelMetricsSourceConfig.getPath(); - if (oTelMetricsSourcePath != null) { - final String transformedOTelMetricsSourcePath = oTelMetricsSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - grpcServiceBuilder.addService(transformedOTelMetricsSourcePath, - ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors), methodDescriptor); - } else { - grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelMetricsGrpcService, serverInterceptors)); - } - - if (oTelMetricsSourceConfig.hasHealthCheck()) { - LOG.info("Health check is enabled"); - grpcServiceBuilder.addService(new HealthGrpcService()); - } - - if (oTelMetricsSourceConfig.hasProtoReflectionService()) { - LOG.info("Proto reflection service is enabled"); - grpcServiceBuilder.addService(ProtoReflectionService.newInstance()); - } - - grpcServiceBuilder.enableUnframedRequests(oTelMetricsSourceConfig.enableUnframedRequests()); - - final ServerBuilder sb = Server.builder(); - sb.disableServerHeader(); - if (CompressionOption.NONE.equals(oTelMetricsSourceConfig.getCompression())) { - sb.service(grpcServiceBuilder.build()); - } else { - sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); - } - - if(oTelMetricsSourceConfig.enableHttpHealthCheck()) { - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); - } - - if(oTelMetricsSourceConfig.getAuthentication() != null) { - final Optional> optionalHttpAuthenticationService = - authenticationProvider.getHttpAuthenticationService(); - - if(oTelMetricsSourceConfig.isUnauthenticatedHealthCheck()) { - optionalHttpAuthenticationService.ifPresent(httpAuthenticationService -> - sb.decorator(REGEX_HEALTH, httpAuthenticationService)); - } else { - optionalHttpAuthenticationService.ifPresent(sb::decorator); - } - } - - sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis()); - if(oTelMetricsSourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes()); - } - - // ACM Cert for SSL takes preference + ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(oTelMetricsSourceConfig); + CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName); + CertificateProvider certificateProvider = null; if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) { - LOG.info("SSL/TLS is enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - sb.https(oTelMetricsSourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating otel_metrics_source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the otel_metrics_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-metrics-source#ssl"); - sb.http(oTelMetricsSourceConfig.getPort()); + certificateProvider = certificateProviderFactory.getCertificateProvider(); } - - sb.maxNumConnections(oTelMetricsSourceConfig.getMaxConnectionCount()); - sb.blockingTaskExecutor( - Executors.newScheduledThreadPool(oTelMetricsSourceConfig.getThreadCount()), - true); - - server = sb.build(); + final MethodDescriptor methodDescriptor = MetricsServiceGrpc.getExportMethod(); + server = createServer.createGRPCServer(authenticationProvider, oTelMetricsGrpcService, certificateProvider, methodDescriptor); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } @@ -230,21 +133,6 @@ public void stop() { LOG.info("Stopped otel_metrics_source."); } - private GrpcExceptionHandlerFunction createGrpExceptionHandler() { - RetryInfoConfig retryInfo = oTelMetricsSourceConfig.getRetryInfo() != null - ? oTelMetricsSourceConfig.getRetryInfo() - : DEFAULT_RETRY_INFO; - - return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); - } - - private List getAuthenticationInterceptor() { - final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); - if (authenticationInterceptor == null) { - return Collections.emptyList(); - } - return Collections.singletonList(authenticationInterceptor); - } private GrpcAuthenticationProvider createAuthenticationProvider(final PluginFactory pluginFactory) { final PluginModel authenticationConfiguration = oTelMetricsSourceConfig.getAuthentication(); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java index a2fe35437b..4f2a18d4ea 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java @@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/RetryInfoConfig.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/RetryInfoConfig.java deleted file mode 100644 index 5fd80133c6..0000000000 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/RetryInfoConfig.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.opensearch.dataprepper.plugins.source.otelmetrics; - -import java.time.Duration; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class RetryInfoConfig { - - @JsonProperty(value = "min_delay", defaultValue = "100ms") - private Duration minDelay; - - @JsonProperty(value = "max_delay", defaultValue = "2s") - private Duration maxDelay; - - // Jackson needs this constructor - public RetryInfoConfig() {} - - public RetryInfoConfig(Duration minDelay, Duration maxDelay) { - this.minDelay = minDelay; - this.maxDelay = maxDelay; - } - - public Duration getMinDelay() { - return minDelay; - } - - public void setMinDelay(Duration minDelay) { - this.minDelay = minDelay; - } - - public Duration getMaxDelay() { - return maxDelay; - } - - public void setMaxDelay(Duration maxDelay) { - this.maxDelay = maxDelay; - } -} diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index ee1f65c191..81214e3c10 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -79,7 +79,7 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; +import org.opensearch.dataprepper.plugins.server.HealthGrpcService; import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder; import org.opensearch.dataprepper.plugins.source.otelmetrics.certificate.CertificateProviderFactory; diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java index 32f4570240..59627b70ec 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource_RetryInfoTest.java @@ -53,6 +53,7 @@ import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; import io.opentelemetry.proto.resource.v1.Resource; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; @ExtendWith(MockitoExtension.class) class OTelMetricsSource_RetryInfoTest { diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java index 3ee075cc03..2f5ee30d00 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java @@ -31,6 +31,7 @@ import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_PORT; import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS; import static org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsSourceConfig.DEFAULT_THREAD_COUNT; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; class OtelMetricsSourceConfigTests { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JavaTimeModule()); diff --git a/data-prepper-plugins/otel-trace-source/build.gradle b/data-prepper-plugins/otel-trace-source/build.gradle index 8aa3f58628..4a625ee0ff 100644 --- a/data-prepper-plugins/otel-trace-source/build.gradle +++ b/data-prepper-plugins/otel-trace-source/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation project(':data-prepper-plugins:blocking-buffer') implementation project(':data-prepper-plugins:armeria-common') implementation project(':data-prepper-plugins:otel-proto-common') + implementation project(':data-prepper-plugins:http-common') implementation libs.commons.codec testImplementation project(':data-prepper-api').sourceSets.test.output implementation libs.opentelemetry.proto diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java deleted file mode 100644 index adce5be0b9..0000000000 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/health/HealthGrpcService.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.health; - -import io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; -import io.grpc.stub.StreamObserver; - -public class HealthGrpcService extends HealthGrpc.HealthImplBase { - - @Override - public void check(final HealthCheckRequest request, final StreamObserver responseObserver) { - responseObserver.onNext( - HealthCheckResponse.newBuilder().setStatus(HealthCheckResponse.ServingStatus.SERVING).build()); - responseObserver.onCompleted(); - } -} diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/ConvertConfiguration.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/ConvertConfiguration.java new file mode 100644 index 0000000000..cadb71a996 --- /dev/null +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/ConvertConfiguration.java @@ -0,0 +1,27 @@ +package org.opensearch.dataprepper.plugins.source.oteltrace; + +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; + +public class ConvertConfiguration { + + public static ServerConfiguration convertConfiguration(final OTelTraceSourceConfig oTelTraceSourceConfig) { + ServerConfiguration serverConfiguration = new ServerConfiguration(); + serverConfiguration.setPath(oTelTraceSourceConfig.getPath()); + serverConfiguration.setHealthCheck(oTelTraceSourceConfig.hasHealthCheck()); + serverConfiguration.setProtoReflectionService(oTelTraceSourceConfig.hasProtoReflectionService()); + serverConfiguration.setRequestTimeoutInMillis(oTelTraceSourceConfig.getRequestTimeoutInMillis()); + serverConfiguration.setEnableUnframedRequests(oTelTraceSourceConfig.enableUnframedRequests()); + serverConfiguration.setCompression(oTelTraceSourceConfig.getCompression()); + serverConfiguration.setAuthentication(oTelTraceSourceConfig.getAuthentication()); + serverConfiguration.setSsl(oTelTraceSourceConfig.isSsl()); + serverConfiguration.setUnauthenticatedHealthCheck(oTelTraceSourceConfig.isUnauthenticatedHealthCheck()); + serverConfiguration.setUseAcmCertForSSL(oTelTraceSourceConfig.useAcmCertForSSL()); + serverConfiguration.setMaxRequestLength(oTelTraceSourceConfig.getMaxRequestLength()); + serverConfiguration.setPort(oTelTraceSourceConfig.getPort()); + serverConfiguration.setRetryInfo(oTelTraceSourceConfig.getRetryInfo()); + serverConfiguration.setThreadCount(oTelTraceSourceConfig.getThreadCount()); + serverConfiguration.setMaxConnectionCount(oTelTraceSourceConfig.getMaxConnectionCount()); + + return serverConfiguration; + } +} diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index bfe2b785ee..a498a63b61 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -5,29 +5,17 @@ package org.opensearch.dataprepper.plugins.source.oteltrace; -import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; -import com.linecorp.armeria.common.util.BlockingTaskExecutor; -import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; -import com.linecorp.armeria.server.ServerBuilder; -import com.linecorp.armeria.server.encoding.DecodingService; -import com.linecorp.armeria.server.grpc.GrpcService; -import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; -import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.grpc.MethodDescriptor; -import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; -import io.grpc.protobuf.services.ProtoReflectionService; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; -import org.opensearch.dataprepper.GrpcRequestExceptionHandler; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; -import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -35,36 +23,24 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; -import org.opensearch.dataprepper.plugins.certificate.model.Certificate; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec; import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; -import org.opensearch.dataprepper.plugins.source.oteltrace.certificate.CertificateProviderFactory; -import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.otel.codec.OTelTraceDecoder; +import org.opensearch.dataprepper.plugins.server.CreateServer; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; +import org.opensearch.dataprepper.plugins.source.oteltrace.certificate.CertificateProviderFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.Collections; -import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.function.Function; @DataPrepperPlugin(name = "otel_trace_source", pluginType = Source.class, pluginConfigurationType = OTelTraceSourceConfig.class) public class OTelTraceSource implements Source> { + private static final String PLUGIN_NAME = "otel_trace_source"; private static final Logger LOG = LoggerFactory.getLogger(OTelTraceSource.class); - private static final String HTTP_HEALTH_CHECK_PATH = "/health"; - public static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; static final String SERVER_CONNECTIONS = "serverConnections"; - private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; - - // Default RetryInfo with minimum 100ms and maximum 2s - private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); private final OTelTraceSourceConfig oTelTraceSourceConfig; private final PluginMetrics pluginMetrics; @@ -112,89 +88,14 @@ public void start(Buffer> buffer) { pluginMetrics ); - final List serverInterceptors = getAuthenticationInterceptor(); - - final GrpcServiceBuilder grpcServiceBuilder = GrpcService - .builder() - .useClientTimeoutHeader(false) - .useBlockingTaskExecutor(true) - .exceptionHandler(createGrpExceptionHandler()); - - final MethodDescriptor methodDescriptor = TraceServiceGrpc.getExportMethod(); - final String oTelTraceSourcePath = oTelTraceSourceConfig.getPath(); - if (oTelTraceSourcePath != null) { - final String transformedOTelTraceSourcePath = oTelTraceSourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); - grpcServiceBuilder.addService(transformedOTelTraceSourcePath, - ServerInterceptors.intercept(oTelTraceGrpcService, serverInterceptors), methodDescriptor); - } else { - grpcServiceBuilder.addService(ServerInterceptors.intercept(oTelTraceGrpcService, serverInterceptors)); - } - - if (oTelTraceSourceConfig.hasHealthCheck()) { - LOG.info("Health check is enabled"); - grpcServiceBuilder.addService(new HealthGrpcService()); - } - - if (oTelTraceSourceConfig.hasProtoReflectionService()) { - LOG.info("Proto reflection service is enabled"); - grpcServiceBuilder.addService(ProtoReflectionService.newInstance()); - } - - grpcServiceBuilder.enableUnframedRequests(oTelTraceSourceConfig.enableUnframedRequests()); - - final ServerBuilder sb = Server.builder(); - sb.disableServerHeader(); - if (CompressionOption.NONE.equals(oTelTraceSourceConfig.getCompression())) { - sb.service(grpcServiceBuilder.build()); - } else { - sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); - } - - if (oTelTraceSourceConfig.enableHttpHealthCheck()) { - sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); - } - - if (oTelTraceSourceConfig.getAuthentication() != null) { - final Optional> optionalHttpAuthenticationService = - authenticationProvider.getHttpAuthenticationService(); - - if (oTelTraceSourceConfig.isUnauthenticatedHealthCheck()) { - optionalHttpAuthenticationService.ifPresent(httpAuthenticationService -> - sb.decorator(REGEX_HEALTH, httpAuthenticationService)); - } else { - optionalHttpAuthenticationService.ifPresent(sb::decorator); - } - } - - sb.requestTimeoutMillis(oTelTraceSourceConfig.getRequestTimeoutInMillis()); - if(oTelTraceSourceConfig.getMaxRequestLength() != null) { - sb.maxRequestLength(oTelTraceSourceConfig.getMaxRequestLength().getBytes()); - } - - // ACM Cert for SSL takes preference + ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(oTelTraceSourceConfig); + CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName); + CertificateProvider certificateProvider = null; if (oTelTraceSourceConfig.isSsl() || oTelTraceSourceConfig.useAcmCertForSSL()) { - LOG.info("SSL/TLS is enabled."); - final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); - final Certificate certificate = certificateProvider.getCertificate(); - sb.https(oTelTraceSourceConfig.getPort()).tls( - new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), - new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8) - ) - ); - } else { - LOG.warn("Creating otel_trace_source without SSL/TLS. This is not secure."); - LOG.warn("In order to set up TLS for the otel_trace_source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#ssl"); - sb.http(oTelTraceSourceConfig.getPort()); + certificateProvider = certificateProviderFactory.getCertificateProvider(); } - - sb.maxNumConnections(oTelTraceSourceConfig.getMaxConnectionCount()); - final BlockingTaskExecutor blockingTaskExecutor = BlockingTaskExecutor.builder() - .numThreads(oTelTraceSourceConfig.getThreadCount()) - .threadNamePrefix(pipelineName + "-otel_trace") - .build(); - sb.blockingTaskExecutor(blockingTaskExecutor, true); - - server = sb.build(); + final MethodDescriptor methodDescriptor = TraceServiceGrpc.getExportMethod(); + server = createServer.createGRPCServer(authenticationProvider, oTelTraceGrpcService, certificateProvider, methodDescriptor); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } @@ -213,14 +114,6 @@ public void start(Buffer> buffer) { LOG.info("Started otel_trace_source on port " + oTelTraceSourceConfig.getPort() + "..."); } - private GrpcExceptionHandlerFunction createGrpExceptionHandler() { - RetryInfoConfig retryInfo = oTelTraceSourceConfig.getRetryInfo() != null - ? oTelTraceSourceConfig.getRetryInfo() - : DEFAULT_RETRY_INFO; - - return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); - } - @Override public void stop() { if (server != null) { @@ -240,14 +133,6 @@ public void stop() { LOG.info("Stopped otel_trace_source."); } - private List getAuthenticationInterceptor() { - final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor(); - if (authenticationInterceptor == null) { - return Collections.emptyList(); - } - return Collections.singletonList(authenticationInterceptor); - } - private GrpcAuthenticationProvider createAuthenticationProvider(final PluginFactory pluginFactory) { final PluginModel authenticationConfiguration = oTelTraceSourceConfig.getAuthentication(); diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java index abd1e2e26d..a87db080e8 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.lang3.StringUtils; diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java deleted file mode 100644 index 0595159cdf..0000000000 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/health/HealthGrpcServiceTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.health; - -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.health.v1.HealthCheckRequest; -import io.grpc.health.v1.HealthCheckResponse; -import io.grpc.health.v1.HealthGrpc; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertEquals; - - -class HealthGrpcServiceTest { - - private Server server; - private ManagedChannel channel; - - @BeforeEach - void setup() throws Exception { - String serverName = InProcessServerBuilder.generateName(); - - server = InProcessServerBuilder.forName(serverName) - .directExecutor() - .addService(new HealthGrpcService()) - .build() - .start(); - - channel = InProcessChannelBuilder.forName(serverName) - .directExecutor() - .build(); - } - - @AfterEach - void teardown() throws Exception { - try { - channel.shutdown(); - server.shutdown(); - - assert channel.awaitTermination(10, TimeUnit.SECONDS) : "channel cannot be gracefully shutdown"; - assert server.awaitTermination(10, TimeUnit.SECONDS) : "server cannot be gracefully shutdown"; - } finally { - channel.shutdownNow(); - server.shutdownNow(); - } - } - - @Test - void testHealthCheckResponse() { - HealthGrpc.HealthBlockingStub blockingStub = HealthGrpc.newBlockingStub(channel); - HealthCheckResponse response = - blockingStub.check(HealthCheckRequest.newBuilder().build()); - - assertEquals(HealthCheckResponse.ServingStatus.SERVING, response.getStatus()); - } -} diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index 75e0d005b7..39ef2a9b42 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -74,7 +74,8 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; -import org.opensearch.dataprepper.plugins.health.HealthGrpcService; +import org.opensearch.dataprepper.plugins.server.HealthGrpcService; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; import org.opensearch.dataprepper.plugins.source.oteltrace.certificate.CertificateProviderFactory; import java.io.ByteArrayOutputStream; diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java index 7894c45598..e2613d5dc8 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource_RetryInfoTest.java @@ -37,6 +37,7 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java index f67a637220..f2b30795e0 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java @@ -14,6 +14,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; import java.time.Duration; import java.util.HashMap;