Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8bd9b4e
init single endpoint
ps48 Apr 7, 2025
b73fca4
add reflection service
ps48 Apr 8, 2025
f209bff
update getmetadata expression to support eventType
ps48 Apr 9, 2025
48db7af
format files
ps48 Apr 11, 2025
2b0f4cd
update config and add health check
ps48 Apr 15, 2025
583a7c0
add http support and update config options
ps48 Apr 17, 2025
ddd9fe4
reset Otel trace source
ps48 Apr 17, 2025
c5f0b51
add tests for healthcheck, auth, requests and config
ps48 Apr 21, 2025
18cfe68
add tests for grpc
ps48 May 4, 2025
dda0cfa
update retry tests
ps48 May 4, 2025
2520c99
update plugin name from otel-telemetry-source to otlp
ps48 May 4, 2025
3886144
update readme with usage details
ps48 May 4, 2025
6afeaf1
reset Otel trace source changes
ps48 May 4, 2025
38a5beb
revert GetMetadataExpressionFunction and OTelTraceSource changes
ps48 May 4, 2025
725fc2a
remove example
ps48 May 4, 2025
763b0f2
update source to use http-common server
ps48 May 6, 2025
865dbd4
added back retry tests, use http-common health check
ps48 May 6, 2025
886f279
update readme with authentication details
ps48 May 6, 2025
91ffa90
fix checkstyle issues
ps48 May 8, 2025
1b75321
add support for OpenSearch formats & update readme
ps48 May 9, 2025
00027e4
remove dupe in settings
ps48 Jun 19, 2025
707889f
remove junit and mockito
ps48 Jun 19, 2025
41b18fb
update config fields and move certs
ps48 Jun 25, 2025
a2da4a0
update tests with new config options
ps48 Jun 25, 2025
10b0968
use data prepper duration and add generic output_format
ps48 Jun 25, 2025
3a9e7be
update timeouts to duration
ps48 Jun 25, 2025
c119aa8
update the output format defaults to null
ps48 Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.dataprepper.plugins.server;


import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.server.HttpService;
Expand Down Expand Up @@ -40,17 +39,17 @@
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Function;


