diff --git a/data-prepper-plugins/otlp-sink/README.md b/data-prepper-plugins/otlp-sink/README.md new file mode 100644 index 0000000000..09c70d2a9f --- /dev/null +++ b/data-prepper-plugins/otlp-sink/README.md @@ -0,0 +1,176 @@ +# OTLP Sink Plugin + +The OTLP sink plugin sends span data using the OpenTelemetry Protocol (OTLP) format. +The initial release supports exporting spans to AWS X-Ray. Future releases will support sending spans, metrics, and logs +to any OTLP Protobuf-compatible endpoint. + +--- + +## Known Limitations + +- Currently supports only trace data (spans). Support for metrics and logs will be added in future releases. +- No support for DLQ-based (dead-letter queue) lossless delivery in this release. +- Only AWS X-Ray-compatible OTLP endpoints are currently supported (`https://xray.<region>.amazonaws.com/v1/traces`). +- Only OTLP over HTTP is supported; gRPC is not yet supported. + +--- + +## Sample Pipeline Configuration + +### Minimal Configuration (No STS) + +Use this when Data Prepper has permission to write to AWS X-Ray directly. + +```yaml +otlp_pipeline: + source: + otel_trace_source: + + sink: + - otlp: + endpoint: "https://xray.us-west-2.amazonaws.com/v1/traces" + aws: { } +``` + +### Full Configuration with STS + +Use this when assuming a cross-account role is required. 
+ +```yaml +otlp_pipeline: + workers: 2 + + source: + otel_trace_source: + ssl: false + port: 21890 + + buffer: + bounded_blocking: + buffer_size: 1000000 + batch_size: 125000 + + sink: + - otlp: + endpoint: "https://xray.us-west-2.amazonaws.com/v1/traces" + max_retries: 5 + threshold: + max_events: 512 + max_batch_size: 1mb + flush_timeout: 200ms + aws: + sts_role_arn: arn:aws:iam::123456789012:role/MyRole + sts_external_id: external-id-value +``` + +--- + +## Configuration Options + +| Property | Type | Required | Default | Description | +|----------------------------|----------|----------|-----------------------|----------------------------------------------------------------------------------------------------------| +| `endpoint` | `String` | Yes | — | AWS X-Ray OTLP endpoint where spans will be sent. | +| `max_retries` | `int` | No | `5` | Maximum number of retry attempts on HTTP send failures. | +| **threshold** | `Object` | No | — | Controls batching behavior. See below for sub-properties. | +| `threshold.max_events` | `int` | No | `512` (recommended) | Maximum number of spans per batch. Use `0` to disable count-based flushing. Must be ≥ 0. | +| `threshold.max_batch_size` | `String` | No | `1mb` (recommended) | Maximum total payload bytes per batch. Supports human-readable suffixes (`kb`, `mb`). | +| `threshold.flush_timeout` | `String` | No | `200ms` (recommended) | Maximum time to wait before flushing a non-empty batch. Minimum: 1ms (e.g., `200ms`, `1s`) | +| **aws** | `Object` | Yes | — | AWS authentication settings. Use `{}` if no STS role is needed. See below. | +| `aws.sts_role_arn` | `String` | No | — | IAM Role ARN that Data Prepper (or OSI) assumes to send spans to X-Ray on behalf of a customer account. | +| `aws.sts_external_id` | `String` | No | — | External ID to use when assuming the role. Required only if the target IAM role enforces sts:ExternalId. | + +**Additional Notes:** + +- `aws.region` is automatically derived from the endpoint. 
+ +--- + +## Performance Benchmark + +### Summary + +* Sustains ~3.5K TPS with ≤150ms p99 latency on t4g.large. +* Uses only ~8% CPU, ~100MB heap. +* 0 errors, retries, or drops during a 3-hour soak test. + +### Tuning Recommendations + +| Setting | Recommended | Reason | +|------------------|-------------|-------------------------------------------------------------------------------------------------| +| `max_retries` | `5` | Matches AWS SDK default. Gives ~8s of exponential backoff to tolerate transient 503/5xx errors. | +| `max_events` | `512` | Supports up to 3.5K TPS with 2 workers. Keeps p99 latency around 130ms. | +| `max_batch_size` | `1mb` | Aligns with OTEL + AWS X-Ray guidance. Larger batches get split, increasing latency/load. | +| `flush_timeout` | `200ms` | Short enough to avoid delay, long enough to fill batches and keep CPU/GCs low. | + +### Additional Tuning Tips + +* Lower `max_events` to **200–400** to reduce latency below 100 ms +* Decrease `flush_timeout` to **100 ms** for faster flushes (with higher CPU/network cost) +* Increase `max_batch_size` to **≥ 8 MB** only if p99 span > 9 KB +* Add pipeline workers if queue saturates at >4K TPS + +### Queue Sizing Rule + +> Queue capacity = max_events * 10 (minimum 2000) +> +> To keep memory usage under ~50MB: +> max_events ≤ 50_000_000 ÷ (10 × p99_span_size_bytes) +> +> Example: With p99 span size of 1 KB, max_events should be ≤ 5000 + + +--- + +## Protocol Details + +* Protocol: OTLP over HTTP +* Content-Type: `application/x-protobuf` +* Compression: `gzip` (enabled by default) + All outgoing HTTP requests use gzip compression to reduce payload size and bandwidth usage. + +--- + +## Delivery Semantics + +Currently, the sink provides at-most-once delivery. Once retries are exhausted, span batches are dropped. +Future releases will support durable queueing via a DLQ (dead-letter queue) for lossless guarantees. 
+ +--- + +## Retry Behavior + +- The sink uses an exponential backoff with jitter strategy for retryable HTTP status codes (e.g., 429, 502, 503, 504). +- Maximum number of attempts is controlled by `max_retries`. Once exceeded: + - The span batch is dropped. + - The plugin logs the exception and increments the error metric. +- Non-retryable errors (e.g., 400, 403) are logged and counted immediately without retry. +- Retry logic follows + the [OTLP/HTTP response specification](https://opentelemetry.io/docs/specs/otlp/#otlphttp-response). +- `Retry-After` header is not used for dynamic backoff because: + - Armeria’s retry rule API only supports boolean conditions or fixed `Backoff` strategies. + - Supporting `Retry-After` would require a custom `Backoff` implementation, adding unnecessary complexity. + - The exponential backoff already handles common retry intervals effectively. +--- + +## Logging & Metrics + +* Exceptions are logged with full stack traces. No customer data is logged. +* Metrics are emitted via Micrometer and include: + * recordsIn, recordsOut + * httpLatency, HTTP codes + * errorCount, rejectedSpansCount, failedSpansCount, retriesCount + * queueSize, queueCapacity + * payloadSize, payloadGzipSize + * JVM stats if configured (e.g., heap usage, GC pauses) + +--- + +## Developer Guide + +See the [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) guide for general information on contributions. 
+ +### Run unit tests locally + +```bash +./gradlew :data-prepper-plugins:otlp-sink:test +``` diff --git a/data-prepper-plugins/otlp-sink/build.gradle b/data-prepper-plugins/otlp-sink/build.gradle new file mode 100644 index 0000000000..ef3ff0ac7a --- /dev/null +++ b/data-prepper-plugins/otlp-sink/build.gradle @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +sourceSets { + integrationTest { + java.srcDir file('src/integrationTest/java') + resources.srcDir file('src/integrationTest/resources') + compileClasspath += sourceSets.main.output + runtimeClasspath += sourceSets.main.output + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntimeOnly.extendsFrom testRuntimeOnly +} + +dependencies { + // AWS SDK + implementation 'software.amazon.awssdk:sdk-core' + implementation 'software.amazon.awssdk:auth' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:regions' + implementation 'software.amazon.awssdk:http-client-spi' + implementation 'software.amazon.awssdk:apache-client' + + // Hibernate + implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' + + // OpenTelemetry Protobuf + implementation libs.opentelemetry.proto + implementation libs.protobuf.util + testImplementation libs.opentelemetry.proto + testImplementation libs.protobuf.util + + // Jackson + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' + + // Lombok + compileOnly 'org.projectlombok:lombok:1.18.30' + annotationProcessor 'org.projectlombok:lombok:1.18.30' + + // Data Prepper Projects + implementation project(':data-prepper-api') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(':data-prepper-plugins:otel-proto-common') + + // Armeria + 
implementation libs.armeria.core + + // Metrics + implementation 'io.micrometer:micrometer-core' +} + +test { + useJUnitPlatform() + finalizedBy jacocoTestReport, jacocoTestCoverageVerification +} + +tasks.register('integrationTest', Test) { + description = 'Runs integration tests.' + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + shouldRunAfter test + useJUnitPlatform() +} + +check.dependsOn integrationTest + +jacocoTestCoverageVerification { + violationRules { + rule { + enabled = true + element = 'CLASS' + includes = ['org.opensearch.dataprepper.plugins.sink.otlp.*'] + limit { + counter = 'LINE' + value = 'COVEREDRATIO' + minimum = 1.00 + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/OtlpSink.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/OtlpSink.java new file mode 100644 index 0000000000..7045e882c3 --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/OtlpSink.java @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.annotations.Experimental; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.trace.Span; +import 
org.opensearch.dataprepper.plugins.sink.otlp.buffer.OtlpSinkBuffer; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; + +import javax.annotation.Nonnull; +import java.util.Collection; + +/** + * OTLP Sink Plugin for Data Prepper. + */ +@Experimental +@DataPrepperPlugin( + name = "otlp", + pluginType = Sink.class, + pluginConfigurationType = OtlpSinkConfig.class +) +public class OtlpSink extends AbstractSink<Record<Span>> { + private volatile boolean initialized = false; + + private final OtlpSinkBuffer buffer; + private final OtlpSinkMetrics sinkMetrics; + + /** + * Constructor for the OTLP sink plugin. + * + * @param awsCredentialsSupplier the AWS credentials supplier + * @param config the configuration for the sink + * @param pluginMetrics the plugin metrics to use + * @param pluginSetting the plugin setting to use + */ + @DataPrepperPluginConstructor + public OtlpSink(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final OtlpSinkConfig config, @Nonnull final PluginMetrics pluginMetrics, @Nonnull final PluginSetting pluginSetting) { + super(pluginSetting); + + this.sinkMetrics = new OtlpSinkMetrics(pluginMetrics, pluginSetting); + this.buffer = new OtlpSinkBuffer(awsCredentialsSupplier, config, sinkMetrics); + } + + /** + * Starts the buffer's worker and marks this sink as initialized. + */ + @Override + public void doInitialize() { + buffer.start(); + initialized = true; + } + + /** + * Hands every record to the buffer, which batches and sends them. + * + * @param records Records to be output + */ + @Override + public void doOutput(@Nonnull final Collection<Record<Span>> records) { + for (final Record<Span> record : records) { + buffer.add(record); + } + } + + /** + * Indicates whether this sink is ready to receive data. + * + * @return true once {@link #doInitialize()} has run and the buffer reports it is running + */ + @Override + public boolean isReady() { + return initialized && buffer.isRunning(); + } + + /** + * Hook called during pipeline shutdown. 
+ */ + @Override + public void shutdown() { + super.shutdown(); + buffer.stop(); + } +} diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/buffer/OtlpSinkBuffer.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/buffer/OtlpSinkBuffer.java new file mode 100644 index 0000000000..2c1c0e8f04 --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/buffer/OtlpSinkBuffer.java @@ -0,0 +1,202 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.buffer; + +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import lombok.Getter; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import org.opensearch.dataprepper.plugins.sink.otlp.http.OtlpHttpSender; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.utils.Pair; + +import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A back-pressure buffer for OTLP sink. 
+ */ +public class OtlpSinkBuffer { + private static final Logger LOG = LoggerFactory.getLogger(OtlpSinkBuffer.class); + private static final int SAFETY_FACTOR = 10; + private static final int MIN_QUEUE_CAPACITY = 2000; + + private final BlockingQueue<Record<Span>> queue; + private final OTelProtoStandardCodec.OTelProtoEncoder encoder; + private final OtlpHttpSender sender; + private final OtlpSinkMetrics sinkMetrics; + + private final int maxEvents; + private final long maxBatchBytes; + private final long flushTimeoutMillis; + + private final ExecutorService executor; + + @Getter + private volatile boolean running = true; + + /** + * Creates a new OTLP sink buffer with a default encoder and an HTTP sender built from the given configuration. + * + * @param awsCredentialsSupplier the AWS credentials supplier + * @param config the OTLP sink configuration + * @param sinkMetrics the metrics collector to use + */ + public OtlpSinkBuffer(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final OtlpSinkConfig config, @Nonnull final OtlpSinkMetrics sinkMetrics) { + this(config, sinkMetrics, new OTelProtoStandardCodec.OTelProtoEncoder(), new OtlpHttpSender(awsCredentialsSupplier, config, sinkMetrics)); + } + + /** + * Visible for testing only: constructs an OTLP sink buffer with injected encoder and sender. 
+ */ + @VisibleForTesting + OtlpSinkBuffer(@Nonnull final OtlpSinkConfig config, + @Nonnull final OtlpSinkMetrics sinkMetrics, + @Nonnull final OTelProtoStandardCodec.OTelProtoEncoder encoder, + @Nonnull final OtlpHttpSender sender) { + + this.sinkMetrics = sinkMetrics; + this.encoder = encoder; + this.sender = sender; + + this.maxEvents = config.getMaxEvents(); + this.maxBatchBytes = config.getMaxBatchSize(); + this.flushTimeoutMillis = config.getFlushTimeoutMillis(); + + this.queue = new LinkedBlockingQueue<>(getQueueCapacity()); + sinkMetrics.registerQueueGauges(queue); + + this.executor = Executors.newSingleThreadExecutor(r -> { + final Thread t = new Thread(() -> { + try { + r.run(); + } catch (final Throwable t1) { + LOG.error("Worker thread crashed unexpectedly", t1); + sinkMetrics.incrementErrorsCount(); + restartWorker(); + } + }, "otlp-sink-buffer-thread"); + t.setDaemon(false); + return t; + }); + } + + private int getQueueCapacity() { + return (int) Math.min(Math.max((long) maxEvents * SAFETY_FACTOR, MIN_QUEUE_CAPACITY), Integer.MAX_VALUE); + } + + public void start() { + running = true; + executor.execute(this::run); + } + + public void stop() { + running = false; + executor.shutdownNow(); + } + + @VisibleForTesting + void restartWorker() { + if (running && !executor.isShutdown()) { + LOG.info("Restarting OTLP sink buffer worker thread"); + executor.execute(this::run); + } + } + + /** + * Enqueues a span record for later batching and sending. + *

+ * This will block if the internal queue is full, guaranteeing + * lossless delivery during normal operations. + * On interruption the span is not enqueued: the interrupt flag is restored and + * the failed-span and error metrics are incremented. + * + * @param record the span record to enqueue + */ + public void add(final Record<Span> record) { + try { + queue.put(record); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while enqueuing span", e); + sinkMetrics.incrementFailedSpansCount(1); + sinkMetrics.incrementErrorsCount(); + } + } + + /** + * Worker loop that batches spans by count, size, or time and then flushes them. + *

+ * Continues running as long as {@link #running} is true or the queue is not empty. + * Handles encoding failures, timeout-based flush, and final flush on shutdown. + */ + private void run() { + final List<Pair<ResourceSpans, EventHandle>> batch = new ArrayList<>(); + long batchSize = 0; + long lastFlush = System.currentTimeMillis(); + + while (true) { + try { + final long now = System.currentTimeMillis(); + final Record<Span> record = queue.poll(100, TimeUnit.MILLISECONDS); + + if (record != null) { + try { + final ResourceSpans resourceSpans = encoder.convertToResourceSpans(record.getData()); + final EventHandle eventHandle = record.getData().getEventHandle(); + batch.add(Pair.of(resourceSpans, eventHandle)); + batchSize += resourceSpans.getSerializedSize(); + } catch (final Exception e) { + LOG.error("Failed to encode span, skipping", e); + sinkMetrics.incrementFailedSpansCount(1); + sinkMetrics.incrementErrorsCount(); + } + } + + final boolean flushBySize = (maxEvents > 0 && batch.size() >= maxEvents) || batchSize >= maxBatchBytes; + final boolean flushByTime = !batch.isEmpty() && (now - lastFlush >= flushTimeoutMillis); + + if (flushBySize || flushByTime) { + sender.send(batch); + batch.clear(); + batchSize = 0; + lastFlush = now; + } + + if (!running && queue.isEmpty()) { + break; + } + + } catch (final InterruptedException e) { + if (running) { + LOG.debug("Worker interrupted while polling, continuing..."); + sinkMetrics.incrementErrorsCount(); + } + + // Keep looping either way: while running to resume polling, after stop() to drain the queue until the exit check above breaks out + } + } + + // Final flush of any partial batch left when the loop exits + if (!batch.isEmpty()) { + sender.send(batch); + batch.clear(); + } + } +} diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/OtlpSinkConfig.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/OtlpSinkConfig.java new file mode 100644 index 0000000000..3a7e698aa1 --- /dev/null +++ 
b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/OtlpSinkConfig.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.otlp.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.opensearch.dataprepper.aws.api.AwsConfig; +import software.amazon.awssdk.regions.Region; + +import java.net.URI; +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Configuration class for the OTLP sink plugin. + * This class defines the configuration options available when setting up + * the OTLP sink in Data Prepper pipelines. + *

+ * Note that {@code @Getter} is applied at the field level (not the class level) + * to preserve encapsulation and maintain control over exposed configuration data. + *

+ * This class is automatically wired by the Data Prepper framework during pipeline initialization. + */ +@NoArgsConstructor +public class OtlpSinkConfig { + + @Getter + @JsonProperty("endpoint") + @NotBlank(message = "endpoint is required") + private String endpoint; + + @Getter + @JsonProperty("max_retries") + @Min(value = 0) + private int maxRetries = 5; + + /** + * The threshold configuration for sending spans to the OTLP endpoint. + * This field is kept private; its values are exposed through the delegating getters declared right after it. + * It is eagerly initialized with defaults so the {@code threshold} block can be omitted from the pipeline configuration. + */ + @JsonProperty("threshold") + @Valid + private ThresholdConfig thresholdConfig = new ThresholdConfig(); + + public int getMaxEvents() { + return thresholdConfig.getMaxEvents(); + } + + public long getMaxBatchSize() { + return thresholdConfig.getMaxBatchSize().getBytes(); + } + + public long getFlushTimeoutMillis() { + return thresholdConfig.getFlushTimeout().toMillis(); + } + + /** + * AWS authentication configuration. + * This field is kept private; its values are exposed through {@link #getStsRoleArn()} and {@link #getStsExternalId()}. + */ + @JsonProperty("aws") + @Valid + private AwsConfig awsConfig; + + /** + * Get AWS region from the provided endpoint. 
+ * + * @return the first known AWS region whose id appears as a dot-separated label of the endpoint host + */ + public Region getAwsRegion() { + try { + final String host = URI.create(this.endpoint).getHost(); + if (host == null) { + throw new IllegalArgumentException("Endpoint has no host component"); + } + + final Set<String> knownRegions = Region.regions().stream() + .map(Region::id) + .collect(Collectors.toSet()); + + return Arrays.stream(host.split("\\.")) + .filter(knownRegions::contains) + .map(Region::of) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("No valid AWS region found in endpoint: " + endpoint)); + } catch (final Exception e) { + throw new IllegalArgumentException("Failed to parse AWS region from endpoint: " + endpoint, e); + } + } + + public String getStsRoleArn() { + if (awsConfig == null || awsConfig.getAwsStsRoleArn() == null) { + return null; + } + + return awsConfig.getAwsStsRoleArn(); + } + + public String getStsExternalId() { + if (awsConfig == null || awsConfig.getAwsStsExternalId() == null) { + return null; + } + + return awsConfig.getAwsStsExternalId(); + } + + /** + * Validate the AWS configuration: the {@code aws} block must be present. + * NOTE(review): presumably requiring {@code aws} now avoids a breaking change once non-AWS OTLP endpoints are supported - confirm. 
+ */ + @AssertTrue + boolean isAwsConfigValid() { + return awsConfig != null; + } +} diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/ThresholdConfig.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/ThresholdConfig.java new file mode 100644 index 0000000000..d4c87c0c7c --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/ThresholdConfig.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Min; +import lombok.Getter; +import lombok.NoArgsConstructor; +import org.hibernate.validator.constraints.time.DurationMin; +import org.opensearch.dataprepper.model.types.ByteCount; + +import java.time.Duration; + +/** + * Configuration class for threshold settings. + * This class will be automatically wired by Data-Prepper. + */ +@NoArgsConstructor +@Getter +class ThresholdConfig { + + /** + * Max number of spans per batch. + * Use 0 to disable event-count based flushing (unbounded). 
+ */ + @JsonProperty("max_events") + @Min(value = 0, message = "max_events must be 0 (unbounded) or greater") + private int maxEvents = 512; + + @JsonProperty("max_batch_size") + private ByteCount maxBatchSize = ByteCount.parse("1mb"); + + @JsonProperty("flush_timeout") + @DurationMin(millis = 1, message = "flush_timeout must be at least 1ms") + private Duration flushTimeout = Duration.ofMillis(200); +} diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/GzipCompressor.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/GzipCompressor.java new file mode 100644 index 0000000000..44a14134cd --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/GzipCompressor.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.http; + +import com.google.common.annotations.VisibleForTesting; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.function.Function; +import java.util.zip.GZIPOutputStream; + +/** + * Performs GZIP compression on OTLP byte payloads. + */ +class GzipCompressor implements Function<byte[], byte[]> { + private static final Logger LOG = LoggerFactory.getLogger(GzipCompressor.class); + private final OtlpSinkMetrics sinkMetrics; + + /** + * Constructor for the GzipCompressor. + * + * @param sinkMetrics The metrics for the OTLP sink plugin. + */ + GzipCompressor(final OtlpSinkMetrics sinkMetrics) { + this.sinkMetrics = sinkMetrics; + } + + /** + * Compresses the provided payload using GZIP compression. + * + * @param payload The payload to be compressed. 
+ * @return the GZIP-compressed payload, or an empty array if compression failed. + */ + @Override + public byte[] apply(final byte[] payload) { + try { + return compressInternal(payload); + } catch (final IOException e) { + LOG.error("Failed to compress payload", e); + sinkMetrics.incrementErrorsCount(); + return new byte[0]; + } + } + + /** + * Performs the actual GZIP compression; kept as a separate method so tests can mock it. + */ + @VisibleForTesting + byte[] compressInternal(final byte[] payload) throws IOException { + try (final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final GZIPOutputStream gzip = new GZIPOutputStream(out)) { + + gzip.write(payload); + gzip.finish(); + return out.toByteArray(); + } + } +} diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/OtlpHttpSender.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/OtlpHttpSender.java new file mode 100644 index 0000000000..013fe6730f --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/OtlpHttpSender.java @@ -0,0 +1,236 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.otlp.http; + +import com.google.common.annotations.VisibleForTesting; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.client.retry.Backoff; +import com.linecorp.armeria.client.retry.RetryRuleWithContent; +import com.linecorp.armeria.client.retry.RetryingClient; +import com.linecorp.armeria.client.retry.RetryingClientBuilder; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.RequestHeadersBuilder; +import 
io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.utils.Pair; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Responsible for sending signed OTLP Protobuf requests to an OTLP endpoint using an Armeria client. + */ +public class OtlpHttpSender { + private static final Logger LOG = LoggerFactory.getLogger(OtlpHttpSender.class); + private static final Set<Integer> RETRYABLE_STATUS_CODES = Set.of(429, 502, 503, 504); + + private final SigV4Signer signer; + private final WebClient webClient; + private final OtlpSinkMetrics sinkMetrics; + private final Function<byte[], byte[]> gzipCompressor; + + /** + * Constructor for the OtlpHttpSender. + * + * @param awsCredentialsSupplier the AWS credentials supplier + * @param config The configuration for the OTLP sink plugin. + * @param sinkMetrics The metrics for the OTLP sink plugin. + */ + public OtlpHttpSender(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final OtlpSinkConfig config, @Nonnull final OtlpSinkMetrics sinkMetrics) { + this(sinkMetrics, new GzipCompressor(sinkMetrics), new SigV4Signer(awsCredentialsSupplier, config), buildWebClient(config)); + } + + /** + * Constructor for unit testing with injected dependencies. 
+ */ + @VisibleForTesting + OtlpHttpSender(@Nonnull final OtlpSinkMetrics sinkMetrics, @Nonnull final Function<byte[], byte[]> gzipCompressor, + final SigV4Signer signer, final WebClient webClient) { + + this.sinkMetrics = sinkMetrics; + this.gzipCompressor = gzipCompressor; + this.signer = signer; + this.webClient = webClient; + } + + /** + * Builds a WebClient with retry logic for known OTLP retryable status codes. + *

+ * Retries on 429, 502, 503, and 504 per OTEL spec. + * See: <a href="https://opentelemetry.io/docs/specs/otlp/#otlphttp-response">OTLP/HTTP Response</a> + *

+ * We are not using the Retry-After header for dynamic backoff because: + * - Armeria’s retry rule API expects a boolean decision or fixed Backoff. + * - Applying Retry-After semantics would require a custom Backoff implementation, + * adding complexity with minimal benefit for most OTLP endpoints. + * - Our exponential backoff already handles typical retry intervals gracefully. + */ + private static WebClient buildWebClient(final OtlpSinkConfig config) { + final RetryRuleWithContent retryRule = RetryRuleWithContent.builder() + .onStatus((ctx, status) -> RETRYABLE_STATUS_CODES.contains(status.code())) + .thenBackoff(Backoff.exponential(100, 10_000).withJitter(0.2)); + + final long estimatedContentLimit = Math.max(1, config.getMaxBatchSize()) * (config.getMaxRetries() + 1); + final int safeContentLimit = (int) Math.min(estimatedContentLimit, Integer.MAX_VALUE); + + final RetryingClientBuilder retryingClientBuilder = RetryingClient.builder(retryRule, safeContentLimit) + .maxTotalAttempts(config.getMaxRetries() + 1); + + final long httpTimeoutMs = Math.min( + Math.max(config.getFlushTimeoutMillis() * 2, 3_000), 10_000 + ); + + return WebClient.builder() + .decorator(retryingClientBuilder.newDecorator()) + .responseTimeoutMillis(httpTimeoutMs) + .maxResponseLength(safeContentLimit) + .build(); + } + + /** + * Sends the provided OTLP Protobuf payload to the OTLP endpoint asynchronously. 
+ * + * @param batch the batch of spans to send + */ + public void send(@Nonnull final List> batch) { + if (batch.isEmpty()) { + return; + } + + // Defensive copy to avoid ConcurrentModificationException + final List> immutableBatch = List.copyOf(batch); + + final Pair payloadAndCompressedPayload = getPayloadAndCompressedPayload(immutableBatch); + final int spans = immutableBatch.size(); + if (payloadAndCompressedPayload.right().length == 0) { + sinkMetrics.incrementFailedSpansCount(spans); + releaseAllEventHandle(immutableBatch, false); + return; + } + + final HttpRequest request = buildHttpRequest(payloadAndCompressedPayload.right()); + final long startTime = System.currentTimeMillis(); + + webClient.execute(request) + .aggregate() + .thenAccept(response -> { + final long latency = System.currentTimeMillis() - startTime; + sinkMetrics.recordHttpLatency(latency); + sinkMetrics.incrementPayloadSize(payloadAndCompressedPayload.left().length); + sinkMetrics.incrementPayloadGzipSize(payloadAndCompressedPayload.right().length); + + final int statusCode = response.status().code(); + final byte[] responseBytes = response.content().array(); + handleResponse(statusCode, responseBytes, immutableBatch); + }) + .exceptionally(e -> { + LOG.error("Failed to send {} spans.", spans, e); + sinkMetrics.incrementRejectedSpansCount(spans); + releaseAllEventHandle(immutableBatch, false); + return null; + }); + } + + private Pair getPayloadAndCompressedPayload(final List> batch) { + final ExportTraceServiceRequest request = ExportTraceServiceRequest.newBuilder() + .addAllResourceSpans(batch.stream().map(Pair::left).collect(Collectors.toList())) + .build(); + final byte[] payload = request.toByteArray(); + final byte[] compressedPayload = gzipCompressor.apply(payload); + + return Pair.of(payload, compressedPayload); + } + + private HttpRequest buildHttpRequest(final byte[] compressedPayload) { + final SdkHttpFullRequest signedRequest = signer.signRequest(compressedPayload); + + final 
RequestHeadersBuilder headersBuilder = RequestHeaders.builder() + .method(HttpMethod.POST) + .scheme(signedRequest.getUri().getScheme()) + .path(signedRequest.getUri().getRawPath()) + .authority(signedRequest.getUri().getAuthority()); + + // ONLY use the signed headers + signedRequest.headers().forEach((k, vList) -> vList.forEach(v -> headersBuilder.add(k, v))); + return HttpRequest.of(headersBuilder.build(), HttpData.wrap(compressedPayload)); + } + + private void handleResponse(final int statusCode, final byte[] responseBytes, final List> batch) { + sinkMetrics.recordResponseCode(statusCode); + + if (statusCode >= 200 && statusCode < 300) { + handleSuccessfulResponse(responseBytes, batch); + return; + } + + final String responseBody = responseBytes != null + ? new String(responseBytes, StandardCharsets.UTF_8) + : ""; + + LOG.error("Non-successful OTLP response. Status: {}, Response: {}", statusCode, responseBody); + sinkMetrics.incrementRejectedSpansCount(batch.size()); + releaseAllEventHandle(batch, false); + } + + /** + * Handles a successful OTLP response with partial success. 
+ */ + private void handleSuccessfulResponse(final byte[] responseBytes, final List> batch) { + final int spans = batch.size(); + if (responseBytes == null) { + sinkMetrics.incrementRecordsOut(spans); + releaseAllEventHandle(batch, true); + return; + } + + try { + final ExportTraceServiceResponse otlpResponse = ExportTraceServiceResponse.parseFrom(responseBytes); + + if (otlpResponse.hasPartialSuccess()) { + final var partial = otlpResponse.getPartialSuccess(); + final long rejectedSpans = partial.getRejectedSpans(); + final String errorMessage = partial.getErrorMessage(); + if (rejectedSpans > 0) { + LOG.error("OTLP Partial Success: rejectedSpans={}, message={}", rejectedSpans, errorMessage); + sinkMetrics.incrementRejectedSpansCount(rejectedSpans); + } + + final long deliveredSpans = spans - rejectedSpans; + sinkMetrics.incrementRecordsOut(deliveredSpans); + + // Optimistically release all as true, no per-span granularity + releaseAllEventHandle(batch, true); + } else { + sinkMetrics.incrementRecordsOut(spans); + releaseAllEventHandle(batch, true); + } + } catch (final Exception e) { + LOG.error("Could not parse OTLP response as ExportTraceServiceResponse: {}", e.getMessage()); + sinkMetrics.incrementErrorsCount(); + sinkMetrics.incrementRecordsOut(spans); + releaseAllEventHandle(batch, true); + } + } + + private void releaseAllEventHandle(@Nonnull final List> batch, final boolean success) { + batch.forEach(pair -> pair.right().release(success)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/SigV4Signer.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/SigV4Signer.java new file mode 100644 index 0000000000..9b1b0becab --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/http/SigV4Signer.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.http; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.signer.Aws4Signer; +import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.regions.Region; + +import javax.annotation.Nonnull; +import java.net.URI; + +/** + * Helper class to apply AWS SigV4 signing to outgoing HTTP requests + * before sending them to the AWS OTLP endpoint. + */ +class SigV4Signer { + private static final String SERVICE_NAME = "xray"; + private static final String OTLP_PATH = "/v1/traces"; + private final Aws4Signer signer = Aws4Signer.create(); + + private final AwsCredentialsProvider credentialsProvider; + private final Region region; + private final URI endpointUri; + + /** + * Constructs a SigV4 signer helper. + * + * @param awsCredentialsSupplier the AWS credentials supplier + * @param config Configuration for region and optional STS role + */ + SigV4Signer(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier, @Nonnull final OtlpSinkConfig config) { + this.region = config.getAwsRegion(); + + this.credentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() + .withRegion(region) + .withStsRoleArn(config.getStsRoleArn()) + .withStsExternalId(config.getStsExternalId()) + .build()); + + this.endpointUri = config.getEndpoint() != null + ? 
URI.create(config.getEndpoint()) + : URI.create(String.format("https://xray.%s.amazonaws.com%s", region.id(), OTLP_PATH)); + } + + /** + * Signs a request payload using AWS SigV4 and returns a fully signed request. + * + * @param payload The OTLP Protobuf-encoded request body to be sent + * @return A signed {@link SdkHttpFullRequest} ready for transmission to the AWS OTLP endpoint + */ + SdkHttpFullRequest signRequest(@Nonnull final byte[] payload) { + final SdkHttpFullRequest unsignedRequest = SdkHttpFullRequest.builder() + .method(SdkHttpMethod.POST) + .uri(endpointUri) + .putHeader("Content-Type", "application/x-protobuf") + .putHeader("Content-Encoding", "gzip") + .contentStreamProvider(() -> SdkBytes.fromByteArray(payload).asInputStream()) + .build(); + + return signer.sign(unsignedRequest, Aws4SignerParams.builder() + .signingRegion(region) + .signingName(SERVICE_NAME) + .awsCredentials(credentialsProvider.resolveCredentials()) + .build()); + } +} diff --git a/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/metrics/OtlpSinkMetrics.java b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/metrics/OtlpSinkMetrics.java new file mode 100644 index 0000000000..0079c8b356 --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/otlp/metrics/OtlpSinkMetrics.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.metrics; + +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; + +import javax.annotation.Nonnull; +import java.time.Duration; +import java.util.concurrent.BlockingQueue; + +/** + * A central metrics 
facade for the OTLP sink plugin. + *

+ * All OTLP sink components should use this class to record and expose common metrics + * such as record counts, latencies, payload sizes, and queue statistics. + *

+ */ +public class OtlpSinkMetrics { + + private final PluginMetrics pluginMetrics; + private final Timer httpLatency; + private final DistributionSummary payloadSize; + private final DistributionSummary payloadGzipSize; + + /** + * Constructor for OtlpSinkMetrics + * + * @param pluginMetrics The plugin metrics + * @param pluginSetting The plugin setting + */ + public OtlpSinkMetrics(@Nonnull final PluginMetrics pluginMetrics, @Nonnull final PluginSetting pluginSetting) { + this.pluginMetrics = pluginMetrics; + + final String pipelineName = pluginSetting.getPipelineName(); + final String pluginName = pluginSetting.getName(); + + httpLatency = buildLatencyTimer(pipelineName, pluginName, "httpLatency"); + + payloadSize = buildDistributionSummary(pipelineName, pluginName, "payloadSize"); + payloadGzipSize = buildDistributionSummary(pipelineName, pluginName, "payloadGzipSize"); + } + + /** + * Builds a timer for latency metrics with percentiles + * + * @param pipelineName The pipeline name + * @param pluginName The plugin name + * @param metricName The metric name + * @return The timer + */ + private static Timer buildLatencyTimer(@Nonnull final String pipelineName, @Nonnull final String pluginName, @Nonnull final String metricName) { + return Timer.builder(String.format("%s_%s_%s", pipelineName, pluginName, metricName)) + .publishPercentiles(0.5, 0.9, 0.95, 0.99, 1.0) + .publishPercentileHistogram(true) + .distributionStatisticBufferLength(1024) + .distributionStatisticExpiry(Duration.ofMinutes(10)) + .register(Metrics.globalRegistry); + } + + /** + * Builds a distribution summary for payload size metrics with percentiles + * + * @param pipelineName The pipeline name + * @param pluginName The plugin name + * @param metricName The metric name + * @return The distribution summary + */ + private static DistributionSummary buildDistributionSummary(@Nonnull final String pipelineName, @Nonnull final String pluginName, @Nonnull final String metricName) { + return 
DistributionSummary.builder(String.format("%s_%s_%s", pipelineName, pluginName, metricName)) + .baseUnit("bytes") + .publishPercentiles(0.5, 0.9, 0.95, 0.99, 1.0) + .publishPercentileHistogram(true) + .distributionStatisticBufferLength(1024) + .distributionStatisticExpiry(Duration.ofMinutes(10)) + .register(Metrics.globalRegistry); + } + + public void incrementRecordsOut(final long count) { + pluginMetrics.counter("recordsOut").increment(count); + } + + public void incrementErrorsCount() { + pluginMetrics.counter("errorsCount").increment(1); + } + + public void incrementPayloadSize(final long bytes) { + payloadSize.record(bytes); + } + + public void incrementPayloadGzipSize(final long bytes) { + payloadGzipSize.record(bytes); + } + + public void recordHttpLatency(final long durationMillis) { + httpLatency.record(Duration.ofMillis(durationMillis)); + } + + public void registerQueueGauges(final BlockingQueue queue) { + pluginMetrics.gauge("queueSize", queue, BlockingQueue::size); + pluginMetrics.gauge("queueCapacity", queue, q -> q.remainingCapacity() + q.size()); + } + + /** + * Increments the count of spans that were explicitly rejected by the OTLP endpoint. + * + * @param count The number of spans rejected. + */ + public void incrementRejectedSpansCount(final long count) { + pluginMetrics.counter("rejectedSpansCount").increment(count); + } + + /** + * Increments the count of spans that failed to be processed by the sink. + * + * @param count The number of spans failed. + */ + public void incrementFailedSpansCount(final long count) { + pluginMetrics.counter("failedSpansCount").increment(count); + } + + /** + * Records the response code in the metrics. + * Group response codes by category: 2xx, 4xx, 5xx, etc. + * + * @param statusCode The HTTP response code. 
+ */ + public void recordResponseCode(final int statusCode) { + final String codeCategory = (statusCode / 100) + "xx"; + pluginMetrics.counter("http_" + codeCategory + "_responses").increment(); + } +} diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/OtlpSinkTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/OtlpSinkTest.java new file mode 100644 index 0000000000..d6ed2da99a --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/OtlpSinkTest.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.otlp; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; +import org.opensearch.dataprepper.plugins.sink.otlp.buffer.OtlpSinkBuffer; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import software.amazon.awssdk.regions.Region; + +import java.lang.reflect.Field; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class OtlpSinkTest { + private OtlpSink target; + private OtlpSinkBuffer mockBuffer; + private OtlpSinkConfig mockConfig; + private PluginMetrics mockMetrics; + private PluginSetting mockSetting; + 
private AwsCredentialsSupplier mockAwsCredSupplier; + + @BeforeEach + void setUp() throws Exception { + // Arrange: stub out config, metrics, setting + mockAwsCredSupplier = mock(AwsCredentialsSupplier.class); + mockConfig = mock(OtlpSinkConfig.class); + when(mockConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); + when(mockConfig.getEndpoint()).thenReturn("https://localhost/v1/traces"); + + mockMetrics = mock(PluginMetrics.class); + + mockSetting = mock(PluginSetting.class); + when(mockSetting.getPipelineName()).thenReturn("pipeline"); + when(mockSetting.getName()).thenReturn("otlp"); + + // Create the real sink + target = new OtlpSink(mockAwsCredSupplier, mockConfig, mockMetrics, mockSetting); + + // Replace its private buffer with a mock + mockBuffer = mock(OtlpSinkBuffer.class); + final Field bufferField = OtlpSink.class.getDeclaredField("buffer"); + bufferField.setAccessible(true); + bufferField.set(target, mockBuffer); + } + + @Test + void testInitialize_startsBuffer() { + // Act + target.initialize(); + + // Assert + verify(mockBuffer).start(); + } + + @Test + void testOutput_addsEveryRecordToBuffer() { + // Arrange + @SuppressWarnings("unchecked") final Record r1 = mock(Record.class); + @SuppressWarnings("unchecked") final Record r2 = mock(Record.class); + + // Act + target.output(List.of(r1, r2)); + + // Assert + verify(mockBuffer).add(r1); + verify(mockBuffer).add(r2); + verifyNoMoreInteractions(mockBuffer); + } + + @Test + void testIsReady_returnsTrueOnlyAfterInitialization() { + when(mockBuffer.isRunning()).thenReturn(true); + + // Not initialized yet + assertFalse(target.isReady()); + + // Initialize, which sets 'initialized = true' and starts the buffer + target.initialize(); + assertTrue(target.isReady()); + + // Now simulate buffer being not running + when(mockBuffer.isRunning()).thenReturn(false); + assertFalse(target.isReady()); + } + + @Test + void testShutdown_stopsBuffer() { + // Act + target.shutdown(); + + // Assert + 
verify(mockBuffer).stop(); + } + + @Test + void testConstructor_doesNotThrow() { + // Just ensure the four-arg constructor still works + assertDoesNotThrow(() -> new OtlpSink(mockAwsCredSupplier, mockConfig, mockMetrics, mockSetting)); + } +} diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/buffer/OtlpSinkBufferTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/buffer/OtlpSinkBufferTest.java new file mode 100644 index 0000000000..ec20a6473d --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/buffer/OtlpSinkBufferTest.java @@ -0,0 +1,563 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.buffer; + +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.trace.Span; +import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import org.opensearch.dataprepper.plugins.sink.otlp.http.OtlpHttpSender; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; +import software.amazon.awssdk.regions.Region; + +import java.lang.reflect.Field; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class OtlpSinkBufferTest { + + private OtlpSinkConfig config; + private OtlpSinkMetrics metrics; + private OTelProtoStandardCodec.OTelProtoEncoder encoder; + private OtlpHttpSender sender; + private OtlpSinkBuffer buffer; + private AwsCredentialsSupplier mockAwsCredSupplier; + + @BeforeEach + void setUp() { + config = mock(OtlpSinkConfig.class); + when(config.getEndpoint()).thenReturn("https://localhost/v1/traces"); + when(config.getMaxEvents()).thenReturn(2); + when(config.getMaxRetries()).thenReturn(2); + when(config.getMaxBatchSize()).thenReturn(1_000_000L); + when(config.getFlushTimeoutMillis()).thenReturn(10L); + when(config.getAwsRegion()).thenReturn(Region.of("us-west-2")); + + metrics = mock(OtlpSinkMetrics.class); + encoder = mock(OTelProtoStandardCodec.OTelProtoEncoder.class); + sender = mock(OtlpHttpSender.class); + mockAwsCredSupplier = mock(AwsCredentialsSupplier.class); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + } + + @AfterEach + void tearDown() { + if (buffer != null) { + buffer.stop(); + } + } + + @Test + void testStartAndStopDoesNotThrow() { + buffer.start(); + assertTrue(buffer.isRunning()); + buffer.stop(); + assertFalse(buffer.isRunning()); + } + + @Test + void testPublicConstructorInitializesWithDefaults() { + final OtlpSinkBuffer defaultBuffer = new OtlpSinkBuffer(mockAwsCredSupplier, config, 
metrics); + assertNotNull(defaultBuffer); + assertTrue(defaultBuffer.isRunning()); + defaultBuffer.stop(); + } + + @Test + void testQueueCapacityCalculation_withHighMaxEvents() { + when(config.getMaxEvents()).thenReturn(500); + final OtlpSinkBuffer highCapacityBuffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + assertNotNull(highCapacityBuffer); + highCapacityBuffer.stop(); + } + + @Test + void testQueueCapacityCalculation_withLowMaxEvents() { + when(config.getMaxEvents()).thenReturn(1); + final OtlpSinkBuffer lowCapacityBuffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + assertNotNull(lowCapacityBuffer); + lowCapacityBuffer.stop(); + } + + @Test + void testQueueGaugesRegistration() { + verify(metrics).registerQueueGauges(any(BlockingQueue.class)); + } + + @Test + void testAddHandlesInterruptedException() throws Exception { + @SuppressWarnings("unchecked") final BlockingQueue> badQueue = mock(BlockingQueue.class); + doThrow(new InterruptedException()).when(badQueue).put(any()); + + final Field queueField = OtlpSinkBuffer.class.getDeclaredField("queue"); + queueField.setAccessible(true); + queueField.set(buffer, badQueue); + + final Record rec = mock(Record.class); + buffer.add(rec); + + verify(metrics).incrementFailedSpansCount(1); + verify(metrics).incrementErrorsCount(); + } + + @Test + void testWorkerThreadHandlesEncodeException() throws Exception { + // First call throws exception, second call succeeds + when(encoder.convertToResourceSpans(any(Span.class))) + .thenThrow(new RuntimeException("boom")) + .thenReturn(ResourceSpans.getDefaultInstance()); + + final Record rec1 = createMockRecord(); + final Record rec2 = createMockRecord(); + + buffer.start(); + buffer.add(rec1); + buffer.add(rec2); + + // Wait for processing to complete + await().atMost(2, SECONDS).untilAsserted(() -> { + verify(metrics).incrementFailedSpansCount(1); + verify(metrics, atLeastOnce()).incrementErrorsCount(); + }); + + buffer.stop(); + + // The second record 
should have been processed and sent + await().atMost(1, SECONDS).untilAsserted(() -> verify(sender).send(anyList())); + } + + @Test + void testFlushByEventCount() throws Exception { + when(config.getMaxEvents()).thenReturn(2); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + final ResourceSpans resourceSpans = ResourceSpans.getDefaultInstance(); + when(encoder.convertToResourceSpans(any(Span.class))).thenReturn(resourceSpans); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record rec1 = createMockRecord(); + final Record rec2 = createMockRecord(); + + buffer.add(rec1); + buffer.add(rec2); + + await().atMost(2, SECONDS).untilAsserted(() -> verify(sender).send(anyList())); + buffer.stop(); + } + + @Test + void testFlushByBatchSize() throws Exception { + when(config.getMaxEvents()).thenReturn(Integer.MAX_VALUE); + when(config.getMaxBatchSize()).thenReturn(100L); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + final ResourceSpans largeResourceSpans = mock(ResourceSpans.class); + when(largeResourceSpans.getSerializedSize()).thenReturn(150); + when(encoder.convertToResourceSpans(any(Span.class))).thenReturn(largeResourceSpans); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record rec = createMockRecord(); + buffer.add(rec); + + await().atMost(2, SECONDS).untilAsserted(() -> verify(sender).send(anyList())); + buffer.stop(); + } + + @Test + void testFlushByTimeout() throws Exception { + when(config.getMaxEvents()).thenReturn(Integer.MAX_VALUE); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(50L); + + final ResourceSpans resourceSpans = ResourceSpans.getDefaultInstance(); + when(encoder.convertToResourceSpans(any(Span.class))).thenReturn(resourceSpans); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, 
sender); + buffer.start(); + + final Record rec = createMockRecord(); + buffer.add(rec); + + await().atMost(2, SECONDS).untilAsserted(() -> verify(sender).send(anyList())); + buffer.stop(); + } + + @Test + void testWorkerLoopWithEmptyQueue() throws Exception { + when(config.getMaxEvents()).thenReturn(Integer.MAX_VALUE); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + TimeUnit.MILLISECONDS.sleep(200); // Let it poll empty queue + + buffer.stop(); + verify(sender, never()).send(anyList()); + } + + @Test + void testEventHandleIncludedInBatch() throws Exception { + final ResourceSpans resourceSpans = ResourceSpans.getDefaultInstance(); + when(encoder.convertToResourceSpans(any(Span.class))).thenReturn(resourceSpans); + + final EventHandle eventHandle = mock(EventHandle.class); + final Record rec = mock(Record.class); + final Span span = mock(Span.class); + when(rec.getData()).thenReturn(span); + when(span.getEventHandle()).thenReturn(eventHandle); + + buffer.start(); + buffer.add(rec); + + TimeUnit.MILLISECONDS.sleep(100); + buffer.stop(); + + await().atMost(1, SECONDS).untilAsserted(() -> verify(sender).send(anyList())); + verify(span).getEventHandle(); + } + + @Test + void testUncaughtExceptionHandler_logsAndRestarts_actualThread() throws Exception { + final ExecutorService crashingExecutor = Executors.newSingleThreadExecutor(r -> { + final Thread t = new Thread(() -> { + throw new RuntimeException("forced crash"); + }, "otlp-sink-buffer-thread"); + t.setUncaughtExceptionHandler((thread, ex) -> { + metrics.incrementErrorsCount(); + buffer.restartWorker(); + }); + return t; + }); + + final Field executorField = OtlpSinkBuffer.class.getDeclaredField("executor"); + executorField.setAccessible(true); + executorField.set(buffer, crashingExecutor); + + buffer.start(); + + await().atMost(1, 
SECONDS).untilAsserted(() -> verify(metrics, atLeastOnce()).incrementErrorsCount()); + + crashingExecutor.shutdownNow(); + } + + @Test + void testRestartWorkerDoesNotSubmitIfShutdown() throws Exception { + final ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(true); + + final Field executorField = OtlpSinkBuffer.class.getDeclaredField("executor"); + executorField.setAccessible(true); + executorField.set(buffer, mockExecutor); + + buffer.restartWorker(); + + verify(mockExecutor, never()).execute(any()); + } + + @Test + void testRestartWorkerSubmitsRunnableIfRunning() throws Exception { + final ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(false); + + final Field executorField = OtlpSinkBuffer.class.getDeclaredField("executor"); + executorField.setAccessible(true); + executorField.set(buffer, mockExecutor); + + buffer.restartWorker(); + + verify(mockExecutor, atLeastOnce()).execute(any()); + } + + @Test + void testRestartWorkerDoesNotSubmitIfNotRunning() throws Exception { + final ExecutorService mockExecutor = mock(ExecutorService.class); + when(mockExecutor.isShutdown()).thenReturn(false); + + final Field executorField = OtlpSinkBuffer.class.getDeclaredField("executor"); + executorField.setAccessible(true); + executorField.set(buffer, mockExecutor); + + buffer.stop(); // Set running to false + + buffer.restartWorker(); + + verify(mockExecutor, never()).execute(any()); + } + + @Test + void testRunWithInterruptedException() throws Exception { + final OtlpSinkBuffer localBuffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + + // Override the internal queue to throw InterruptedException + final BlockingQueue> interruptingQueue = mock(BlockingQueue.class); + when(interruptingQueue.poll(anyLong(), any(TimeUnit.class))).thenThrow(new InterruptedException()); + when(interruptingQueue.isEmpty()).thenReturn(true); + + final Field queueField = 
OtlpSinkBuffer.class.getDeclaredField("queue"); + queueField.setAccessible(true); + queueField.set(localBuffer, interruptingQueue); + + localBuffer.start(); + + TimeUnit.MILLISECONDS.sleep(100); // Let the thread hit the poll + + localBuffer.stop(); + + await().atMost(1, SECONDS).untilAsserted(() -> + verify(metrics, atLeastOnce()).incrementErrorsCount() + ); + } + + @Test + void testRunWithInterruptedException_whileRunning() throws Exception { + final OtlpSinkBuffer localBuffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + + // Override the internal queue to throw InterruptedException first, then return null + final BlockingQueue> interruptingQueue = mock(BlockingQueue.class); + when(interruptingQueue.poll(anyLong(), any(TimeUnit.class))) + .thenThrow(new InterruptedException()) + .thenReturn(null); + when(interruptingQueue.isEmpty()).thenReturn(false, true); + + final Field queueField = OtlpSinkBuffer.class.getDeclaredField("queue"); + queueField.setAccessible(true); + queueField.set(localBuffer, interruptingQueue); + + localBuffer.start(); + + TimeUnit.MILLISECONDS.sleep(200); // Let the thread hit the poll and continue + + localBuffer.stop(); + + await().atMost(2, SECONDS).untilAsserted(() -> + verify(metrics, atLeastOnce()).incrementErrorsCount() + ); + } + + @Test + void testWorkerThreadCatchThrowable_andRestart_fromEncoder() throws Exception { + final OTelProtoStandardCodec.OTelProtoEncoder crashingEncoder = mock(OTelProtoStandardCodec.OTelProtoEncoder.class); + when(crashingEncoder.convertToResourceSpans(any())).thenAnswer(invocation -> { + throw new AssertionError("simulated fatal crash"); + }); + + buffer = new OtlpSinkBuffer(config, metrics, crashingEncoder, sender); + buffer.start(); + + final Record record = createMockRecord(); + buffer.add(record); + + await().atMost(2, SECONDS).untilAsserted(() -> { + verify(metrics, atLeastOnce()).incrementErrorsCount(); + }); + } + + @Test + void testFinalFlushAfterStopFlushesBatch() throws Exception { 
+ // Configure buffer to *not* flush by size or time + when(config.getMaxEvents()).thenReturn(1000); // very high + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); // very high + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); // effectively disables time-based flush + + final ResourceSpans resourceSpans = ResourceSpans.getDefaultInstance(); + when(encoder.convertToResourceSpans(any())).thenReturn(resourceSpans); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record record = createMockRecord(); + buffer.add(record); + + // Wait briefly to ensure the record is picked up + TimeUnit.MILLISECONDS.sleep(100); + + // Stop should trigger the final flush + buffer.stop(); + + // Verify final flush triggered send + await().atMost(1, SECONDS).untilAsserted(() -> + verify(sender).send(anyList()) + ); + } + + @Test + void testNoFinalFlushWhenBatchIsEmpty() throws Exception { + when(config.getMaxEvents()).thenReturn(1000); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + // Don't add any records + TimeUnit.MILLISECONDS.sleep(50); + buffer.stop(); + + // Verify no send was called since batch was empty + verify(sender, never()).send(anyList()); + } + + @Test + void testMaxEventsZeroDoesNotTriggerFlush() throws Exception { + when(config.getMaxEvents()).thenReturn(0); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + final ResourceSpans resourceSpans = ResourceSpans.getDefaultInstance(); + when(encoder.convertToResourceSpans(any())).thenReturn(resourceSpans); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record record = createMockRecord(); + buffer.add(record); + + TimeUnit.MILLISECONDS.sleep(100); + buffer.stop(); 
+ + // Should only flush on final flush, not by count + await().atMost(1, SECONDS).untilAsserted(() -> + verify(sender, times(1)).send(anyList()) + ); + } + + @Test + void testDaemonThreadConfiguration() { + // This test verifies that the thread is created as non-daemon + // We can't directly test this, but we can verify the thread factory is called + buffer.start(); + assertTrue(buffer.isRunning()); + buffer.stop(); + } + + @Test + void testBatchSizeAccumulation() throws Exception { + when(config.getMaxEvents()).thenReturn(Integer.MAX_VALUE); + when(config.getMaxBatchSize()).thenReturn(50L); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + final ResourceSpans resourceSpans1 = mock(ResourceSpans.class); + final ResourceSpans resourceSpans2 = mock(ResourceSpans.class); + when(resourceSpans1.getSerializedSize()).thenReturn(20); + when(resourceSpans2.getSerializedSize()).thenReturn(35); + + when(encoder.convertToResourceSpans(any(Span.class))) + .thenReturn(resourceSpans1) + .thenReturn(resourceSpans2); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record rec1 = createMockRecord(); + final Record rec2 = createMockRecord(); + + buffer.add(rec1); + buffer.add(rec2); + + // Total size: 20 + 35 = 55, which exceeds 50 + await().atMost(1, SECONDS).untilAsserted(() -> verify(sender).send(anyList())); + buffer.stop(); + } + + @Test + void testMultipleFlushCycles() throws Exception { + when(config.getMaxEvents()).thenReturn(1); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + final ResourceSpans resourceSpans = ResourceSpans.getDefaultInstance(); + when(encoder.convertToResourceSpans(any())).thenReturn(resourceSpans); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record rec1 = createMockRecord(); + final Record rec2 = createMockRecord(); + final Record rec3 = 
createMockRecord(); + + buffer.add(rec1); + buffer.add(rec2); + buffer.add(rec3); + + await().atMost(2, SECONDS).untilAsserted(() -> verify(sender, times(3)).send(anyList())); + buffer.stop(); + } + + @Test + void testEncodingFailureDoesNotStopProcessing() throws Exception { + // Configure to process multiple records + when(config.getMaxEvents()).thenReturn(2); + when(config.getMaxBatchSize()).thenReturn(Long.MAX_VALUE); + when(config.getFlushTimeoutMillis()).thenReturn(Long.MAX_VALUE); + + // First and third succeed, second fails + when(encoder.convertToResourceSpans(any(Span.class))) + .thenReturn(ResourceSpans.getDefaultInstance()) + .thenThrow(new RuntimeException("encoding error")) + .thenReturn(ResourceSpans.getDefaultInstance()); + + buffer = new OtlpSinkBuffer(config, metrics, encoder, sender); + buffer.start(); + + final Record rec1 = createMockRecord(); + final Record rec2 = createMockRecord(); + final Record rec3 = createMockRecord(); + + buffer.add(rec1); + buffer.add(rec2); + buffer.add(rec3); + + // Should still send the batch with the 2 successful records + await().atMost(2, SECONDS).untilAsserted(() -> { + verify(sender).send(anyList()); + verify(metrics).incrementFailedSpansCount(1); + verify(metrics, atLeastOnce()).incrementErrorsCount(); + }); + + buffer.stop(); + } + + private Record createMockRecord() { + final Record record = mock(Record.class); + final Span span = mock(Span.class); + final EventHandle eventHandle = mock(EventHandle.class); + when(record.getData()).thenReturn(span); + when(span.getEventHandle()).thenReturn(eventHandle); + return record; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/OtlpSinkConfigTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/OtlpSinkConfigTest.java new file mode 100644 index 0000000000..85363165b7 --- /dev/null +++ 
b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/OtlpSinkConfigTest.java @@ -0,0 +1,267 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.otlp.configuration; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; +import software.amazon.awssdk.regions.Region; + +import java.io.IOException; +import java.time.Duration; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class OtlpSinkConfigTest { + + private static ObjectMapper mapper; + + private static final String EXPECTED_ENDPOINT = "https://example.com/otlp"; + private static final int DEFAULT_MAX_RETRIES = 5; + + private static final int CUSTOM_MAX_EVENTS = 100; + private static final String CUSTOM_BATCH_SIZE = "2mb"; + private static final long CUSTOM_BATCH_BYTES = ByteCount.parse(CUSTOM_BATCH_SIZE).getBytes(); + private static final long CUSTOM_FLUSH_TIMEOUT = 500L; + + private static final int DEFAULT_MAX_EVENTS = 512; + private static final long DEFAULT_BATCH_BYTES = ByteCount.parse("1mb").getBytes(); + private static final long DEFAULT_FLUSH_TIMEOUT = 
200L; + + private static final String EXPECTED_ROLE_ARN = "arn:aws:iam::123456789012:role/OtlpRole"; + private static final String EXPECTED_EXTERNAL_ID = "my-ext-id"; + + @BeforeAll + static void setupMapper() { + mapper = new ObjectMapper(new YAMLFactory()) + .findAndRegisterModules(); // for default Duration support + + // custom deserializer for ByteCount strings like "2mb" + final SimpleModule byteCountModule = new SimpleModule(); + byteCountModule.addDeserializer(ByteCount.class, new JsonDeserializer<>() { + @Override + public ByteCount deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + return ByteCount.parse(p.getValueAsString()); + } + }); + mapper.registerModule(byteCountModule); + + // custom deserializer for Duration strings like "500ms" + final SimpleModule durationModule = new SimpleModule(); + durationModule.addDeserializer(Duration.class, new JsonDeserializer<>() { + @Override + public Duration deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + final String text = p.getValueAsString(); + if (text.endsWith("ms")) { + final long ms = Long.parseLong(text.substring(0, text.length() - 2)); + return Duration.ofMillis(ms); + } + return Duration.parse(text); + } + }); + mapper.registerModule(durationModule); + } + + @Test + void testGetStsRoleArnReturnsNullWhenNotSet() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "aws: {}", // sts_role_arn not set + "max_retries: 5" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + assertNull(config.getStsRoleArn()); + } + + @Test + void testGetStsExternalIdReturnsNullWhenNotSet() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "aws: {}", // sts_external_id not set + "max_retries: 5" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + 
assertNull(config.getStsExternalId()); + } + + @Test + void testAwsRegion_throwsWhenHostIsNull() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"mailto:user@example.com\"", // No host component + "aws: {}" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + assertThrows(IllegalArgumentException.class, config::getAwsRegion); + } + + @Test + void testAwsRegion_throwsWhenUriIsMalformed() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"https://xray .us-west-2.amazonaws.com\"", // Invalid URI with space + "aws: {}" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + final IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, config::getAwsRegion); + assertTrue(thrown.getMessage().contains("Failed to parse AWS region from endpoint")); + } + + @Test + void testMinimumConfigDefaults() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "aws: {}" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + assertEquals(EXPECTED_ENDPOINT, config.getEndpoint()); + assertEquals(DEFAULT_MAX_RETRIES, config.getMaxRetries()); + assertEquals(DEFAULT_MAX_EVENTS, config.getMaxEvents()); + assertEquals(DEFAULT_BATCH_BYTES, config.getMaxBatchSize()); + assertEquals(DEFAULT_FLUSH_TIMEOUT, config.getFlushTimeoutMillis()); + + assertThat(config.getStsRoleArn(), nullValue()); + assertThat(config.getStsExternalId(), nullValue()); + } + + @Test + void testCustomThresholdAndRetries() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "aws: {}", + "max_retries: 3", + "threshold:", + " max_events: " + CUSTOM_MAX_EVENTS, + " max_batch_size: \"" + CUSTOM_BATCH_SIZE + "\"", + " flush_timeout: \"" + CUSTOM_FLUSH_TIMEOUT + "ms\"" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + 
+ assertEquals(EXPECTED_ENDPOINT, config.getEndpoint()); + assertEquals(3, config.getMaxRetries()); + assertEquals(CUSTOM_MAX_EVENTS, config.getMaxEvents()); + assertEquals(CUSTOM_BATCH_BYTES, config.getMaxBatchSize()); + assertEquals(CUSTOM_FLUSH_TIMEOUT, config.getFlushTimeoutMillis()); + } + + @Test + void testIsAwsConfigValid_returnsTrue_whenPresent() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "aws: {}" + ); + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + assertTrue(config.isAwsConfigValid()); + } + + @Test + void testIsAwsConfigValid_returnsFalse_whenMissing() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"" + ); + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + assertFalse(config.isAwsConfigValid()); + } + + @Test + void testAwsBlockDeserialization() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "max_retries: " + DEFAULT_MAX_RETRIES, + "aws:", + " sts_role_arn: \"" + EXPECTED_ROLE_ARN + "\"", + " sts_external_id: \"" + EXPECTED_EXTERNAL_ID + "\"" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + assertEquals(EXPECTED_ENDPOINT, config.getEndpoint()); + assertEquals(DEFAULT_MAX_RETRIES, config.getMaxRetries()); + + assertEquals(EXPECTED_ROLE_ARN, config.getStsRoleArn()); + assertEquals(EXPECTED_EXTERNAL_ID, config.getStsExternalId()); + } + + @Test + void testAwsSectionPresentButEmpty_doesNotThrow() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"" + EXPECTED_ENDPOINT + "\"", + "aws: {}", + "max_retries: " + DEFAULT_MAX_RETRIES + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + // aws block exists, so validateAwsConfig() will not throw + assertNull(config.getStsRoleArn()); + assertNull(config.getStsExternalId()); + } + + 
@Test + void testAwsRegion_parsedFromStandardXrayEndpoint() throws Exception { + final String yaml = String.join("\n", + "endpoint: \"https://xray.us-east-1.amazonaws.com\"", + "aws: {}", + "max_retries: 5" + ); + + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + + assertEquals("https://xray.us-east-1.amazonaws.com", config.getEndpoint()); + assertEquals(5, config.getMaxRetries()); + + assertThat(config.getAwsRegion(), equalTo(Region.US_EAST_1)); + } + + @Test + void testAwsRegion_invalidEndpoint_throwsException() { + final String yaml = String.join("\n", + "endpoint: \"https://example.invalid-endpoint\"", + "aws: {}", + "max_retries: 5" + ); + + assertThrows(IllegalArgumentException.class, () -> { + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + config.getAwsRegion(); // triggers parsing + }); + } + + @Test + void testAwsRegion_throwsException_onInvalidEndpoint() { + final String yaml = String.join("\n", + "endpoint: \"invalid-endpoint\"", + "aws: {}", + "max_retries: 5" + ); + + assertThrows(IllegalArgumentException.class, () -> { + final OtlpSinkConfig config = mapper.readValue(yaml, OtlpSinkConfig.class); + config.getAwsRegion(); // must trigger parsing logic + }); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/ThresholdConfigTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/ThresholdConfigTest.java new file mode 100644 index 0000000000..6e8d3834a6 --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/configuration/ThresholdConfigTest.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.otlp.configuration; + +import com.fasterxml.jackson.core.JsonParser; +import 
com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; + +import java.io.IOException; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class ThresholdConfigTest { + + private static ObjectMapper mapper; + + private static final int CUSTOM_MAX_EVENTS = 100; + private static final String CUSTOM_MAX_BATCH_SIZE = "2mb"; + private static final Duration CUSTOM_FLUSH_TIMEOUT = Duration.ofMillis(500); + + private static final int DEFAULT_MAX_EVENTS = 512; + private static final String DEFAULT_MAX_BATCH_SIZE = "1mb"; + private static final Duration DEFAULT_FLUSH_TIMEOUT = Duration.ofMillis(200); + + @BeforeAll + static void setupMapper() { + mapper = new ObjectMapper(new YAMLFactory()) + .findAndRegisterModules(); // for Duration + + // Register a simple deserializer for ByteCount.from “2mb”-style strings + final SimpleModule byteCountModule = new SimpleModule(); + byteCountModule.addDeserializer(ByteCount.class, new JsonDeserializer<>() { + @Override + public ByteCount deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + return ByteCount.parse(p.getValueAsString()); + } + }); + mapper.registerModule(byteCountModule); + } + + @Test + void testDeserializationFromYaml() throws Exception { + final String yaml = String.join("\n", + "max_events: " + CUSTOM_MAX_EVENTS, + "max_batch_size: \"" + CUSTOM_MAX_BATCH_SIZE + "\"", + "flush_timeout: \"" + CUSTOM_FLUSH_TIMEOUT.toString() + "\"" + ); + + final ThresholdConfig config = mapper.readValue(yaml, ThresholdConfig.class); + + assertEquals(CUSTOM_MAX_EVENTS, config.getMaxEvents()); + 
assertEquals(ByteCount.parse(CUSTOM_MAX_BATCH_SIZE), config.getMaxBatchSize()); + assertEquals(CUSTOM_FLUSH_TIMEOUT, config.getFlushTimeout()); + } + + @Test + void testDefaultsWhenYamlEmpty() throws Exception { + final ThresholdConfig config = mapper.readValue("{}", ThresholdConfig.class); + + assertEquals(DEFAULT_MAX_EVENTS, config.getMaxEvents()); + assertEquals(ByteCount.parse(DEFAULT_MAX_BATCH_SIZE), config.getMaxBatchSize()); + assertEquals(DEFAULT_FLUSH_TIMEOUT, config.getFlushTimeout()); + } +} diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/GzipCompressorTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/GzipCompressorTest.java new file mode 100644 index 0000000000..36a07327f0 --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/GzipCompressorTest.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.http; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +class GzipCompressorTest { + + private OtlpSinkMetrics sinkMetrics; + + @BeforeEach + void setUp() { + sinkMetrics = mock(OtlpSinkMetrics.class); + } + + @Test 
+ void apply_returnsCompressedPayload() throws IOException { + byte[] input = "test-payload".getBytes(); + GzipCompressor gzipCompressor = new GzipCompressor(sinkMetrics); + + final byte[] compressed = gzipCompressor.apply(input); + + // Validate decompression gives original input + assertNotNull(compressed); + final byte[] decompressed = decompress(compressed); + assertArrayEquals(input, decompressed); + } + + @Test + void apply_handlesIOException_andIncrementsErrorMetric() throws IOException { + GzipCompressor gzipCompressor = spy(new GzipCompressor(sinkMetrics)); + doThrow(new IOException("boom")).when(gzipCompressor).compressInternal(any()); + + final byte[] result = gzipCompressor.apply("payload".getBytes(StandardCharsets.UTF_8)); + + assertEquals(0, result.length); + verify(sinkMetrics).incrementErrorsCount(); + } + + private byte[] decompress(byte[] compressed) throws IOException { + try (GZIPInputStream gzipStream = new GZIPInputStream(new ByteArrayInputStream(compressed))) { + return gzipStream.readAllBytes(); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/OtlpHttpSenderTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/OtlpHttpSenderTest.java new file mode 100644 index 0000000000..f50a7c5f82 --- /dev/null +++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/OtlpHttpSenderTest.java @@ -0,0 +1,441 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.otlp.http; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.HttpData; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.ResponseHeaders; +import 
io.opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig; +import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.utils.Pair; + +import java.lang.reflect.Method; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class OtlpHttpSenderTest { + + private static final byte[] PAYLOAD = "test-otlp".getBytes(StandardCharsets.UTF_8); + private static final int SPANS_COUNT = 3; + + private OtlpSinkMetrics metrics; + private SigV4Signer signer; + private WebClient webClient; + private Function gzipCompressor; + private OtlpHttpSender sender; + private AwsCredentialsSupplier mockAwsCredSupplier; + private List> testBatch; + private EventHandle mockEventHandle1; + private EventHandle mockEventHandle2; + private EventHandle mockEventHandle3; + + @BeforeEach + void setup() { + metrics = 
mock(OtlpSinkMetrics.class); + signer = mock(SigV4Signer.class); + webClient = mock(WebClient.class); + gzipCompressor = mock(Function.class); + mockAwsCredSupplier = mock(AwsCredentialsSupplier.class); + + mockEventHandle1 = mock(EventHandle.class); + mockEventHandle2 = mock(EventHandle.class); + mockEventHandle3 = mock(EventHandle.class); + + // Create test batch with ResourceSpans and EventHandles + testBatch = Arrays.asList( + Pair.of(ResourceSpans.newBuilder().build(), mockEventHandle1), + Pair.of(ResourceSpans.newBuilder().build(), mockEventHandle2), + Pair.of(ResourceSpans.newBuilder().build(), mockEventHandle3) + ); + + when(gzipCompressor.apply(any())).thenReturn(PAYLOAD); + when(signer.signRequest(any())).thenReturn( + SdkHttpFullRequest.builder() + .method(SdkHttpMethod.POST) + .uri(URI.create("https://localhost/v1/traces")) + .putHeader("Authorization", "sig") + .build() + ); + + sender = new OtlpHttpSender(metrics, gzipCompressor, signer, webClient); + } + + @Test + void testSend_emptyBatch_returnsEarly() { + final List> emptyBatch = Collections.emptyList(); + + // Prevent accidental NPE if send logic changes + when(webClient.execute(any(HttpRequest.class))).thenThrow(new AssertionError("Should not call execute")); + + sender.send(emptyBatch); + + verifyNoInteractions(metrics); + } + + @Test + void testSend_successfulResponse() { + when(webClient.execute(any(HttpRequest.class))).thenReturn( + HttpResponse.of(ResponseHeaders.of(200), HttpData.empty()) + ); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics).incrementRecordsOut(SPANS_COUNT); + verify(metrics).incrementPayloadSize(anyLong()); + verify(metrics).incrementPayloadGzipSize(PAYLOAD.length); + verify(metrics).recordHttpLatency(anyLong()); + verify(metrics).recordResponseCode(200); + + // Verify all event handles are released with success=true + verify(mockEventHandle1).release(true); + verify(mockEventHandle2).release(true); + verify(mockEventHandle3).release(true); + 
}); + } + + @Test + void testSend_partialSuccessResponse() { + final ExportTraceServiceResponse proto = ExportTraceServiceResponse.newBuilder() + .setPartialSuccess(ExportTracePartialSuccess.newBuilder() + .setRejectedSpans(2) + .setErrorMessage("invalid span") + .build()) + .build(); + + when(webClient.execute(any(HttpRequest.class))).thenReturn( + HttpResponse.of(ResponseHeaders.of(200), HttpData.wrap(proto.toByteArray())) + ); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics).incrementRejectedSpansCount(2); + verify(metrics).incrementRecordsOut(SPANS_COUNT - 2); + verify(metrics).recordResponseCode(200); + + // All handles should still be released as true (optimistic) + verify(mockEventHandle1).release(true); + verify(mockEventHandle2).release(true); + verify(mockEventHandle3).release(true); + }); + } + + @Test + void testSend_partialSuccessWithZeroRejected() { + final ExportTraceServiceResponse proto = ExportTraceServiceResponse.newBuilder() + .setPartialSuccess(ExportTracePartialSuccess.newBuilder() + .setRejectedSpans(0) + .setErrorMessage("") + .build()) + .build(); + + when(webClient.execute(any(HttpRequest.class))).thenReturn( + HttpResponse.of(ResponseHeaders.of(200), HttpData.wrap(proto.toByteArray())) + ); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics, never()).incrementRejectedSpansCount(anyLong()); + verify(metrics).incrementRecordsOut(SPANS_COUNT); + verify(metrics).recordResponseCode(200); + + // All handles should be released as true + verify(mockEventHandle1).release(true); + verify(mockEventHandle2).release(true); + verify(mockEventHandle3).release(true); + }); + } + + @Test + void testSend_parseErrorOnSuccessResponse() { + when(webClient.execute(any(HttpRequest.class))).thenReturn( + HttpResponse.of(ResponseHeaders.of(200), HttpData.ofUtf8("not-protobuf")) + ); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics).incrementErrorsCount(); + 
verify(metrics).incrementRecordsOut(SPANS_COUNT); + verify(metrics).recordResponseCode(200); + + // Handles should still be released as true despite parse error + verify(mockEventHandle1).release(true); + verify(mockEventHandle2).release(true); + verify(mockEventHandle3).release(true); + }); + } + + @Test + void testSend_nonSuccessStatus() { + when(webClient.execute(any(HttpRequest.class))).thenReturn( + HttpResponse.of(ResponseHeaders.of(400), HttpData.ofUtf8("{\"error\":\"bad request\"}")) + ); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics).recordResponseCode(400); + verify(metrics).incrementRejectedSpansCount(SPANS_COUNT); + + // All handles should be released with success=false + verify(mockEventHandle1).release(false); + verify(mockEventHandle2).release(false); + verify(mockEventHandle3).release(false); + }); + } + + @Test + void testSend_nonSuccessStatusWithNullResponseBody() { + when(webClient.execute(any(HttpRequest.class))).thenReturn( + HttpResponse.of(ResponseHeaders.of(500), HttpData.empty()) + ); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics).recordResponseCode(500); + verify(metrics).incrementRejectedSpansCount(SPANS_COUNT); + + // All handles should be released with success=false + verify(mockEventHandle1).release(false); + verify(mockEventHandle2).release(false); + verify(mockEventHandle3).release(false); + }); + } + + @Test + void testSend_skipsSendIfGzipFails() { + sender = new OtlpHttpSender(metrics, ignored -> new byte[0], signer, webClient); + + sender.send(testBatch); + + verify(metrics).incrementFailedSpansCount(SPANS_COUNT); + verifyNoInteractions(webClient); + + // All handles should be released with success=false + verify(mockEventHandle1).release(false); + verify(mockEventHandle2).release(false); + verify(mockEventHandle3).release(false); + } + + @Test + void testSend_exceptionDuringSendIncrementsRejected() { + final HttpResponse failingResponse = HttpResponse.ofFailure(new 
RuntimeException("send failed")); + when(webClient.execute(any(HttpRequest.class))).thenReturn(failingResponse); + + sender.send(testBatch); + + await().untilAsserted(() -> { + verify(metrics).incrementRejectedSpansCount(SPANS_COUNT); + + // All handles should be released with success=false + verify(mockEventHandle1).release(false); + verify(mockEventHandle2).release(false); + verify(mockEventHandle3).release(false); + }); + } + + @Test + void testConstructor_withDefaultConfig() { + final OtlpSinkConfig config = mock(OtlpSinkConfig.class); + + when(config.getEndpoint()).thenReturn("https://localhost/v1/traces"); + when(config.getMaxBatchSize()).thenReturn(1_000_000L); + when(config.getMaxRetries()).thenReturn(2); + when(config.getFlushTimeoutMillis()).thenReturn(5000L); + when(config.getAwsRegion()).thenReturn(software.amazon.awssdk.regions.Region.US_WEST_2); + + final OtlpHttpSender defaultSender = new OtlpHttpSender(mockAwsCredSupplier, config, metrics); + assertNotNull(defaultSender); + } + + @Test + void testConstructor_withMinimumThresholdConfig() { + final OtlpSinkConfig config = mock(OtlpSinkConfig.class); + + // Set all threshold values to minimum valid input + when(config.getEndpoint()).thenReturn("https://localhost/v1/traces"); + when(config.getMaxBatchSize()).thenReturn(0L); + when(config.getMaxRetries()).thenReturn(0); + when(config.getFlushTimeoutMillis()).thenReturn(1L); + when(config.getAwsRegion()).thenReturn(software.amazon.awssdk.regions.Region.US_WEST_2); + + // Should not throw or crash + final OtlpHttpSender minimalSender = new OtlpHttpSender(mockAwsCredSupplier, config, metrics); + assertNotNull(minimalSender); + } + + @Test + void testGetPayloadAndCompressedPayload_privateMethod() throws Exception { + // Test the private method via reflection to ensure 100% coverage + final Method method = OtlpHttpSender.class.getDeclaredMethod("getPayloadAndCompressedPayload", List.class); + method.setAccessible(true); + + 
when(gzipCompressor.apply(any())).thenReturn("compressed".getBytes()); + + @SuppressWarnings("unchecked") final Pair result = (Pair) method.invoke(sender, testBatch); + + assertNotNull(result); + assertNotNull(result.left()); // payload + assertNotNull(result.right()); // compressed payload + verify(gzipCompressor).apply(any()); + } + + @Test + void testBuildHttpRequest_privateMethod() throws Exception { + // Test the private method via reflection + final Method method = OtlpHttpSender.class.getDeclaredMethod("buildHttpRequest", byte[].class); + method.setAccessible(true); + + final HttpRequest result = (HttpRequest) method.invoke(sender, PAYLOAD); + + assertNotNull(result); + verify(signer).signRequest(PAYLOAD); + } + + @Test + void testHandleResponse_privateMethod_success() throws Exception { + final Method method = OtlpHttpSender.class.getDeclaredMethod("handleResponse", int.class, byte[].class, List.class); + method.setAccessible(true); + + method.invoke(sender, 200, null, testBatch); + + verify(metrics).recordResponseCode(200); + verify(metrics).incrementRecordsOut(SPANS_COUNT); + verify(mockEventHandle1).release(true); + verify(mockEventHandle2).release(true); + verify(mockEventHandle3).release(true); + } + + @Test + void testHandleResponse_privateMethod_failure() throws Exception { + final Method method = OtlpHttpSender.class.getDeclaredMethod("handleResponse", int.class, byte[].class, List.class); + method.setAccessible(true); + + method.invoke(sender, 400, "error".getBytes(), testBatch); + + verify(metrics).recordResponseCode(400); + verify(metrics).incrementRejectedSpansCount(SPANS_COUNT); + verify(mockEventHandle1).release(false); + verify(mockEventHandle2).release(false); + verify(mockEventHandle3).release(false); + } + + @Test + void testHandleSuccessfulResponse_privateMethod_withNullBody() throws Exception { + final Method method = OtlpHttpSender.class.getDeclaredMethod("handleSuccessfulResponse", byte[].class, List.class); + 
method.setAccessible(true);
+
+        method.invoke(sender, null, testBatch);
+
+        verify(metrics).incrementRecordsOut(SPANS_COUNT);
+        verify(mockEventHandle1).release(true);
+        verify(mockEventHandle2).release(true);
+        verify(mockEventHandle3).release(true);
+        verifyNoMoreInteractions(metrics);
+    }
+
+    @Test
+    void testHandleSuccessfulResponse_privateMethod_withoutPartialSuccess() throws Exception {
+        // Serialized response that carries no partial_success section
+        final ExportTraceServiceResponse response = ExportTraceServiceResponse.newBuilder().build();
+        final byte[] responseBytes = response.toByteArray();
+
+        final Method method = OtlpHttpSender.class.getDeclaredMethod("handleSuccessfulResponse", byte[].class, List.class);
+        method.setAccessible(true); // the handler is not public, so it is reached reflectively
+
+        method.invoke(sender, responseBytes, testBatch);
+
+        verify(metrics).incrementRecordsOut(SPANS_COUNT); // the whole batch is reported as sent
+        verify(mockEventHandle1).release(true);
+        verify(mockEventHandle2).release(true);
+        verify(mockEventHandle3).release(true);
+        verifyNoMoreInteractions(metrics); // no other metric may be touched
+    }
+
+    @Test
+    void testHandleSuccessfulResponse_privateMethod_withPartialSuccess() throws Exception {
+        final ExportTraceServiceResponse response = ExportTraceServiceResponse.newBuilder()
+                .setPartialSuccess(ExportTracePartialSuccess.newBuilder()
+                        .setRejectedSpans(1)
+                        .setErrorMessage("test error")
+                        .build())
+                .build();
+        final byte[] responseBytes = response.toByteArray();
+
+        final Method method = OtlpHttpSender.class.getDeclaredMethod("handleSuccessfulResponse", byte[].class, List.class);
+        method.setAccessible(true);
+
+        method.invoke(sender, responseBytes, testBatch);
+
+        verify(metrics).incrementRejectedSpansCount(1); // matches setRejectedSpans(1) above
+        verify(metrics).incrementRecordsOut(SPANS_COUNT - 1); // every span except the rejected one
+        verify(mockEventHandle1).release(true); // all three handles are still released with true
+        verify(mockEventHandle2).release(true);
+        verify(mockEventHandle3).release(true);
+    }
+
+    @Test
+    void testHandleSuccessfulResponse_privateMethod_parseException() throws Exception {
+        final byte[] invalidBytes = "invalid protobuf".getBytes(java.nio.charset.StandardCharsets.UTF_8); // explicit charset: same bytes on every platform
+
+        final Method method = OtlpHttpSender.class.getDeclaredMethod("handleSuccessfulResponse", byte[].class, List.class);
+        method.setAccessible(true);
+
+        method.invoke(sender, invalidBytes, testBatch);
+
+        verify(metrics).incrementErrorsCount(); // the unparseable body is expected to be counted as an error
+        verify(metrics).incrementRecordsOut(SPANS_COUNT); // while the full batch is still reported as sent
+        verify(mockEventHandle1).release(true);
+        verify(mockEventHandle2).release(true);
+        verify(mockEventHandle3).release(true);
+    }
+
+    @Test
+    void testHandleResponse_withNullResponseBody_logsNoBody() throws Exception {
+        final Method method = OtlpHttpSender.class.getDeclaredMethod("handleResponse", int.class, byte[].class, List.class);
+        method.setAccessible(true);
+
+        method.invoke(sender, 500, null, testBatch); // status 500 with a null body
+
+        verify(metrics).recordResponseCode(500);
+        verify(metrics).incrementRejectedSpansCount(SPANS_COUNT); // the entire batch counts as rejected
+
+        // Verifying event handles released with success=false
+        verify(mockEventHandle1).release(false);
+        verify(mockEventHandle2).release(false);
+        verify(mockEventHandle3).release(false);
+    }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/SigV4SignerTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/SigV4SignerTest.java
new file mode 100644
index 0000000000..45db7131fd
--- /dev/null
+++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/http/SigV4SignerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.opensearch.dataprepper.plugins.sink.otlp.http;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
+import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import
software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.regions.Region;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class SigV4SignerTest {
+
+    private static final byte[] PAYLOAD = "test-payload".getBytes(StandardCharsets.UTF_8);
+    private static final Region REGION = Region.US_WEST_2;
+
+    private OtlpSinkConfig mockConfig;
+    private AwsCredentialsSupplier mockSupplier;
+    private SigV4Signer target;
+
+    @BeforeEach
+    void setup() {
+        // The supplier mock hands out a fixed access/secret key pair through a static provider.
+        final StaticCredentialsProvider staticProvider = StaticCredentialsProvider.create(
+                AwsBasicCredentials.create("mockAccessKey", "mockSecretKey"));
+        mockSupplier = mock(AwsCredentialsSupplier.class);
+        when(mockSupplier.getProvider(any())).thenReturn(staticProvider);
+
+        mockConfig = mock(OtlpSinkConfig.class);
+        when(mockConfig.getAwsRegion()).thenReturn(REGION);
+    }
+
+    @Test
+    void testSignRequest_withExplicitEndpoint() {
+        final String configuredEndpoint = "https://performance.us-west-2.xray.cloudwatch.aws.dev/v1/traces";
+        when(mockConfig.getEndpoint()).thenReturn(configuredEndpoint);
+
+        target = new SigV4Signer(mockSupplier, mockConfig);
+        final SdkHttpFullRequest signed = target.signRequest(PAYLOAD);
+
+        assertSignedPostTo(configuredEndpoint, signed); // the configured endpoint is used as-is
+    }
+
+    @Test
+    void testSignRequest_withDefaultEndpoint() {
+        when(mockConfig.getEndpoint()).thenReturn(null); // no endpoint configured
+
+        target = new SigV4Signer(mockSupplier, mockConfig);
+        final SdkHttpFullRequest signed = target.signRequest(PAYLOAD);
+
+        assertSignedPostTo("https://xray.us-west-2.amazonaws.com/v1/traces", signed); // X-Ray URL for REGION
+    }
+
+    private static void assertSignedPostTo(final String expectedUri, final SdkHttpFullRequest signed) {
+        assertNotNull(signed);
+        assertEquals("POST", signed.method().name());
+        assertTrue(signed.headers().containsKey("Authorization")); // signing must have added the header
+        assertEquals("application/x-protobuf", signed.firstMatchingHeader("Content-Type").orElse(null));
+        assertEquals(expectedUri, signed.getUri().toString());
+    }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/metrics/OtlpSinkMetricsTest.java b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/metrics/OtlpSinkMetricsTest.java
new file mode 100644
index 0000000000..10a33c225a
--- /dev/null
+++ b/data-prepper-plugins/otlp-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/otlp/metrics/OtlpSinkMetricsTest.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.sink.otlp.metrics;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Timer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.configuration.PluginSetting;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.ToDoubleFunction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static
org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class OtlpSinkMetricsTest {
+
+    private PluginMetrics pluginMetrics;
+    private PluginSetting pluginSetting;
+    private Counter counterMock;
+    private DistributionSummary summaryMock;
+    private Timer timerMock;
+    private OtlpSinkMetrics sinkMetrics;
+
+    @BeforeEach
+    void setUp() {
+        pluginMetrics = mock(PluginMetrics.class);
+        pluginSetting = mock(PluginSetting.class);
+
+        // basic pluginSetting behavior
+        when(pluginSetting.getPipelineName()).thenReturn("testPipeline");
+        when(pluginSetting.getName()).thenReturn("otlp");
+
+        // stub out all counter() calls to return the same Counter mock
+        counterMock = mock(Counter.class);
+        when(pluginMetrics.counter(anyString())).thenReturn(counterMock);
+
+        summaryMock = mock(DistributionSummary.class);
+        timerMock = mock(Timer.class);
+
+        // summaryMock and timerMock are not wired in here; the tests that need them call injectField(...)
+        sinkMetrics = new OtlpSinkMetrics(pluginMetrics, pluginSetting);
+    }
+
+    @Test
+    void testIncrementRecordsOut() {
+        sinkMetrics.incrementRecordsOut(7);
+        verify(pluginMetrics).counter("recordsOut");
+        verify(counterMock).increment(7.0); // the count reaches the counter as a double
+    }
+
+    @Test
+    void testIncrementErrorsCount() {
+        sinkMetrics.incrementErrorsCount();
+        verify(pluginMetrics).counter("errorsCount");
+        verify(counterMock).increment(1.0); // one error per call
+    }
+
+    @Test
+    void testIncrementRejectedSpansCount() {
+        sinkMetrics.incrementRejectedSpansCount(5);
+        verify(pluginMetrics).counter("rejectedSpansCount");
+        verify(counterMock).increment(5.0);
+    }
+
+    @Test
+    void testIncrementFailedSpansCount() {
+        sinkMetrics.incrementFailedSpansCount(5);
+        verify(pluginMetrics).counter("failedSpansCount");
+        verify(counterMock).increment(5.0);
+    }
+
+    @Test
+    void testRecordResponseCodeCounters() {
+        // 5xx
+        sinkMetrics.recordResponseCode(503);
+        verify(pluginMetrics).counter("http_5xx_responses");
+        verify(counterMock).increment();
+
+        // 4xx
+        sinkMetrics.recordResponseCode(404);
+        verify(pluginMetrics).counter("http_4xx_responses");
+        // every counter name maps to the same mock, so it has now seen two increment() calls
+        verify(counterMock, times(2)).increment();
+
+        // 2xx
+        sinkMetrics.recordResponseCode(200);
+        verify(pluginMetrics).counter("http_2xx_responses");
+        verify(counterMock, times(3)).increment();
+    }
+
+    @Test
+    void testRegisterQueueGauges() {
+        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
+        sinkMetrics.registerQueueGauges(queue);
+
+        // we expect two gauges registered: queueSize and queueCapacity
+        verify(pluginMetrics).gauge(eq("queueSize"), eq(queue), any());
+        verify(pluginMetrics).gauge(eq("queueCapacity"), eq(queue), any());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testQueueCapacityGaugeFunction() {
+        final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
+        // put 3 elements so size=3, remainingCapacity=7
+        queue.add("one");
+        queue.add("two");
+        queue.add("three");
+
+        // register the gauges
+        sinkMetrics.registerQueueGauges(queue);
+
+        // capture the ToDoubleFunction passed to gauge(...)
+        @SuppressWarnings("rawtypes") final ArgumentCaptor<ToDoubleFunction> captor = ArgumentCaptor.forClass(ToDoubleFunction.class);
+        verify(pluginMetrics).gauge(eq("queueCapacity"), eq(queue), captor.capture());
+
+        // apply it: remainingCapacity + size == 7 + 3 == 10
+        final ToDoubleFunction<BlockingQueue<?>> func = captor.getValue(); // unchecked: the captor is raw
+        assertEquals(10.0, func.applyAsDouble(queue), 0.0);
+    }
+
+    @Test
+    void testIncrementPayloadSize_delegatesToSummary() throws Exception {
+        injectField("payloadSize", summaryMock);
+
+        sinkMetrics.incrementPayloadSize(123L);
+
+        verify(summaryMock).record(123L);
+    }
+
+    @Test
+    void testIncrementPayloadGzipSize_delegatesToSummary() throws Exception {
+        injectField("payloadGzipSize", summaryMock);
+
+        sinkMetrics.incrementPayloadGzipSize(77L);
+
+        verify(summaryMock).record(77L);
+    }
+
+    @Test
+    void testRecordHttpLatency_delegatesToTimer() throws Exception {
+        injectField("httpLatency", timerMock);
+
+        sinkMetrics.recordHttpLatency(250L);
+
+        verify(timerMock).record(Duration.ofMillis(250L)); // the long argument is expected as milliseconds
+    }
+
+    private void injectField(final String fieldName, final Object mock) throws Exception {
+        final Field f = OtlpSinkMetrics.class.getDeclaredField(fieldName); // field declared on OtlpSinkMetrics itself
+        f.setAccessible(true); // lift access checks so a non-public field can be written
+        f.set(sinkMetrics, mock); // replace the value on the instance under test
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index da72cb66b4..725f81ab2d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -198,3 +198,4 @@ include 'data-prepper-plugins:saas-source-plugins:confluence-source'
 include 'data-prepper-plugins:saas-source-plugins:atlassian-commons'
 include 'data-prepper-plugins:saas-source-plugins:crowdstrike-source'
 include 'data-prepper-plugins:saas-source-plugins:microsoft-office365-source'
+include 'data-prepper-plugins:otlp-sink'