Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.dataprepper.plugins.source.otellogs;

import static org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME;

import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.server.Server;
Expand All @@ -18,7 +16,6 @@
import com.linecorp.armeria.server.throttling.ThrottlingService;

import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
import org.opensearch.dataprepper.http.LogThrottlingStrategy;
Expand Down Expand Up @@ -56,8 +53,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -135,7 +130,7 @@ public void start(Buffer<Record<Object>> buffer) {

private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
serverBuilder.disableServerHeader();
if (oTelLogsSourceConfig.isSsl()) {
if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) {
LOG.info("Creating http source with SSL/TLS enabled.");
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
final Certificate certificate = certificateProvider.getCertificate();
Expand All @@ -149,11 +144,8 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
serverBuilder.http(oTelLogsSourceConfig.getPort());
}

if (oTelLogsSourceConfig.getAuthentication() != null) {
createHttpAuthentication()
.flatMap(ArmeriaHttpAuthenticationProvider::getAuthenticationDecorator)
.ifPresent(serverBuilder::decorator);
}
final GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider(pluginFactory);
authProvider.getHttpAuthenticationService().ifPresent(serverBuilder::decorator);

serverBuilder.maxNumConnections(oTelLogsSourceConfig.getMaxConnectionCount());
serverBuilder.requestTimeout(Duration.ofMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis()));
Expand All @@ -165,13 +157,16 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threadCount);
serverBuilder.blockingTaskExecutor(executor, true);

if (oTelLogsSourceConfig.hasHealthCheck()) {
if ((oTelLogsSourceConfig.enableUnframedRequests() || oTelLogsSourceConfig.getHttpPath() != null)
&& oTelLogsSourceConfig.hasHealthCheck()) {
LOG.info("HTTP source health check is enabled");
serverBuilder.service(HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
}

configureGrpcService(serverBuilder, buffer);
configureHttpService(serverBuilder, buffer, executor.getQueue());
configureGrpcService(serverBuilder, buffer, authProvider);
if (oTelLogsSourceConfig.getHttpPath() != null) {
configureHttpService(serverBuilder, buffer, executor.getQueue());
}

return serverBuilder.build();
}
Expand Down Expand Up @@ -200,7 +195,8 @@ private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Obj
}
}

private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer,
GrpcAuthenticationProvider authProvider) {
LOG.info("Configuring gRPC service");

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
Expand All @@ -215,7 +211,6 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Obj
pluginMetrics,
null
);
GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider(pluginFactory);

final List<ServerInterceptor> interceptors = new ArrayList<>();
if (authProvider.getAuthenticationInterceptor() != null) {
Expand Down Expand Up @@ -296,21 +291,6 @@ private GrpcAuthenticationProvider createGrpcAuthenticationProvider(final Plugin
return pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting);
}

private Optional<ArmeriaHttpAuthenticationProvider> createHttpAuthentication() {
if (oTelLogsSourceConfig.getAuthentication() == null || oTelLogsSourceConfig.getAuthentication().getPluginName().equals(UNAUTHENTICATED_PLUGIN_NAME)) {
LOG.warn("Creating otel_trace_source http service without authentication. This is not secure.");
LOG.warn("In order to set up Http Basic authentication for the otel-trace-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#authentication-configurations");
return Optional.empty();
} else {
return Optional.of(createGrpcAuthenticationProvider(oTelLogsSourceConfig.getAuthentication()));
}
}

private ArmeriaHttpAuthenticationProvider createGrpcAuthenticationProvider(final PluginModel authenticationConfiguration) {
Map<String, Object> pluginSettings = authenticationConfiguration.getPluginSettings();
return pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, new PluginSetting(authenticationConfiguration.getPluginName(), pluginSettings));
}

private GrpcExceptionHandlerFunction createGrpExceptionHandler(OTelLogsSourceConfig config) {
RetryInfoConfig retryInfo = config.getRetryInfo() != null
? config.getRetryInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createBuilderForConfigWithAcmeSsl;
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createConfigBuilderWithBasicAuth;
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfig;
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfigBuilder;
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createBuilderForConfigWithSsl;
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_PASSWORD;
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_USERNAME;
Expand Down Expand Up @@ -340,6 +341,18 @@ void testStartWithEmptyBuffer() {
assertThrows(IllegalStateException.class, () -> source.start(null));
}

@Test
void start_withoutHttpPath_doesNotThrowNPE() {
final OTelLogsSourceConfig config = createDefaultConfigBuilder()
.httpPath(null)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should invert this. Most tests should set httpPath to null and we only have one that sets it.

I'm fine with merging this fix now, but think we should update in a quick follow-on.

.path("/test-pipeline/v1/logs")
.build();
final OTelLogsSource source = new OTelLogsSource(config, pluginMetrics, pluginFactory,
certificateProviderFactory, pipelineDescription);
source.start(buffer);
source.stop();
}

@Test
void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException {
// Prepare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.VerificationMode;
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
Expand All @@ -81,7 +80,6 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider;
import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder;
Expand Down Expand Up @@ -272,8 +270,7 @@ void httpRequest_payloadIsCompressed_returns200() throws IOException {
@MethodSource("getBasicAuthTestData")
void httpRequest_withBasicAuth_returnsAppropriateResponse(String givenUsername, String givenPassword, HttpStatus expectedStatus, VerificationMode expectedBufferWrites) throws Exception {
final HttpBasicAuthenticationConfig basicAuthConfig = new HttpBasicAuthenticationConfig(BASIC_AUTH_USERNAME, BASIC_AUTH_PASSWORD);
final HttpBasicArmeriaHttpAuthenticationProvider authProvider = new HttpBasicArmeriaHttpAuthenticationProvider(basicAuthConfig);
when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(authProvider);
when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(new GrpcBasicAuthenticationProvider(basicAuthConfig));
configureSource(createConfigBuilderWithBasicAuth().build());
SOURCE.start(buffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_HTTP_PATH;

import java.time.Duration;
import java.util.Optional;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -79,7 +80,7 @@ class OtelLogsSource_RetryInfoTest {

@BeforeEach
void beforeEach() throws Exception {
lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod();
lenient().when(authenticationProvider.getHttpAuthenticationService()).thenReturn(Optional.empty());
Mockito.lenient().doThrow(SizeOverflowException.class).when(buffer).writeAll(any(), anyInt());

when(oTelLogsSourceConfig.getPort()).thenReturn(DEFAULT_PORT);
Expand Down
Loading