Skip to content

Commit 54bb933

Browse files
ps48github-actions[bot]
authored andcommitted
OTLP Source unified endpoint for logs, traces and metrics (#5677)
* init single endpoint Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * add reflection service Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update getmetadata expression to support eventType Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * format files Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update config and add health check Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * add http support and update config options Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * reset Otel trace source Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * add tests for healthcheck, auth, requests and config Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * add tests for grpc Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update retry tests Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update plugin name from otel-telemetry-source to otlp Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update readme with usage details Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * reset Otel trace source changes Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * revert GetMetadataExpressionFunction and OTelTraceSource changes Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * remove example Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update source to use http-common server Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * added back retry tests, use http-common health check Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update readme with authentication details Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * fix checkstyle issues Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * add support for OpenSearch formats & update readme Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * remove dupe in settings Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * remove junit and mockito Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update config fields and move certs Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update tests with new config options Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * use data prepper duration and add generic output_format Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update timeouts to duration Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> * update the output format defaults to null Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> --------- Signed-off-by: Shenoy Pratik <sgguruda@amazon.com> (cherry picked from commit 5ad289d)
1 parent 51e0626 commit 54bb933

16 files changed

Lines changed: 3661 additions & 42 deletions

File tree

data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java

Lines changed: 104 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package org.opensearch.dataprepper.plugins.server;
77

8-
98
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
109
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
1110
import com.linecorp.armeria.server.HttpService;
@@ -40,17 +39,17 @@
4039
import java.io.ByteArrayInputStream;
4140
import java.nio.charset.StandardCharsets;
4241
import java.time.Duration;
42+
import java.util.ArrayList;
4343
import java.util.Collections;
4444
import java.util.List;
4545
import java.util.Optional;
4646
import java.util.concurrent.ScheduledThreadPoolExecutor;
4747
import java.util.function.Function;
4848

49-
5049
public class CreateServer {
5150
private final ServerConfiguration serverConfiguration;
5251
private final Logger LOG;
53-
private final PluginMetrics pluginMetrics;
52+
private final PluginMetrics pluginMetrics;
5453
private String sourceName;
5554
private String pipelineName;
5655

@@ -68,7 +67,53 @@ public CreateServer(final ServerConfiguration serverConfiguration, final Logger
6867
this.pipelineName = pipelineName;
6968
}
7069

71-
public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authenticationProvider, final BindableService grpcService, final CertificateProvider certificateProvider, final MethodDescriptor<K, V> methodDescriptor) {
70+
/**
71+
* Service configuration class for GRPC services
72+
* @param <K> Request type
73+
* @param <V> Response type
74+
*/
75+
public static class GRPCServiceConfig<K, V> {
76+
private final BindableService service;
77+
private final String path;
78+
private final MethodDescriptor<K, V> methodDescriptor;
79+
80+
public GRPCServiceConfig(BindableService service, String path, MethodDescriptor<K, V> methodDescriptor) {
81+
this.service = service;
82+
this.path = path;
83+
this.methodDescriptor = methodDescriptor;
84+
}
85+
86+
public GRPCServiceConfig(BindableService service) {
87+
this.service = service;
88+
this.path = null;
89+
this.methodDescriptor = null;
90+
}
91+
92+
public BindableService getService() {
93+
return service;
94+
}
95+
96+
public String getPath() {
97+
return path;
98+
}
99+
100+
public MethodDescriptor<K, V> getMethodDescriptor() {
101+
return methodDescriptor;
102+
}
103+
}
104+
105+
/**
106+
* Creates a GRPC server with multiple services, each with its own optional path and method descriptor
107+
* @param authenticationProvider Provider for authentication
108+
* @param grpcServiceConfigs List of service configurations
109+
* @param certificateProvider Provider for SSL/TLS certificates
110+
* @return Configured server
111+
*/
112+
public Server createGRPCServer(
113+
final GrpcAuthenticationProvider authenticationProvider,
114+
final List<GRPCServiceConfig<?, ?>> grpcServiceConfigs,
115+
final CertificateProvider certificateProvider) {
116+
72117
final List<ServerInterceptor> serverInterceptors = getAuthenticationInterceptor(authenticationProvider);
73118

74119
final GrpcServiceBuilder grpcServiceBuilder = GrpcService
@@ -77,13 +122,20 @@ public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authentic
77122
.useBlockingTaskExecutor(true)
78123
.exceptionHandler(createGrpExceptionHandler());
79124

80-
final String sourcePath = serverConfiguration.getPath();
81-
if (sourcePath != null) {
82-
final String transformedSourcePath = sourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
83-
grpcServiceBuilder.addService(transformedSourcePath,
84-
ServerInterceptors.intercept(grpcService, serverInterceptors), methodDescriptor);
85-
} else {
86-
grpcServiceBuilder.addService(ServerInterceptors.intercept(grpcService, serverInterceptors));
125+
// Add each service with its own path and method descriptor
126+
for (GRPCServiceConfig<?, ?> serviceConfig : grpcServiceConfigs) {
127+
if (serviceConfig.getPath() != null && serviceConfig.getMethodDescriptor() != null) {
128+
final String transformedPath = serviceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
129+
grpcServiceBuilder.addService(
130+
transformedPath,
131+
ServerInterceptors.intercept(serviceConfig.getService(), serverInterceptors),
132+
serviceConfig.getMethodDescriptor());
133+
LOG.info("Adding service with path: {}", transformedPath);
134+
} else {
135+
grpcServiceBuilder.addService(
136+
ServerInterceptors.intercept(serviceConfig.getService(), serverInterceptors));
137+
LOG.info("Adding service without specific path");
138+
}
87139
}
88140

89141
if (serverConfiguration.hasHealthCheck()) {
@@ -151,7 +203,35 @@ public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authentic
151203
return sb.build();
152204
}
153205

154-
public Server createHTTPServer(final Buffer<Record<Log>> buffer, final CertificateProviderFactory certificateProviderFactory, final ArmeriaHttpAuthenticationProvider authenticationProvider, final HttpRequestExceptionHandler httpRequestExceptionHandler, final Object logService) {
206+
/**
207+
* Creates a GRPC server with a single service
208+
* @param authenticationProvider Provider for authentication
209+
* @param grpcService Service to be added
210+
* @param certificateProvider Provider for SSL/TLS certificates
211+
* @param methodDescriptor Method descriptor for the service
212+
* @return Configured server
213+
*/
214+
public <K, V> Server createGRPCServer(
215+
final GrpcAuthenticationProvider authenticationProvider,
216+
final BindableService grpcService,
217+
final CertificateProvider certificateProvider,
218+
final MethodDescriptor<K, V> methodDescriptor) {
219+
220+
List<GRPCServiceConfig<?, ?>> serviceConfigs = new ArrayList<>();
221+
if (serverConfiguration.getPath() != null) {
222+
serviceConfigs.add(new GRPCServiceConfig<>(grpcService, serverConfiguration.getPath(), methodDescriptor));
223+
} else {
224+
serviceConfigs.add(new GRPCServiceConfig<>(grpcService));
225+
}
226+
227+
return createGRPCServer(authenticationProvider, serviceConfigs, certificateProvider);
228+
}
229+
230+
231+
public Server createHTTPServer(final Buffer<Record<Log>> buffer,
232+
final CertificateProviderFactory certificateProviderFactory,
233+
final ArmeriaHttpAuthenticationProvider authenticationProvider,
234+
final HttpRequestExceptionHandler httpRequestExceptionHandler, final Object logService) {
155235
final ServerBuilder sb = Server.builder();
156236

157237
sb.disableServerHeader();
@@ -163,17 +243,16 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
163243
// TODO: enable encrypted key with password
164244
sb.https(serverConfiguration.getPort()).tls(
165245
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
166-
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
167-
)
168-
);
246+
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)));
169247
} else {
170248
LOG.warn("Creating " + sourceName + " without SSL/TLS. This is not secure.");
171249
LOG.warn("In order to set up TLS for the " + sourceName + ", go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl");
172250
sb.http(serverConfiguration.getPort());
173251
}
174252

175-
if(serverConfiguration.getAuthentication() != null) {
176-
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator();
253+
if (serverConfiguration.getAuthentication() != null) {
254+
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider
255+
.getAuthenticationDecorator();
177256

178257
if (serverConfiguration.isUnauthenticatedHealthCheck()) {
179258
optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator));
@@ -184,7 +263,7 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
184263

185264
sb.maxNumConnections(serverConfiguration.getMaxConnectionCount());
186265
sb.requestTimeout(Duration.ofMillis(serverConfiguration.getRequestTimeoutInMillis()));
187-
if(serverConfiguration.getMaxRequestLength() != null) {
266+
if (serverConfiguration.getMaxRequestLength() != null) {
188267
sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes());
189268
}
190269
final int threads = serverConfiguration.getThreadCount();
@@ -193,15 +272,17 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
193272
final int maxPendingRequests = serverConfiguration.getMaxPendingRequests();
194273
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
195274
maxPendingRequests, blockingTaskExecutor.getQueue());
196-
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
275+
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests,
276+
pluginMetrics);
197277

198278
final String httpSourcePath = serverConfiguration.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
199279
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
200280

201281
if (CompressionOption.NONE.equals(serverConfiguration.getCompression())) {
202282
sb.annotatedService(httpSourcePath, logService, httpRequestExceptionHandler);
203283
} else {
204-
sb.annotatedService(httpSourcePath, logService, DecodingService.newDecorator(), httpRequestExceptionHandler);
284+
sb.annotatedService(httpSourcePath, logService, DecodingService.newDecorator(),
285+
httpRequestExceptionHandler);
205286
}
206287

207288
if (serverConfiguration.hasHealthCheck()) {
@@ -220,12 +301,12 @@ private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
220301
return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
221302
}
222303

223-
private List<ServerInterceptor> getAuthenticationInterceptor(final GrpcAuthenticationProvider authenticationProvider) {
304+
private List<ServerInterceptor> getAuthenticationInterceptor(
305+
final GrpcAuthenticationProvider authenticationProvider) {
224306
final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor();
225307
if (authenticationInterceptor == null) {
226308
return Collections.emptyList();
227309
}
228310
return Collections.singletonList(authenticationInterceptor);
229311
}
230-
}
231-
//
312+
}

0 commit comments

Comments
 (0)