Commit bc7ea65

huypham612 authored and geekgirl10 committed
Add OTLP sink plugin for exporting spans to AWS X-Ray (opensearch-project#5664)

Feature/xray init

feat(xray-otlp-sink): Add X-Ray OTLP Sink plugin skeleton

- Added test resources and support for Span records
- Added sample pipeline config and OTLP test span JSON under `src/test/resources`
- Verified local pipeline ingest and logging using `grpcurl`
- Added README with developer instructions for running and testing locally

These changes establish the foundation for local testing and future X-Ray e2e testing.

Signed-off-by: huy pham <huyp@amazon.com>
Signed-off-by: Heli <desaiheli17@gmail.com>
Co-authored-by: Heli <desaiheli17@gmail.com>
Signed-off-by: Jeffrey Aaron Jeyasingh <jeffreyaaron06@gmail.com>
1 parent 32a99e7 commit bc7ea65

19 files changed

Lines changed: 3007 additions & 0 deletions

Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
# OTLP Sink Plugin

The OTLP sink plugin sends span data using the OpenTelemetry Protocol (OTLP) format.
The initial release supports exporting spans to AWS X-Ray. Future releases will support sending spans, metrics, and logs
to any OTLP Protobuf-compatible endpoint.

---

## Known Limitations

- Currently supports only trace data (spans). Support for metrics and logs will be added in future releases.
- No support for DLQ-based lossless delivery in this release.
- Only AWS X-Ray-compatible OTLP endpoints are currently supported (`https://xray.<region>.amazonaws.com/v1/traces`).
- Only OTLP over HTTP is supported; gRPC is not yet supported.

---

## Sample Pipeline Configuration

### Minimal Configuration (No STS)

Use this when Data Prepper has permission to write to AWS X-Ray directly.

```yaml
otlp_pipeline:
  source:
    otel_trace_source:

  sink:
    - otlp:
        endpoint: "https://xray.us-west-2.amazonaws.com/v1/traces"
        aws: { }
```

### Full Configuration with STS

Use this when Data Prepper must assume a cross-account role to write to AWS X-Ray.

```yaml
otlp_pipeline:
  workers: 2

  source:
    otel_trace_source:
      ssl: false
      port: 21890

  buffer:
    bounded_blocking:
      buffer_size: 1000000
      batch_size: 125000

  sink:
    - otlp:
        endpoint: "https://xray.us-west-2.amazonaws.com/v1/traces"
        max_retries: 5
        threshold:
          max_events: 512
          max_batch_size: 1mb
          flush_timeout: 200ms
        aws:
          sts_role_arn: arn:aws:iam::123456789012:role/MyRole
          sts_external_id: external-id-value
```

---

## Configuration Options

| Property                   | Type     | Required | Default               | Description                                                                                                |
|----------------------------|----------|----------|-----------------------|------------------------------------------------------------------------------------------------------------|
| `endpoint`                 | `String` | Yes      | -                     | AWS X-Ray OTLP endpoint where spans will be sent.                                                          |
| `max_retries`              | `int`    | No       | `5`                   | Maximum number of retry attempts on HTTP send failures.                                                    |
| **threshold**              | `Object` | No       | -                     | Controls batching behavior. See the `threshold.*` rows below.                                              |
| `threshold.max_events`     | `int`    | No       | `512` (recommended)   | Maximum number of spans per batch. Use `0` to disable count-based flushing. Must be ≥ 0.                   |
| `threshold.max_batch_size` | `String` | No       | `1mb` (recommended)   | Maximum total payload bytes per batch. Supports human-readable suffixes (`kb`, `mb`).                      |
| `threshold.flush_timeout`  | `String` | No       | `200ms` (recommended) | Maximum time to wait before flushing a non-empty batch. Minimum: `1ms`. Examples: `200ms`, `1s`.           |
| **aws**                    | `Object` | Yes      | -                     | AWS authentication settings. Use `{}` if no STS role is needed. See the `aws.*` rows below.                |
| `aws.sts_role_arn`         | `String` | No       | -                     | IAM Role ARN that Data Prepper (or OSI) assumes to send spans to X-Ray on behalf of a customer account.    |
| `aws.sts_external_id`      | `String` | No       | -                     | External ID to use when assuming the role. Required only if the target IAM role enforces `sts:ExternalId`. |
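
The three `threshold` settings can be read as independent flush triggers: a non-empty batch is sent as soon as any one limit is hit. The sketch below illustrates that reading only. `BatchThresholdSketch` and `shouldFlush` are hypothetical names, `1mb` is assumed to mean 1024 × 1024 bytes, and none of this is taken from the plugin's actual buffer implementation.

```java
import java.time.Duration;

/** Hypothetical illustration of count/size/time flush thresholds; not the plugin's code. */
final class BatchThresholdSketch {
    private final int maxEvents;         // 0 disables count-based flushing
    private final long maxBatchBytes;    // assumed byte limit for one batch
    private final Duration flushTimeout; // maximum wait for a non-empty batch

    BatchThresholdSketch(int maxEvents, long maxBatchBytes, Duration flushTimeout) {
        this.maxEvents = maxEvents;
        this.maxBatchBytes = maxBatchBytes;
        this.flushTimeout = flushTimeout;
    }

    /** Returns true when a non-empty batch has reached any configured limit. */
    boolean shouldFlush(int events, long bytes, Duration sinceLastFlush) {
        if (events == 0) {
            return false; // an empty batch is never flushed
        }
        boolean countReached = maxEvents > 0 && events >= maxEvents;
        boolean sizeReached = bytes >= maxBatchBytes;
        boolean timedOut = sinceLastFlush.compareTo(flushTimeout) >= 0;
        return countReached || sizeReached || timedOut;
    }

    public static void main(String[] args) {
        BatchThresholdSketch t =
                new BatchThresholdSketch(512, 1024L * 1024L, Duration.ofMillis(200));
        System.out.println(t.shouldFlush(512, 10_000, Duration.ofMillis(5)));  // count limit reached
        System.out.println(t.shouldFlush(10, 10_000, Duration.ofMillis(5)));   // no limit reached
        System.out.println(t.shouldFlush(10, 10_000, Duration.ofMillis(250))); // timeout reached
    }
}
```

With `max_events` set to `0`, only the size and timeout checks remain active in this sketch, which matches the table's note that `0` disables count-based flushing.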

**Additional Notes:**

- `aws.region` is automatically derived from the endpoint.
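
One plausible way to derive the region is to match the endpoint host against the documented `xray.<region>.amazonaws.com` shape. The sketch below assumes exactly that host pattern. `RegionFromEndpointSketch` and `regionOf` are hypothetical names, and the plugin may resolve the region differently.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the region from an X-Ray OTLP endpoint URL. */
final class RegionFromEndpointSketch {
    // Assumed host shape, taken from the documented endpoint format.
    private static final Pattern XRAY_HOST =
            Pattern.compile("^xray\\.([a-z0-9-]+)\\.amazonaws\\.com$");

    static Optional<String> regionOf(String endpoint) {
        String host = URI.create(endpoint).getHost();
        if (host == null) {
            return Optional.empty(); // no host component in the URL
        }
        Matcher m = XRAY_HOST.matcher(host.toLowerCase(Locale.ROOT));
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(regionOf("https://xray.us-west-2.amazonaws.com/v1/traces"));
        System.out.println(regionOf("https://example.com/v1/traces"));
    }
}
```

Returning an empty `Optional` for non-matching hosts keeps the failure explicit, so a caller can reject an unsupported endpoint at configuration time.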

---

## Performance Benchmark

### Summary

* Sustains ~3.5K TPS with ≤150ms p99 latency on t4g.large.
* Uses only ~8% CPU, ~100MB heap.
* 0 errors, retries, or drops during a 3-hour soak test.

### Tuning Recommendations

| Setting          | Recommended | Reason                                                                                          |
|------------------|-------------|-------------------------------------------------------------------------------------------------|
| `max_retries`    | `5`         | Matches AWS SDK default. Gives ~8s of exponential backoff to tolerate transient 503/5xx errors. |
| `max_events`     | `512`       | Supports up to 3.5K TPS with 2 workers. Keeps p99 latency around 130ms.                         |
| `max_batch_size` | `1mb`       | Aligns with OTEL + AWS X-Ray guidance. Larger batches get split, increasing latency/load.       |
| `flush_timeout`  | `200ms`     | Short enough to avoid delay, long enough to fill batches and keep CPU/GCs low.                  |

### Additional Tuning Tips

* Lower `max_events` to **200–400** to reduce latency below 100 ms.
* Decrease `flush_timeout` to **100 ms** for faster flushes (with higher CPU/network cost).
* Increase `max_batch_size` to **≥ 8 MB** only if p99 span size > 9 KB.
* Add pipeline workers if the queue saturates at >4K TPS.

### Queue Sizing Rule

> Queue capacity = `max_events * 10` (minimum 2000)
>
> To keep memory usage under ~50MB:
> `max_events ≤ 50_000_000 ÷ (10 × p99_span_size_bytes)`
>
> Example: With a p99 span size of 1 KB (taken as 1,000 bytes), `max_events` should be ≤ 5000.
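
The rule above is plain arithmetic, worked through in the sketch below. `QueueSizingSketch` and its method names are hypothetical; the constants come from the rule itself, and integer division rounds the bound down.

```java
/** Hypothetical worked example of the queue sizing rule; not the plugin's code. */
final class QueueSizingSketch {
    static final long MIN_QUEUE_CAPACITY = 2000L;
    static final long MEMORY_BUDGET_BYTES = 50_000_000L; // ~50MB budget from the rule

    /** Queue capacity = max_events * 10, with a floor of 2000. */
    static long queueCapacity(int maxEvents) {
        return Math.max(MIN_QUEUE_CAPACITY, 10L * maxEvents);
    }

    /** Largest max_events that keeps the queue within the memory budget. */
    static long maxEventsForBudget(long p99SpanSizeBytes) {
        return MEMORY_BUDGET_BYTES / (10L * p99SpanSizeBytes);
    }

    public static void main(String[] args) {
        System.out.println(queueCapacity(512));       // 5120
        System.out.println(queueCapacity(100));       // 2000 (the minimum applies)
        System.out.println(maxEventsForBudget(1000)); // 5000
    }
}
```

If 1 KB is taken as 1,024 bytes instead, the same formula gives a slightly lower bound of 4882.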


---

## Protocol Details

* Protocol: OTLP over HTTP
* Content-Type: `application/x-protobuf`
* Compression: `gzip` (enabled by default)
  All outgoing HTTP requests use gzip compression to reduce payload size and bandwidth usage.
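
As a rough illustration of the request body handling, the sketch below gzip-compresses a byte payload with the JDK's `java.util.zip` classes. `GzipPayloadSketch` is a hypothetical name, the payload here is a stand-in for a serialized OTLP Protobuf message, and pairing the body with a `Content-Encoding: gzip` header is standard HTTP practice that this README does not spell out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical illustration of gzip-compressing an OTLP/HTTP request body. */
final class GzipPayloadSketch {
    static final String CONTENT_TYPE = "application/x-protobuf";
    static final String CONTENT_ENCODING = "gzip"; // assumed standard HTTP header value

    static byte[] gzip(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // complete only after the gzip stream is closed
    }

    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in bytes; a real request would carry a serialized Protobuf message.
        byte[] raw = "span-bytes ".repeat(200).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.println("raw=" + raw.length + " gzip=" + compressed.length);
    }
}
```

The raw and compressed lengths printed here correspond conceptually to the `payloadSize` and `payloadGzipSize` metrics listed later in this README.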

---

## Delivery Semantics

Currently, the sink provides at-most-once delivery. Once retries are exhausted, span batches are dropped.
Future releases will support durable queueing via a DLQ for lossless guarantees.

---

## Retry Behavior

- The sink uses an exponential backoff with jitter strategy for retryable HTTP status codes (e.g., 429, 502, 503, 504).
- The maximum number of attempts is controlled by `max_retries`. Once exceeded:
    - The span batch is dropped.
    - The plugin logs the exception and increments the error metric.
- Non-retryable errors (e.g., 400, 403) are logged and counted immediately without retry.
- Retry logic follows
  the [OTLP/HTTP response specification](https://opentelemetry.io/docs/specs/otlp/#otlphttp-response).
- The `Retry-After` header is not used for dynamic backoff because:
    - Armeria’s retry rule API only supports boolean conditions or fixed `Backoff` strategies.
    - Supporting `Retry-After` would require a custom `Backoff` implementation, adding unnecessary complexity.
    - The exponential backoff already handles common retry intervals effectively.
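
The retry policy described above can be sketched in plain Java as a status-code check plus a capped exponential delay with full jitter. Everything named here is hypothetical: `RetryBackoffSketch`, the 100 ms base delay, and the 10 s cap are illustration values, the retryable set contains only the codes listed as examples above, and the plugin itself relies on Armeria's retry support rather than code like this.

```java
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical illustration of retry classification and jittered exponential backoff. */
final class RetryBackoffSketch {
    // Only the example codes from this README; the plugin's full set may differ.
    private static final Set<Integer> RETRYABLE = Set.of(429, 502, 503, 504);

    static boolean isRetryable(int httpStatus) {
        return RETRYABLE.contains(httpStatus);
    }

    /** Upper bound of the delay before the given 1-based attempt, doubling up to a cap. */
    static long maxDelayMillis(int attempt, long baseMillis, long capMillis) {
        int shift = Math.min(Math.max(attempt - 1, 0), 30); // clamp to avoid overflow
        return Math.min(capMillis, baseMillis << shift);
    }

    /** Full jitter: a uniformly random delay in [0, maxDelayMillis]. */
    static long jitteredDelayMillis(int attempt, long baseMillis, long capMillis) {
        long bound = maxDelayMillis(attempt, baseMillis, capMillis);
        return ThreadLocalRandom.current().nextLong(bound + 1);
    }

    public static void main(String[] args) {
        int maxRetries = 5; // mirrors the documented default of max_retries
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            System.out.println("attempt " + attempt
                    + " waits up to " + maxDelayMillis(attempt, 100, 10_000) + " ms");
        }
        System.out.println("503 retryable: " + isRetryable(503));
        System.out.println("400 retryable: " + isRetryable(400));
    }
}
```

Jitter spreads retries from many workers over time, which reduces the chance that they all hit a throttled endpoint at the same instant.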

---

## Logging & Metrics

* Exceptions are logged with full stack traces. No customer data is logged.
* Metrics are emitted via Micrometer and include:
    * `recordsIn`, `recordsOut`
    * `httpLatency`, HTTP codes
    * `errorCount`, `rejectedSpansCount`, `failedSpansCount`, `retriesCount`
    * `queueSize`, `queueCapacity`
    * `payloadSize`, `payloadGzipSize`
    * JVM stats if configured (e.g., heap usage, GC pauses)

---

## Developer Guide

See the [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) guide for general information on contributions.

### Run unit tests locally

```bash
./gradlew :data-prepper-plugins:otlp-sink:test
```
Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

sourceSets {
    integrationTest {
        java.srcDir file('src/integrationTest/java')
        resources.srcDir file('src/integrationTest/resources')
        compileClasspath += sourceSets.main.output
        runtimeClasspath += sourceSets.main.output
    }
}

configurations {
    integrationTestImplementation.extendsFrom testImplementation
    integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}

dependencies {
    // AWS SDK
    implementation 'software.amazon.awssdk:sdk-core'
    implementation 'software.amazon.awssdk:auth'
    implementation 'software.amazon.awssdk:sts'
    implementation 'software.amazon.awssdk:regions'
    implementation 'software.amazon.awssdk:http-client-spi'
    implementation 'software.amazon.awssdk:apache-client'

    // Hibernate
    implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'

    // OpenTelemetry Protobuf
    implementation libs.opentelemetry.proto
    implementation libs.protobuf.util
    testImplementation libs.opentelemetry.proto
    testImplementation libs.protobuf.util

    // Jackson
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
    testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'

    // Lombok
    compileOnly 'org.projectlombok:lombok:1.18.30'
    annotationProcessor 'org.projectlombok:lombok:1.18.30'

    // Data Prepper Projects
    implementation project(':data-prepper-api')
    implementation project(':data-prepper-plugins:aws-plugin-api')
    implementation project(':data-prepper-plugins:otel-proto-common')

    // Armeria
    implementation libs.armeria.core

    // Metrics
    implementation 'io.micrometer:micrometer-core'
}

test {
    useJUnitPlatform()
    finalizedBy jacocoTestReport, jacocoTestCoverageVerification
}

tasks.register('integrationTest', Test) {
    description = 'Runs integration tests.'
    group = 'verification'
    testClassesDirs = sourceSets.integrationTest.output.classesDirs
    classpath = sourceSets.integrationTest.runtimeClasspath
    shouldRunAfter test
    useJUnitPlatform()
}

check.dependsOn integrationTest

jacocoTestCoverageVerification {
    violationRules {
        rule {
            enabled = true
            element = 'CLASS'
            includes = ['org.opensearch.dataprepper.plugins.sink.otlp.*']
            limit {
                counter = 'LINE'
                value = 'COVEREDRATIO'
                minimum = 1.00
            }
        }
    }
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.sink.otlp;

import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.trace.Span;
import org.opensearch.dataprepper.plugins.sink.otlp.buffer.OtlpSinkBuffer;
import org.opensearch.dataprepper.plugins.sink.otlp.configuration.OtlpSinkConfig;
import org.opensearch.dataprepper.plugins.sink.otlp.metrics.OtlpSinkMetrics;

import javax.annotation.Nonnull;
import java.util.Collection;

/**
 * OTLP Sink Plugin for Data Prepper.
 */
@Experimental
@DataPrepperPlugin(
        name = "otlp",
        pluginType = Sink.class,
        pluginConfigurationType = OtlpSinkConfig.class
)
public class OtlpSink extends AbstractSink<Record<Span>> {
    private volatile boolean initialized = false;

    private final OtlpSinkBuffer buffer;
    private final OtlpSinkMetrics sinkMetrics;

    /**
     * Constructor for the OTLP sink plugin.
     *
     * @param awsCredentialsSupplier the AWS credentials supplier
     * @param config                 the configuration for the sink
     * @param pluginMetrics          the plugin metrics to use
     * @param pluginSetting          the plugin setting to use
     */
    @DataPrepperPluginConstructor
    public OtlpSink(@Nonnull final AwsCredentialsSupplier awsCredentialsSupplier,
                    @Nonnull final OtlpSinkConfig config,
                    @Nonnull final PluginMetrics pluginMetrics,
                    @Nonnull final PluginSetting pluginSetting) {
        super(pluginSetting);

        this.sinkMetrics = new OtlpSinkMetrics(pluginMetrics, pluginSetting);
        this.buffer = new OtlpSinkBuffer(awsCredentialsSupplier, config, sinkMetrics);
    }

    /**
     * Initialize the buffer
     */
    @Override
    public void doInitialize() {
        buffer.start();
        initialized = true;
    }

    /**
     * Implement the sink's output logic
     *
     * @param records Records to be output
     */
    @Override
    public void doOutput(@Nonnull final Collection<Record<Span>> records) {
        for (final Record<Span> record : records) {
            buffer.add(record);
        }
    }

    /**
     * Indicates whether this sink is ready to receive data.
     *
     * @return true if the sink is ready
     */
    @Override
    public boolean isReady() {
        return initialized && buffer.isRunning();
    }

    /**
     * Hook called during pipeline shutdown.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        buffer.stop();
    }
}
