-
Notifications
You must be signed in to change notification settings - Fork 326
Streamline source creation #6849
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.source.otellogs.grpc; | ||
|
|
||
| import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; | ||
| import com.linecorp.armeria.server.ServerBuilder; | ||
| import com.linecorp.armeria.server.encoding.DecodingService; | ||
| import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; | ||
| import io.grpc.ServerInterceptor; | ||
| import io.grpc.ServerInterceptors; | ||
| import io.grpc.protobuf.services.ProtoReflectionService; | ||
| import org.opensearch.dataprepper.GrpcRequestExceptionHandler; | ||
| import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
| import org.opensearch.dataprepper.plugins.codec.CompressionOption; | ||
| import org.opensearch.dataprepper.plugins.server.HealthGrpcService; | ||
| import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; | ||
| import org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsGrpcService; | ||
| import org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| public class GrpcServiceConfigurator { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(GrpcServiceConfigurator.class); | ||
| private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; | ||
| private static final RetryInfoConfig DEFAULT_RETRY_INFO = | ||
| new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); | ||
|
|
||
| private final OTelLogsSourceConfig config; | ||
| private final PluginMetrics pluginMetrics; | ||
| private final String pipelineName; | ||
| private final GrpcAuthenticationProvider authenticationProvider; | ||
|
|
||
| public GrpcServiceConfigurator(final OTelLogsSourceConfig config, | ||
| final PluginMetrics pluginMetrics, | ||
| final String pipelineName, | ||
| final GrpcAuthenticationProvider authenticationProvider) { | ||
| this.config = config; | ||
| this.pluginMetrics = pluginMetrics; | ||
| this.pipelineName = pipelineName; | ||
| this.authenticationProvider = authenticationProvider; | ||
| } | ||
|
|
||
| public void configure(final ServerBuilder sb, final OTelLogsGrpcService grpcService) { | ||
| final GrpcServiceBuilder grpcSB = com.linecorp.armeria.server.grpc.GrpcService.builder() | ||
| .useClientTimeoutHeader(false) | ||
| .useBlockingTaskExecutor(true) | ||
| .exceptionHandler(createGrpcExceptionHandler()); | ||
|
|
||
| final List<ServerInterceptor> interceptors = getAuthenticationInterceptors(); | ||
|
|
||
| if (config.getPath() != null) { | ||
| final String path = config.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); | ||
| LOG.info("Configuring gRPC service at custom path: {}", path); | ||
| grpcSB.addService(path, | ||
| ServerInterceptors.intercept(grpcService, interceptors), | ||
| grpcService.getExportMethodDescriptor()); | ||
| } else { | ||
| grpcSB.addService(ServerInterceptors.intercept(grpcService, interceptors)); | ||
| } | ||
|
|
||
| if (config.hasHealthCheck()) { | ||
| LOG.info("gRPC health check service is enabled."); | ||
| grpcSB.addService(new HealthGrpcService()); | ||
| } | ||
|
|
||
| if (config.hasProtoReflectionService()) { | ||
| LOG.info("gRPC proto reflection service is enabled."); | ||
| grpcSB.addService(ProtoReflectionService.newInstance()); | ||
| } | ||
|
|
||
| grpcSB.enableUnframedRequests(config.enableUnframedRequests()); | ||
|
|
||
| final com.linecorp.armeria.server.grpc.GrpcService builtGrpcService = grpcSB.build(); | ||
| if (CompressionOption.NONE.equals(config.getCompression())) { | ||
| sb.service(builtGrpcService); | ||
| } else { | ||
| sb.service(builtGrpcService, DecodingService.newDecorator()); | ||
| } | ||
| } | ||
|
|
||
| private List<ServerInterceptor> getAuthenticationInterceptors() { | ||
| final ServerInterceptor interceptor = authenticationProvider.getAuthenticationInterceptor(); | ||
| return interceptor == null ? Collections.emptyList() : Collections.singletonList(interceptor); | ||
| } | ||
|
|
||
| private GrpcExceptionHandlerFunction createGrpcExceptionHandler() { | ||
| final RetryInfoConfig retryInfo = config.getRetryInfo() != null | ||
| ? config.getRetryInfo() | ||
| : DEFAULT_RETRY_INFO; | ||
| return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They rely on the different config classes though. They need to be taken into consideration.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree the config classes differ, but they all expose the same methods ( |
||
| package org.opensearch.dataprepper.plugins.source.otellogs.http; | ||
|
|
||
| import static org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME; | ||
|
|
||
| import com.linecorp.armeria.server.ServerBuilder; | ||
| import com.linecorp.armeria.server.encoding.DecodingService; | ||
| import com.linecorp.armeria.server.throttling.ThrottlingService; | ||
|
|
||
| import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; | ||
| import org.opensearch.dataprepper.http.LogThrottlingRejectHandler; | ||
| import org.opensearch.dataprepper.http.LogThrottlingStrategy; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
| import org.opensearch.dataprepper.model.buffer.Buffer; | ||
| import org.opensearch.dataprepper.model.configuration.PluginModel; | ||
| import org.opensearch.dataprepper.model.configuration.PluginSetting; | ||
| import org.opensearch.dataprepper.model.plugin.PluginFactory; | ||
| import org.opensearch.dataprepper.model.record.Record; | ||
| import org.opensearch.dataprepper.plugins.codec.CompressionOption; | ||
| import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec; | ||
| import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec; | ||
| import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; | ||
| import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat; | ||
| import org.opensearch.dataprepper.plugins.server.RetryInfoConfig; | ||
| import org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsSourceConfig; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.concurrent.BlockingQueue; | ||
|
|
||
| public class HttpServiceConfigurator { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(HttpServiceConfigurator.class); | ||
| private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; | ||
| private static final int MAX_PENDING_REQUESTS = 1024; | ||
| private static final RetryInfoConfig DEFAULT_RETRY_INFO = | ||
| new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000)); | ||
|
|
||
| private final OTelLogsSourceConfig config; | ||
| private final PluginMetrics pluginMetrics; | ||
| private final String pipelineName; | ||
| private final PluginFactory pluginFactory; | ||
|
|
||
| public HttpServiceConfigurator(final OTelLogsSourceConfig config, | ||
| final PluginMetrics pluginMetrics, | ||
| final String pipelineName, | ||
| final PluginFactory pluginFactory) { | ||
| this.config = config; | ||
| this.pluginMetrics = pluginMetrics; | ||
| this.pipelineName = pipelineName; | ||
| this.pluginFactory = pluginFactory; | ||
| } | ||
|
|
||
| public void configure(final ServerBuilder sb, | ||
| final Buffer<Record<Object>> buffer, | ||
| final BlockingQueue<Runnable> executorQueue) { | ||
|
TomasLongo marked this conversation as resolved.
|
||
| final String path = config.getHttpPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); | ||
| LOG.info("Configuring HTTP service at path: {}", path); | ||
|
|
||
| final RetryInfoConfig retryInfo = config.getRetryInfo() != null | ||
| ? config.getRetryInfo() | ||
| : DEFAULT_RETRY_INFO; | ||
|
|
||
| final OTelProtoCodec.OTelProtoDecoder decoder = config.getOutputFormat() == OTelOutputFormat.OPENSEARCH | ||
| ? new OTelProtoOpensearchCodec.OTelProtoDecoder() | ||
| : new OTelProtoStandardCodec.OTelProtoDecoder(); | ||
|
|
||
| final ArmeriaHttpService httpService = new ArmeriaHttpService( | ||
| buffer, pluginMetrics, (int) (config.getRequestTimeoutInMillis() * 0.8), decoder); | ||
|
|
||
| final HttpExceptionHandler exceptionHandler = | ||
| new HttpExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay()); | ||
|
|
||
| final LogThrottlingStrategy throttlingStrategy = | ||
| new LogThrottlingStrategy(MAX_PENDING_REQUESTS, executorQueue); | ||
| final LogThrottlingRejectHandler throttlingRejectHandler = | ||
| new LogThrottlingRejectHandler(MAX_PENDING_REQUESTS, pluginMetrics); | ||
| sb.decorator(path, ThrottlingService.newDecorator(throttlingStrategy, throttlingRejectHandler)); | ||
|
|
||
|
TomasLongo marked this conversation as resolved.
|
||
| configureAuthentication(sb); | ||
|
|
||
| if (CompressionOption.NONE.equals(config.getCompression())) { | ||
| sb.annotatedService(path, httpService, exceptionHandler); | ||
| } else { | ||
| sb.annotatedService(path, httpService, DecodingService.newDecorator(), exceptionHandler); | ||
| } | ||
| } | ||
|
|
||
| private void configureAuthentication(final ServerBuilder sb) { | ||
| final PluginModel authConfig = config.getAuthentication(); | ||
| if (authConfig == null || authConfig.getPluginName().equals(UNAUTHENTICATED_PLUGIN_NAME)) { | ||
| LOG.warn("Creating {} HTTP service without authentication. This is not secure.", "otlp_traces"); | ||
| LOG.warn("In order to set up Http Basic authentication for the {}, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#authentication-configurations", "otlp_traces"); | ||
| } else { | ||
| final ArmeriaHttpAuthenticationProvider authProvider = pluginFactory.loadPlugin( | ||
| ArmeriaHttpAuthenticationProvider.class, | ||
| new PluginSetting(authConfig.getPluginName(), authConfig.getPluginSettings())); | ||
| authProvider.getAuthenticationDecorator().ifPresent(sb::decorator); | ||
| } | ||
| } | ||
| } | ||
|
|
||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GrpcServiceConfigurator.java/HttpServiceConfigurator.java. These are nearly identical across otel-trace, otel-metrics, and otel-logs (3 copies each). This is the same code triplicated. Should live in a sharedotel-source-common moduleorotel-proto-common.