Skip to content

Commit be25ac6

Browse files
Sridhar Reddy SurviSridhar Reddy Survi
authored andcommitted
Add fallback endpoint support for OTLP exporters
When the primary OTLP endpoint fails with a transport error (after retries are exhausted), the exporter will automatically attempt to send telemetry data to a configurable fallback endpoint. This enables high-availability setups where a secondary collector can receive data when the primary is unavailable. Configuration via environment variables / system properties: - otel.exporter.otlp.fallback.endpoint (generic) - otel.exporter.otlp.<signal>.fallback.endpoint (signal-specific) Programmatic configuration via builder: - setFallbackEndpoint(String) on all exporter builders Supported for all signal types (traces, metrics, logs) and both HTTP/protobuf and gRPC protocols.
1 parent 2acd434 commit be25ac6

18 files changed

Lines changed: 623 additions & 14 deletions

File tree

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,19 @@
11
Comparing source compatibility of opentelemetry-exporter-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-1.60.1.jar
2-
No changes.
2+
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder (not serializable)
3+
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
4+
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder setFallbackEndpoint(java.lang.String)
5+
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder (not serializable)
6+
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
7+
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder setFallbackEndpoint(java.lang.String)
8+
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder (not serializable)
9+
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
10+
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder setFallbackEndpoint(java.lang.String)
11+
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable)
12+
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
13+
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setFallbackEndpoint(java.lang.String)
14+
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable)
15+
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
16+
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setFallbackEndpoint(java.lang.String)
17+
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
18+
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
19+
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setFallbackEndpoint(java.lang.String)

exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
import io.opentelemetry.sdk.common.internal.StandardComponentId;
1818
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
1919
import java.net.URI;
20+
import java.util.Arrays;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.function.Supplier;
2223
import java.util.logging.Level;
2324
import java.util.logging.Logger;
25+
import javax.annotation.Nullable;
2426