public class CreateServer {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not so sure about the class name. To me this class looks like a factory of some kind and I would name it as such. But I am also fine with the name it got right now.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Karsten,
The CreateServer abstaction was added in last week by this PR #5653. I can rename it ServerFactory or something similar, but I feel this can be out of scope for this PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have seen when the change was introduced. I would appreciate a renaming at some point in time, but I am fine with not doing it in this PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with Karsten, need better name

private final ServerConfiguration serverConfiguration;
private final Logger LOG;
private final PluginMetrics pluginMetrics;
private final PluginMetrics pluginMetrics;
private String sourceName;
private String pipelineName;

Expand All @@ -68,7 +67,53 @@ public CreateServer(final ServerConfiguration serverConfiguration, final Logger
this.pipelineName = pipelineName;
}

public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authenticationProvider, final BindableService grpcService, final CertificateProvider certificateProvider, final MethodDescriptor<K, V> methodDescriptor) {
/**
* Service configuration class for GRPC services
* @param <K> Request type
* @param <V> Response type
*/
public static class GRPCServiceConfig<K, V> {
private final BindableService service;
private final String path;
private final MethodDescriptor<K, V> methodDescriptor;

public GRPCServiceConfig(BindableService service, String path, MethodDescriptor<K, V> methodDescriptor) {
this.service = service;
this.path = path;
this.methodDescriptor = methodDescriptor;
}

public GRPCServiceConfig(BindableService service) {
this.service = service;
this.path = null;
this.methodDescriptor = null;
}

public BindableService getService() {
return service;
}

public String getPath() {
return path;
}

public MethodDescriptor<K, V> getMethodDescriptor() {
return methodDescriptor;
}
}

/**
* Creates a GRPC server with multiple services, each with its own optional path and method descriptor
* @param authenticationProvider Provider for authentication
* @param grpcServiceConfigs List of service configurations
* @param certificateProvider Provider for SSL/TLS certificates
* @return Configured server
*/
public Server createGRPCServer(
final GrpcAuthenticationProvider authenticationProvider,
final List<GRPCServiceConfig<?, ?>> grpcServiceConfigs,
final CertificateProvider certificateProvider) {

final List<ServerInterceptor> serverInterceptors = getAuthenticationInterceptor(authenticationProvider);

final GrpcServiceBuilder grpcServiceBuilder = GrpcService
Expand All @@ -77,13 +122,20 @@ public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authentic
.useBlockingTaskExecutor(true)
.exceptionHandler(createGrpExceptionHandler());

final String sourcePath = serverConfiguration.getPath();
if (sourcePath != null) {
final String transformedSourcePath = sourcePath.replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
grpcServiceBuilder.addService(transformedSourcePath,
ServerInterceptors.intercept(grpcService, serverInterceptors), methodDescriptor);
} else {
grpcServiceBuilder.addService(ServerInterceptors.intercept(grpcService, serverInterceptors));
// Add each service with its own path and method descriptor
for (GRPCServiceConfig<?, ?> serviceConfig : grpcServiceConfigs) {
if (serviceConfig.getPath() != null && serviceConfig.getMethodDescriptor() != null) {
final String transformedPath = serviceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
grpcServiceBuilder.addService(
transformedPath,
ServerInterceptors.intercept(serviceConfig.getService(), serverInterceptors),
serviceConfig.getMethodDescriptor());
LOG.info("Adding service with path: {}", transformedPath);
} else {
grpcServiceBuilder.addService(
ServerInterceptors.intercept(serviceConfig.getService(), serverInterceptors));
LOG.info("Adding service without specific path");
}
}

if (serverConfiguration.hasHealthCheck()) {
Expand Down Expand Up @@ -151,7 +203,35 @@ public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authentic
return sb.build();
}

public Server createHTTPServer(final Buffer<Record<Log>> buffer, final CertificateProviderFactory certificateProviderFactory, final ArmeriaHttpAuthenticationProvider authenticationProvider, final HttpRequestExceptionHandler httpRequestExceptionHandler, final Object logService) {
/**
* Creates a GRPC server with a single service
* @param authenticationProvider Provider for authentication
* @param grpcService Service to be added
* @param certificateProvider Provider for SSL/TLS certificates
* @param methodDescriptor Method descriptor for the service
* @return Configured server
*/
public <K, V> Server createGRPCServer(
final GrpcAuthenticationProvider authenticationProvider,
final BindableService grpcService,
final CertificateProvider certificateProvider,
final MethodDescriptor<K, V> methodDescriptor) {

List<GRPCServiceConfig<?, ?>> serviceConfigs = new ArrayList<>();
if (serverConfiguration.getPath() != null) {
serviceConfigs.add(new GRPCServiceConfig<>(grpcService, serverConfiguration.getPath(), methodDescriptor));
} else {
serviceConfigs.add(new GRPCServiceConfig<>(grpcService));
}

return createGRPCServer(authenticationProvider, serviceConfigs, certificateProvider);
}


public Server createHTTPServer(final Buffer<Record<Log>> buffer,
final CertificateProviderFactory certificateProviderFactory,
final ArmeriaHttpAuthenticationProvider authenticationProvider,
final HttpRequestExceptionHandler httpRequestExceptionHandler, final Object logService) {
final ServerBuilder sb = Server.builder();

sb.disableServerHeader();
Expand All @@ -163,17 +243,16 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
// TODO: enable encrypted key with password
sb.https(serverConfiguration.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)
)
);
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)));
} else {
LOG.warn("Creating " + sourceName + " without SSL/TLS. This is not secure.");
LOG.warn("In order to set up TLS for the " + sourceName + ", go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl");
sb.http(serverConfiguration.getPort());
}

if(serverConfiguration.getAuthentication() != null) {
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider.getAuthenticationDecorator();
if (serverConfiguration.getAuthentication() != null) {
final Optional<Function<? super HttpService, ? extends HttpService>> optionalAuthDecorator = authenticationProvider
.getAuthenticationDecorator();

if (serverConfiguration.isUnauthenticatedHealthCheck()) {
optionalAuthDecorator.ifPresent(authDecorator -> sb.decorator(REGEX_HEALTH, authDecorator));
Expand All @@ -184,7 +263,7 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica

sb.maxNumConnections(serverConfiguration.getMaxConnectionCount());
sb.requestTimeout(Duration.ofMillis(serverConfiguration.getRequestTimeoutInMillis()));
if(serverConfiguration.getMaxRequestLength() != null) {
if (serverConfiguration.getMaxRequestLength() != null) {
sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes());
}
final int threads = serverConfiguration.getThreadCount();
Expand All @@ -193,15 +272,17 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
final int maxPendingRequests = serverConfiguration.getMaxPendingRequests();
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests,
pluginMetrics);

final String httpSourcePath = serverConfiguration.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));

if (CompressionOption.NONE.equals(serverConfiguration.getCompression())) {
sb.annotatedService(httpSourcePath, logService, httpRequestExceptionHandler);
} else {
sb.annotatedService(httpSourcePath, logService, DecodingService.newDecorator(), httpRequestExceptionHandler);
sb.annotatedService(httpSourcePath, logService, DecodingService.newDecorator(),
httpRequestExceptionHandler);
}

if (serverConfiguration.hasHealthCheck()) {
Expand All @@ -220,12 +301,12 @@ private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
}

private List<ServerInterceptor> getAuthenticationInterceptor(final GrpcAuthenticationProvider authenticationProvider) {
private List<ServerInterceptor> getAuthenticationInterceptor(
final GrpcAuthenticationProvider authenticationProvider) {
final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor();
if (authenticationInterceptor == null) {
return Collections.emptyList();
}
return Collections.singletonList(authenticationInterceptor);
}
}
//
}
Loading
Loading