Skip to content

Commit 8400825

Browse files
committed
add support for OpenSearch formats & update readme
Signed-off-by: Shenoy Pratik <sgguruda@amazon.com>
1 parent b6d8d47 commit 8400825

4 files changed

Lines changed: 67 additions & 6 deletions

File tree

data-prepper-plugins/otlp-source/README.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ otel-telemetry-pipeline:
1414
otlp:
1515
ssl: false
1616
route:
17-
- logs: 'getMetadata("eventType") == "LOG"'
18-
- traces: 'getMetadata("eventType") == "TRACE"'
19-
- metrics: 'getMetadata("eventType") == "METRIC"'
17+
- logs: 'getEventType() == "LOG"'
18+
- traces: 'getEventType() == "TRACE"'
19+
- metrics: 'getEventType() == "METRIC"'
2020
sink:
2121
- pipeline:
2222
name: "logs-pipeline"
@@ -47,6 +47,15 @@ otel-telemetry-pipeline:
4747
- compression(Optional) => The compression type applied on the client request payload. Defaults to `none`. Supported values are:
4848
- `none`: no compression
4949
- `gzip`: apply GZip de-compression on the incoming request.
50+
- logs_output_format(Optional) => Specifies the decoded output format for logs. Supported values are:
51+
- `otel`: OpenTelemetry format (default).
52+
- `opensearch`: OpenSearch format.
53+
- metrics_output_format(Optional) => Specifies the decoded output format for metrics. Supported values are:
54+
- `otel`: OpenTelemetry format (default).
55+
- `opensearch`: OpenSearch format.
56+
- traces_output_format(Optional) => Specifies the decoded output format for traces. Supported values are:
57+
- `otel`: OpenTelemetry format (default).
58+
- `opensearch`: OpenSearch format.
5059

5160
### Retry Information
5261

