diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index f6446d439b..24d2df5510 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -5,8 +5,6 @@ package org.opensearch.dataprepper.plugins.source.otellogs; -import static org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME; - import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; import com.linecorp.armeria.server.Server; @@ -18,7 +16,6 @@ import com.linecorp.armeria.server.throttling.ThrottlingService; import org.opensearch.dataprepper.GrpcRequestExceptionHandler; -import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; import org.opensearch.dataprepper.http.LogThrottlingStrategy; @@ -56,8 +53,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -135,7 +130,7 @@ public void start(Buffer> buffer) { private Server createServer(ServerBuilder serverBuilder, Buffer> buffer) { serverBuilder.disableServerHeader(); - if (oTelLogsSourceConfig.isSsl()) { + if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) { LOG.info("Creating http source with SSL/TLS enabled."); final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider(); final Certificate certificate = certificateProvider.getCertificate(); @@ -149,11 +144,8 @@ private Server createServer(ServerBuilder serverBuilder, Buffer> serverBuilder.http(oTelLogsSourceConfig.getPort()); } - if (oTelLogsSourceConfig.getAuthentication() != null) { - createHttpAuthentication() - .flatMap(ArmeriaHttpAuthenticationProvider::getAuthenticationDecorator) - .ifPresent(serverBuilder::decorator); - } + final GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider(pluginFactory); + authProvider.getHttpAuthenticationService().ifPresent(serverBuilder::decorator); serverBuilder.maxNumConnections(oTelLogsSourceConfig.getMaxConnectionCount()); serverBuilder.requestTimeout(Duration.ofMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis())); @@ -165,13 +157,16 @@ private Server createServer(ServerBuilder serverBuilder, Buffer> ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threadCount); serverBuilder.blockingTaskExecutor(executor, true); - if (oTelLogsSourceConfig.hasHealthCheck()) { + if ((oTelLogsSourceConfig.enableUnframedRequests() || oTelLogsSourceConfig.getHttpPath() != null) + && oTelLogsSourceConfig.hasHealthCheck()) { LOG.info("HTTP source health check is enabled"); serverBuilder.service(HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build()); } - configureGrpcService(serverBuilder, buffer); - configureHttpService(serverBuilder, buffer, executor.getQueue()); + configureGrpcService(serverBuilder, buffer, authProvider); + if (oTelLogsSourceConfig.getHttpPath() != null) { + configureHttpService(serverBuilder, buffer, executor.getQueue()); + } return serverBuilder.build(); } @@ -200,7 +195,8 @@ private void configureHttpService(ServerBuilder serverBuilder, Buffer> buffer) { + private void configureGrpcService(ServerBuilder serverBuilder, Buffer> buffer, + GrpcAuthenticationProvider authProvider) { LOG.info("Configuring gRPC service"); final GrpcServiceBuilder grpcServiceBuilder = GrpcService @@ -215,7 +211,6 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer interceptors = new ArrayList<>(); if (authProvider.getAuthenticationInterceptor() != null) { @@ -296,21 +291,6 @@ private GrpcAuthenticationProvider createGrpcAuthenticationProvider(final Plugin return pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting); } - private Optional createHttpAuthentication() { - if (oTelLogsSourceConfig.getAuthentication() == null || oTelLogsSourceConfig.getAuthentication().getPluginName().equals(UNAUTHENTICATED_PLUGIN_NAME)) { - LOG.warn("Creating otel_trace_source http service without authentication. This is not secure."); - LOG.warn("In order to set up Http Basic authentication for the otel-trace-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#authentication-configurations"); - return Optional.empty(); - } else { - return Optional.of(createGrpcAuthenticationProvider(oTelLogsSourceConfig.getAuthentication())); - } - } - - private ArmeriaHttpAuthenticationProvider createGrpcAuthenticationProvider(final PluginModel authenticationConfiguration) { - Map pluginSettings = authenticationConfiguration.getPluginSettings(); - return pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, new PluginSetting(authenticationConfiguration.getPluginName(), pluginSettings)); - } - private GrpcExceptionHandlerFunction createGrpExceptionHandler(OTelLogsSourceConfig config) { RetryInfoConfig retryInfo = config.getRetryInfo() != null ? config.getRetryInfo() diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcTest.java index 5dd67cf9bf..1c221e53de 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcTest.java @@ -113,6 +113,7 @@ import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createBuilderForConfigWithAcmeSsl; import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createConfigBuilderWithBasicAuth; import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfig; +import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfigBuilder; import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createBuilderForConfigWithSsl; import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_PASSWORD; import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_USERNAME; @@ -340,6 +341,18 @@ void testStartWithEmptyBuffer() { assertThrows(IllegalStateException.class, () -> source.start(null)); } + @Test + void start_withoutHttpPath_doesNotThrowNPE() { + final OTelLogsSourceConfig config = createDefaultConfigBuilder() + .httpPath(null) + .path("/test-pipeline/v1/logs") + .build(); + final OTelLogsSource source = new OTelLogsSource(config, pluginMetrics, pluginFactory, + certificateProviderFactory, pipelineDescription); + source.start(buffer); + source.stop(); + } + @Test void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { // Prepare diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java index 779e11db77..1bcd615d49 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java @@ -69,7 +69,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.verification.VerificationMode; -import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; import org.opensearch.dataprepper.metrics.MetricsTestUtil; @@ -81,7 +80,6 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; -import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder; @@ -272,8 +270,7 @@ void httpRequest_payloadIsCompressed_returns200() throws IOException { @MethodSource("getBasicAuthTestData") void httpRequest_withBasicAuth_returnsAppropriateResponse(String givenUsername, String givenPassword, HttpStatus expectedStatus, VerificationMode expectedBufferWrites) throws Exception { final HttpBasicAuthenticationConfig basicAuthConfig = new HttpBasicAuthenticationConfig(BASIC_AUTH_USERNAME, BASIC_AUTH_PASSWORD); - final HttpBasicArmeriaHttpAuthenticationProvider authProvider = new HttpBasicArmeriaHttpAuthenticationProvider(basicAuthConfig); - when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(authProvider); + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(new GrpcBasicAuthenticationProvider(basicAuthConfig)); configureSource(createConfigBuilderWithBasicAuth().build()); SOURCE.start(buffer); diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java index 05d4d9feaa..bc9b05cfc1 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java @@ -21,6 +21,7 @@ import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_HTTP_PATH; import java.time.Duration; +import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -79,7 +80,7 @@ class OtelLogsSource_RetryInfoTest { @BeforeEach void beforeEach() throws Exception { - lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod(); + lenient().when(authenticationProvider.getHttpAuthenticationService()).thenReturn(Optional.empty()); Mockito.lenient().doThrow(SizeOverflowException.class).when(buffer).writeAll(any(), anyInt()); when(oTelLogsSourceConfig.getPort()).thenReturn(DEFAULT_PORT);