2527
/**
2628
* Generic gRPC exporter.
@@ -41,6 +43,7 @@ public final class GrpcExporter {
4143

4244
private final String type;
4345
private final GrpcSender grpcSender;
46+
@Nullable private final GrpcSender fallbackGrpcSender;
4447
private final ExporterInstrumentation exporterMetrics;
4548

4649
public GrpcExporter(
@@ -49,8 +52,19 @@ public GrpcExporter(
4952
StandardComponentId componentId,
5053
Supplier<MeterProvider> meterProviderSupplier,
5154
URI endpoint) {
55+
this(grpcSender, internalTelemetryVersion, componentId, meterProviderSupplier, endpoint, null);
56+
}
57+
58+
public GrpcExporter(
59+
GrpcSender grpcSender,
60+
InternalTelemetryVersion internalTelemetryVersion,
61+
StandardComponentId componentId,
62+
Supplier<MeterProvider> meterProviderSupplier,
63+
URI endpoint,
64+
@Nullable GrpcSender fallbackGrpcSender) {
5265
this.type = componentId.getStandardType().signal().logFriendlyName();
5366
this.grpcSender = grpcSender;
67+
this.fallbackGrpcSender = fallbackGrpcSender;
5468
this.exporterMetrics =
5569
new ExporterInstrumentation(
5670
internalTelemetryVersion, meterProviderSupplier, componentId, endpoint);
@@ -69,7 +83,22 @@ public CompletableResultCode export(Marshaler exportRequest, int numItems) {
6983
grpcSender.send(
7084
exportRequest.toBinaryMessageWriter(),
7185
grpcResponse -> onResponse(result, metricRecording, grpcResponse),
72-
throwable -> onError(result, metricRecording, throwable));
86+
throwable -> {
87+
if (fallbackGrpcSender != null) {
88+
logger.log(
89+
Level.INFO,
90+
"Primary endpoint failed for "
91+
+ type
92+
+ "s, attempting fallback endpoint. Error: "
93+
+ throwable.getMessage());
94+
fallbackGrpcSender.send(
95+
exportRequest.toBinaryMessageWriter(),
96+
grpcResponse -> onResponse(result, metricRecording, grpcResponse),
97+
fallbackThrowable -> onError(result, metricRecording, fallbackThrowable));
98+
} else {
99+
onError(result, metricRecording, throwable);
100+
}
101+
});
73102

74103
return result;
75104
}
@@ -143,6 +172,11 @@ public CompletableResultCode shutdown() {
143172
logger.log(Level.INFO, "Calling shutdown() multiple times.");
144173
return CompletableResultCode.ofSuccess();
145174
}
146-
return grpcSender.shutdown();
175+
CompletableResultCode primaryResult = grpcSender.shutdown();
176+
if (fallbackGrpcSender != null) {
177+
CompletableResultCode fallbackResult = fallbackGrpcSender.shutdown();
178+
return CompletableResultCode.ofAll(Arrays.asList(primaryResult, fallbackResult));
179+
}
180+
return primaryResult;
147181
}
148182
}

exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class GrpcExporterBuilder {
5757
private Duration timeout;
5858
private Duration connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECS);
5959
private URI endpoint;
60+
@Nullable private URI fallbackEndpoint;
6061
@Nullable private Compressor compressor;
6162
private final Map<String, String> constantHeaders = new HashMap<>();
6263
private Supplier<Map<String, String>> headerSupplier = Collections::emptyMap;
@@ -104,6 +105,11 @@ public GrpcExporterBuilder setEndpoint(String endpoint) {
104105
return this;
105106
}
106107

108+
public GrpcExporterBuilder setFallbackEndpoint(String fallbackEndpoint) {
109+
this.fallbackEndpoint = ExporterBuilderUtil.validateEndpoint(fallbackEndpoint);
110+
return this;
111+
}
112+
107113
public GrpcExporterBuilder setCompression(@Nullable Compressor compressor) {
108114
this.compressor = compressor;
109115
return this;
@@ -190,6 +196,7 @@ public GrpcExporterBuilder copy() {
190196
copy.internalTelemetryVersion = internalTelemetryVersion;
191197
copy.grpcChannel = grpcChannel;
192198
copy.componentLoader = componentLoader;
199+
copy.fallbackEndpoint = fallbackEndpoint;
193200
return copy;
194201
}
195202

@@ -233,12 +240,33 @@ public GrpcExporter build() {
233240
grpcChannel));
234241
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());
235242

243+
GrpcSender fallbackSender = null;
244+
if (fallbackEndpoint != null) {
245+
boolean isFallbackPlainHttp = "http".equals(fallbackEndpoint.getScheme());
246+
fallbackSender =
247+
grpcSenderProvider.createSender(
248+
ImmutableGrpcSenderConfig.create(
249+
fallbackEndpoint,
250+
fullMethodName,
251+
compressor,
252+
timeout,
253+
connectTimeout,
254+
headerSupplier,
255+
retryPolicy,
256+
isFallbackPlainHttp ? null : tlsConfigHelper.getSslContext(),
257+
isFallbackPlainHttp ? null : tlsConfigHelper.getTrustManager(),
258+
executorService,
259+
grpcChannel));
260+
LOGGER.log(Level.FINE, "Using fallback GrpcSender: " + fallbackSender.getClass().getName());
261+
}
262+
236263
return new GrpcExporter(
237264
grpcSender,
238265
internalTelemetryVersion,
239266
ComponentId.generateLazy(exporterType),
240267
meterProviderSupplier,
241-
endpoint);
268+
endpoint,
269+
fallbackSender);
242270
}
243271

244272
public String toString(boolean includePrefixAndSuffix) {
@@ -247,6 +275,9 @@ public String toString(boolean includePrefixAndSuffix) {
247275
? new StringJoiner(", ", "GrpcExporterBuilder{", "}")
248276
: new StringJoiner(", ");
249277
joiner.add("endpoint=" + endpoint.toString());
278+
if (fallbackEndpoint != null) {
279+
joiner.add("fallbackEndpoint=" + fallbackEndpoint);
280+
}
250281
joiner.add("fullMethodName=" + fullMethodName);
251282
joiner.add("timeoutNanos=" + timeout.toNanos());
252283
joiner.add("connectTimeoutNanos=" + connectTimeout.toNanos());

exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
2020
import java.io.IOException;
2121
import java.net.URI;
22+
import java.util.Arrays;
2223
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.function.Supplier;
2425
import java.util.logging.Level;
@@ -41,6 +42,7 @@ public final class HttpExporter {
4142

4243
private final String type;
4344
private final HttpSender httpSender;
45+
@Nullable private final HttpSender fallbackHttpSender;
4446
private final ExporterInstrumentation exporterMetrics;
4547
private final boolean exportAsJson;
4648

@@ -51,8 +53,27 @@ public HttpExporter(
5153
InternalTelemetryVersion internalTelemetryVersion,
5254
URI endpoint,
5355
boolean exportAsJson) {
56+
this(
57+
componentId,
58+
httpSender,
59+
meterProviderSupplier,
60+
internalTelemetryVersion,
61+
endpoint,
62+
exportAsJson,
63+
null);
64+
}
65+
66+
public HttpExporter(
67+
StandardComponentId componentId,
68+
HttpSender httpSender,
69+
Supplier<MeterProvider> meterProviderSupplier,
70+
InternalTelemetryVersion internalTelemetryVersion,
71+
URI endpoint,
72+
boolean exportAsJson,
73+
@Nullable HttpSender fallbackHttpSender) {
5474
this.type = componentId.getStandardType().signal().logFriendlyName();
5575
this.httpSender = httpSender;
76+
this.fallbackHttpSender = fallbackHttpSender;
5677
this.exporterMetrics =
5778
new ExporterInstrumentation(
5879
internalTelemetryVersion, meterProviderSupplier, componentId, endpoint);
@@ -74,7 +95,26 @@ public CompletableResultCode export(Marshaler exportRequest, int numItems) {
7495
httpSender.send(
7596
messageWriter,
7697
httpResponse -> onResponse(result, metricRecording, httpResponse),
77-
throwable -> onError(result, metricRecording, throwable));
98+
throwable -> {
99+
if (fallbackHttpSender != null) {
100+
logger.log(
101+
Level.INFO,
102+
"Primary endpoint failed for "
103+
+ type
104+
+ "s, attempting fallback endpoint. Error: "
105+
+ throwable.getMessage());
106+
MessageWriter fallbackMessageWriter =
107+
exportAsJson
108+
? exportRequest.toJsonMessageWriter()
109+
: exportRequest.toBinaryMessageWriter();
110+
fallbackHttpSender.send(
111+
fallbackMessageWriter,
112+
httpResponse -> onResponse(result, metricRecording, httpResponse),
113+
fallbackThrowable -> onError(result, metricRecording, fallbackThrowable));
114+
} else {
115+
onError(result, metricRecording, throwable);
116+
}
117+
});
78118

79119
return result;
80120
}
@@ -131,7 +171,12 @@ public CompletableResultCode shutdown() {
131171
logger.log(Level.INFO, "Calling shutdown() multiple times.");
132172
return CompletableResultCode.ofSuccess();
133173
}
134-
return httpSender.shutdown();
174+
CompletableResultCode primaryResult = httpSender.shutdown();
175+
if (fallbackHttpSender != null) {
176+
CompletableResultCode fallbackResult = fallbackHttpSender.shutdown();
177+
return CompletableResultCode.ofAll(Arrays.asList(primaryResult, fallbackResult));
178+
}
179+
return primaryResult;
135180
}
136181

137182
private static String extractErrorStatus(String statusMessage, @Nullable byte[] responseBody) {

exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public final class HttpExporterBuilder {
5353
private StandardComponentId.ExporterType exporterType;
5454

5555
private URI endpoint;
56+
@Nullable private URI fallbackEndpoint;
5657

5758
private Duration timeout = Duration.ofSeconds(DEFAULT_TIMEOUT_SECS);
5859
@Nullable private Compressor compressor;
@@ -96,6 +97,11 @@ public HttpExporterBuilder setEndpoint(String endpoint) {
9697
return this;
9798
}
9899

100+
public HttpExporterBuilder setFallbackEndpoint(String fallbackEndpoint) {
101+
this.fallbackEndpoint = ExporterBuilderUtil.validateEndpoint(fallbackEndpoint);
102+
return this;
103+
}
104+
99105
public HttpExporterBuilder setCompression(@Nullable Compressor compressor) {
100106
this.compressor = compressor;
101107
return this;
@@ -205,6 +211,7 @@ public HttpExporterBuilder copy() {
205211
copy.internalTelemetryVersion = internalTelemetryVersion;
206212
copy.proxyOptions = proxyOptions;
207213
copy.componentLoader = componentLoader;
214+
copy.fallbackEndpoint = fallbackEndpoint;
208215
return copy;
209216
}
210217

@@ -231,12 +238,13 @@ public HttpExporter build() {
231238
};
232239

233240
boolean isPlainHttp = endpoint.getScheme().equals("http");
241+
String contentType = exportAsJson ? "application/json" : "application/x-protobuf";
234242
HttpSenderProvider httpSenderProvider = SenderUtil.resolveHttpSenderProvider(componentLoader);
235243
HttpSender httpSender =
236244
httpSenderProvider.createSender(
237245
ImmutableHttpSenderConfig.create(
238246
endpoint,
239-
exportAsJson ? "application/json" : "application/x-protobuf",
247+
contentType,
240248
compressor,
241249
timeout,
242250
connectTimeout,
@@ -248,13 +256,34 @@ public HttpExporter build() {
248256
executorService));
249257
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());
250258

259+
HttpSender fallbackSender = null;
260+
if (fallbackEndpoint != null) {
261+
boolean isFallbackPlainHttp = fallbackEndpoint.getScheme().equals("http");
262+
fallbackSender =
263+
httpSenderProvider.createSender(
264+
ImmutableHttpSenderConfig.create(
265+
fallbackEndpoint,
266+
contentType,
267+
compressor,
268+
timeout,
269+
connectTimeout,
270+
headerSupplier,
271+
proxyOptions,
272+
retryPolicy,
273+
isFallbackPlainHttp ? null : tlsConfigHelper.getSslContext(),
274+
isFallbackPlainHttp ? null : tlsConfigHelper.getTrustManager(),
275+
executorService));
276+
LOGGER.log(Level.FINE, "Using fallback HttpSender: " + fallbackSender.getClass().getName());
277+
}
278+
251279
return new HttpExporter(
252280
ComponentId.generateLazy(exporterType),
253281
httpSender,
254282
meterProviderSupplier,
255283
internalTelemetryVersion,
256284
endpoint,
257-
exportAsJson);
285+
exportAsJson,
286+
fallbackSender);
258287
}
259288

260289
public String toString(boolean includePrefixAndSuffix) {
@@ -263,6 +292,9 @@ public String toString(boolean includePrefixAndSuffix) {
263292
? new StringJoiner(", ", "HttpExporterBuilder{", "}")
264293
: new StringJoiner(", ");
265294
joiner.add("endpoint=" + endpoint);
295+
if (fallbackEndpoint != null) {
296+
joiner.add("fallbackEndpoint=" + fallbackEndpoint);
297+
}
266298
joiner.add("timeoutNanos=" + timeout.toNanos());
267299
joiner.add("proxyOptions=" + proxyOptions);
268300
joiner.add(

0 commit comments

Comments
 (0)