data-prepper-plugins/otlp-source/src/main/java/org/opensearch/dataprepper/plugins/source/otlp/OTLPSource.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
import org.opensearch.dataprepper.model.source.Source;
1515
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
16+
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
17+
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoOpensearchCodec;
1618
import org.opensearch.dataprepper.plugins.otel.codec.OTelProtoStandardCodec;
1719
import org.opensearch.dataprepper.plugins.source.otellogs.OTelLogsGrpcService;
1820
import org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsGrpcService;
@@ -87,17 +89,17 @@ public void start(Buffer<Record<Object>> buffer) {
8789

8890
final OTelLogsGrpcService oTelLogsGrpcService = new OTelLogsGrpcService(
8991
(int) (otlpSourceConfig.getRequestTimeoutInMillis() * 0.8),
90-
new OTelProtoStandardCodec.OTelProtoDecoder(),
92+
otlpSourceConfig.getLogsOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(),
9193
buffer, pluginMetrics);
9294

9395
final OTelMetricsGrpcService oTelMetricsGrpcService = new OTelMetricsGrpcService(
9496
(int) (otlpSourceConfig.getRequestTimeoutInMillis() * 0.8),
95-
new OTelProtoStandardCodec.OTelProtoDecoder(),
97+
otlpSourceConfig.getMetricsOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(),
9698
metricBuffer, pluginMetrics);
9799

98100
final OTelTraceGrpcService oTelTraceGrpcService = new OTelTraceGrpcService(
99101
(int) (otlpSourceConfig.getRequestTimeoutInMillis() * 0.8),
100-
new OTelProtoStandardCodec.OTelProtoDecoder(),
102+
otlpSourceConfig.getTracesOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(),
101103
buffer, pluginMetrics);
102104

103105
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(otlpSourceConfig);

data-prepper-plugins/otlp-source/src/main/java/org/opensearch/dataprepper/plugins/source/otlp/OTLPSourceConfig.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.commons.lang3.StringUtils;
1212
import org.opensearch.dataprepper.model.types.ByteCount;
1313
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
14+
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
1415
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
1516
import org.opensearch.dataprepper.model.configuration.PluginModel;
1617

@@ -21,6 +22,9 @@ public class OTLPSourceConfig {
2122
static final String METRICS_PATH = "metrics_path";
2223
static final String TRACES_PATH = "traces_path";
2324
static final String SSL = "ssl";
25+
static final String LOGS_OUTPUT_FORMAT = "logs_output_format";
26+
static final String METRICS_OUTPUT_FORMAT = "metrics_output_format";
27+
static final String TRACES_OUTPUT_FORMAT = "traces_output_format";
2428
static final String USE_ACM_CERT_FOR_SSL = "useAcmCertForSSL";
2529
static final String ACM_CERT_ISSUE_TIME_OUT_MILLIS = "acmCertIssueTimeOutMillis";
2630
static final String HEALTH_CHECK_SERVICE = "health_check_service";
@@ -78,6 +82,15 @@ public class OTLPSourceConfig {
7882
@JsonProperty(SSL)
7983
private boolean ssl = DEFAULT_SSL;
8084

85+
@JsonProperty(LOGS_OUTPUT_FORMAT)
86+
private OTelOutputFormat logsOutputFormat = OTelOutputFormat.OTEL;
87+
88+
@JsonProperty(METRICS_OUTPUT_FORMAT)
89+
private OTelOutputFormat metricsOutputFormat = OTelOutputFormat.OTEL;
90+
91+
@JsonProperty(TRACES_OUTPUT_FORMAT)
92+
private OTelOutputFormat tracesOutputFormat = OTelOutputFormat.OTEL;
93+
8194
@JsonProperty(USE_ACM_CERT_FOR_SSL)
8295
private boolean useAcmCertForSSL = DEFAULT_USE_ACM_CERT_FOR_SSL;
8396

@@ -181,6 +194,18 @@ public int getRequestTimeoutInMillis() {
181194
return requestTimeoutInMillis;
182195
}
183196

197+
public OTelOutputFormat getLogsOutputFormat() {
198+
return logsOutputFormat;
199+
}
200+
201+
public OTelOutputFormat getMetricsOutputFormat() {
202+
return metricsOutputFormat;
203+
}
204+
205+
public OTelOutputFormat getTracesOutputFormat() {
206+
return tracesOutputFormat;
207+
}
208+
184209
public int getPort() {
185210
return port;
186211
}

data-prepper-plugins/otlp-source/src/test/java/org/opensearch/dataprepper/plugins/source/otlp/OTLPSourceConfigTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.junit.jupiter.params.provider.MethodSource;
1414
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1515
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
16+
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
1617
import org.opensearch.dataprepper.plugins.server.RetryInfoConfig;
1718

1819
import java.time.Duration;
@@ -369,6 +370,30 @@ void testInvalidPathsDoNotStartWithSlash() {
369370
assertFalse(config.isTracesPathValid());
370371
}
371372

373+
@Test
374+
void testDefaultOutputFormats() {
375+
final OTLPSourceConfig config = new OTLPSourceConfig();
376+
377+
assertEquals(OTelOutputFormat.OTEL, config.getLogsOutputFormat());
378+
assertEquals(OTelOutputFormat.OTEL, config.getMetricsOutputFormat());
379+
assertEquals(OTelOutputFormat.OTEL, config.getTracesOutputFormat());
380+
}
381+
382+
@Test
383+
void testCustomOutputFormats() {
384+
final Map<String, Object> settings = new HashMap<>();
385+
settings.put(OTLPSourceConfig.LOGS_OUTPUT_FORMAT, OTelOutputFormat.OPENSEARCH.getFormatName());
386+
settings.put(OTLPSourceConfig.METRICS_OUTPUT_FORMAT, OTelOutputFormat.OPENSEARCH.getFormatName());
387+
settings.put(OTLPSourceConfig.TRACES_OUTPUT_FORMAT, OTelOutputFormat.OPENSEARCH.getFormatName());
388+
389+
final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings);
390+
final OTLPSourceConfig config = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTLPSourceConfig.class);
391+
392+
assertEquals(OTelOutputFormat.OPENSEARCH, config.getLogsOutputFormat());
393+
assertEquals(OTelOutputFormat.OPENSEARCH, config.getMetricsOutputFormat());
394+
assertEquals(OTelOutputFormat.OPENSEARCH, config.getTracesOutputFormat());
395+
}
396+
372397
private PluginSetting completePluginSetting(final int requestTimeoutInMillis, final int port, final String path,
373398
final boolean healthCheck, final boolean protoReflectionService,
374399
final boolean enableUnframedRequests, final boolean isSSL,

0 commit comments

Comments
 (0)