Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer,
if (serverConfiguration.getMaxRequestLength() != null) {
sb.maxRequestLength(serverConfiguration.getMaxRequestLength().getBytes());
}

final int threads = serverConfiguration.getThreadCount();
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
sb.blockingTaskExecutor(blockingTaskExecutor, true);
Expand Down Expand Up @@ -309,4 +310,4 @@ private List<ServerInterceptor> getAuthenticationInterceptor(
}
return Collections.singletonList(authenticationInterceptor);
}
}
}
3 changes: 3 additions & 0 deletions data-prepper-plugins/otel-logs-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation project(':data-prepper-plugins:http-common')
implementation project(':data-prepper-plugins:http-source-common' )
implementation libs.commons.codec
implementation project(':data-prepper-plugins:armeria-common')
testImplementation project(':data-prepper-api').sourceSets.test.output
Expand All @@ -22,6 +23,8 @@ dependencies {
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:regions'
implementation 'software.amazon.awssdk:s3'
implementation 'org.projectlombok:lombok:1.18.26'
annotationProcessor 'org.projectlombok:lombok:1.18.26'
implementation libs.protobuf.util
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down

This file was deleted.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;

@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OTelLogsSourceConfig {
static final String REQUEST_TIMEOUT = "request_timeout";
static final String PORT = "port";
Expand Down Expand Up @@ -57,6 +65,11 @@ public class OTelLogsSourceConfig {
@Size(min = 1, message = "path length should be at least 1")
private String path;

@Getter
@JsonProperty("http_path")
@Size(min = 1, message = "path length should be at least 1")
private String httpPath;

@JsonProperty(HEALTH_CHECK_SERVICE)
private boolean healthCheck = DEFAULT_HEALTH_CHECK;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ public CertificateProvider getCertificateProvider() {
.overrideConfiguration(metricPublisher -> metricPublisher.addMetricPublisher(new MicrometerMetricPublisher(awsSdkMetrics)))
.build();

return new ACMCertificateProvider(awsCertificateManager, oTelLogsSourceConfig.getAcmCertificateArn(),
oTelLogsSourceConfig.getAcmCertIssueTimeOutMillis(), oTelLogsSourceConfig.getAcmPrivateKeyPassword());
return new ACMCertificateProvider(awsCertificateManager,
oTelLogsSourceConfig.getAcmCertificateArn(),
oTelLogsSourceConfig.getAcmCertIssueTimeOutMillis(),
oTelLogsSourceConfig.getAcmPrivateKeyPassword());
} else if (oTelLogsSourceConfig.isSslCertAndKeyFileInS3()) {
LOG.info("Using S3 to fetch certificate and private key for SSL/TLS.");
final AwsCredentialsProvider credentialsProvider = AwsCredentialsProviderChain.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.otellogs.http;
Comment thread
TomasLongo marked this conversation as resolved.

import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
import com.linecorp.armeria.server.annotation.Post;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;

public class ArmeriaHttpService {
private static final Logger LOG = LoggerFactory.getLogger(ArmeriaHttpService.class);

public static final String REQUESTS_RECEIVED = "requestsReceived";
public static final String SUCCESS_REQUESTS = "successRequests";
public static final String PAYLOAD_SIZE = "payloadSize";
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

private final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder;
private final Buffer<Record<Object>> buffer;

private final int bufferWriteTimeoutInMillis;

private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;

public ArmeriaHttpService(
Buffer<Record<Object>> buffer,
final PluginMetrics pluginMetrics,
final int bufferWriteTimeoutInMillis,
final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder
) {
this.buffer = buffer;
this.oTelProtoDecoder = oTelProtoDecoder;
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}

// no path provided. Will be set by config.
@Post("")
@ConsumesJson
@ConsumesProtobuf
public ExportLogsServiceResponse exportLog(ExportLogsServiceRequest request) {
requestsReceivedCounter.increment();
payloadSizeSummary.record(request.getSerializedSize());

requestProcessDuration.record(() -> processRequest(request));

return ExportLogsServiceResponse.newBuilder().build();
}

private void processRequest(final ExportLogsServiceRequest request) {
final List<OpenTelemetryLog> logs;

try {
logs = oTelProtoDecoder.parseExportLogsServiceRequest(request, Instant.now());
} catch (Exception e) {
LOG.warn(DataPrepperMarkers.SENSITIVE, "Failed to parse the request with error {}. Request body: {}", e, request);
throw new BadRequestException(e.getMessage(), e);
}

try {
if (buffer.isByteBuffer()) {
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
} else {
final List<Record<Object>> records = logs.stream().map(log -> new Record<Object>(log)).collect(Collectors.toList());
buffer.writeAll(records, bufferWriteTimeoutInMillis);
}
} catch (Exception e) {
if (ServiceRequestContext.current().isTimedOut()) {
LOG.warn("Exception writing to buffer but request already timed out.", e);
return;
}

LOG.error("Failed to write the request of size {} due to:", request.toString().length(), e);
throw new BufferWriteException(e.getMessage(), e);
}

if (ServiceRequestContext.current().isTimedOut()) {
LOG.warn("Buffer write completed successfully but request already timed out.");
return;
}

successRequestsCounter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.otellogs.http;
Comment thread
TomasLongo marked this conversation as resolved.


import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.opensearch.dataprepper.RetryInfoCalculator;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.ContentTooLargeException;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.HttpStatusException;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.micrometer.core.instrument.Counter;

public class HttpExceptionHandler implements ExceptionHandlerFunction {
private static final Logger LOG = LoggerFactory.getLogger(HttpExceptionHandler.class);

static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";
public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
public static final String INTERNAL_SERVER_ERROR = "internalServerError";

private final Counter requestTimeoutsCounter;
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;
private final RetryInfoCalculator retryInfoCalculator;

public HttpExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
this.retryInfoCalculator = new RetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
}

@Override
public HttpResponse handleException(final ServiceRequestContext ctx,
final HttpRequest req,
final Throwable e) {
final Throwable exceptionCause;
if (e instanceof BufferWriteException) {
exceptionCause = e.getCause();
} else if (e instanceof HttpStatusException) {
exceptionCause = e.getCause();
} else {
exceptionCause = e;
}

StatusHolder statusHolder = createStatus(exceptionCause);

try {
JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder()
.add(RetryInfo.getDescriptor())
.build();

JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry);
return HttpResponse.of(statusHolder.getHttpStatus(), MediaType.JSON, printer.print(statusHolder.getStatus()));
} catch (InvalidProtocolBufferException ipbe) {
throw new RuntimeException(ipbe);
}
}

private StatusHolder createStatus(Throwable e) {
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return new StatusHolder(createStatus(e, Status.Code.RESOURCE_EXHAUSTED), createHttpStatusFromProtoBufStatus(Status.Code.RESOURCE_EXHAUSTED));
} else if (e instanceof SizeOverflowException || e instanceof ContentTooLargeException) {
requestsTooLargeCounter.increment();
return new StatusHolder(createStatus(e, Status.Code.RESOURCE_EXHAUSTED), createHttpStatusFromProtoBufStatus(Status.Code.RESOURCE_EXHAUSTED));
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return new StatusHolder(createStatus(e, Status.Code.INVALID_ARGUMENT), createHttpStatusFromProtoBufStatus(Status.Code.INVALID_ARGUMENT));
} else if ((e instanceof StatusRuntimeException) && (e.getMessage().contains("Invalid protobuf byte sequence") || e.getMessage().contains("Can't decode compressed frame"))) {
badRequestsCounter.increment();
return new StatusHolder(createStatus(e, Status.Code.INVALID_ARGUMENT), createHttpStatusFromProtoBufStatus(Status.Code.INVALID_ARGUMENT));
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return new StatusHolder(createStatus(e, Status.Code.CANCELLED), createHttpStatusFromProtoBufStatus(Status.Code.CANCELLED));
} else {
LOG.error("Unexpected exception handling http request", e);
internalServerErrorCounter.increment();
return new StatusHolder(createStatus(e, Status.Code.INTERNAL), createHttpStatusFromProtoBufStatus(Status.Code.INTERNAL));
}
}

private HttpStatus createHttpStatusFromProtoBufStatus(Status.Code status) {
if (status == Status.Code.RESOURCE_EXHAUSTED) {
return HttpStatus.INSUFFICIENT_STORAGE;
} else if (status == Status.Code.INVALID_ARGUMENT) {
return HttpStatus.BAD_REQUEST;
} else {
return HttpStatus.INTERNAL_SERVER_ERROR;
}
}

private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) {
com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value());
if (e instanceof RequestTimeoutException) {
builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE);
} else {
builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage());
}
if (code == Status.Code.RESOURCE_EXHAUSTED) {
builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo()));
}
return builder.build();
}

private static class StatusHolder {
private final HttpStatus httpStatus;
private final com.google.rpc.Status status;

public StatusHolder(com.google.rpc.Status status, HttpStatus httpStatus) {
this.httpStatus = httpStatus;
this.status = status;
}

public HttpStatus getHttpStatus() {
return httpStatus;
}

public com.google.rpc.Status getStatus() {
return status;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.otellogs;
Expand Down
Loading
Loading