Skip to content

Commit de0902d

Browse files
authored
Replacing CompletableFuture by CompletableResultCode (#2670)
1 parent fdbaf63 commit de0902d

12 files changed

Lines changed: 50 additions & 128 deletions

File tree

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/LogRecordToDiskExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public final class LogRecordToDiskExporter implements LogRecordExporter {
2222
private final ExporterCallback<LogRecordData> callback;
2323
private static final ExporterCallback<LogRecordData> DEFAULT_CALLBACK =
2424
new NoopExporterCallback<>();
25-
private static final Duration DEFAULT_EXPORT_TIMEOUT = Duration.ofSeconds(10);
25+
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(10);
2626

2727
private LogRecordToDiskExporter(
2828
SignalStorageExporter<LogRecordData> storageExporter,
@@ -54,7 +54,7 @@ public CompletableResultCode shutdown() {
5454
public static final class Builder {
5555
private final SignalStorage.LogRecord storage;
5656
private ExporterCallback<LogRecordData> callback = DEFAULT_CALLBACK;
57-
private Duration writeTimeout = DEFAULT_EXPORT_TIMEOUT;
57+
private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT;
5858

5959
@CanIgnoreReturnValue
6060
public Builder setExporterCallback(ExporterCallback<LogRecordData> value) {

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/MetricToDiskExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public final class MetricToDiskExporter implements MetricExporter {
2525
private final AggregationTemporalitySelector aggregationTemporalitySelector;
2626
private final ExporterCallback<MetricData> callback;
2727
private static final ExporterCallback<MetricData> DEFAULT_CALLBACK = new NoopExporterCallback<>();
28-
private static final Duration DEFAULT_EXPORT_TIMEOUT = Duration.ofSeconds(10);
28+
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(10);
2929

3030
private MetricToDiskExporter(
3131
SignalStorageExporter<MetricData> storageExporter,
@@ -66,7 +66,7 @@ public static final class Builder {
6666
private AggregationTemporalitySelector aggregationTemporalitySelector =
6767
AggregationTemporalitySelector.alwaysCumulative();
6868
private ExporterCallback<MetricData> callback = DEFAULT_CALLBACK;
69-
private Duration writeTimeout = DEFAULT_EXPORT_TIMEOUT;
69+
private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT;
7070

7171
@CanIgnoreReturnValue
7272
public Builder setExporterCallback(ExporterCallback<MetricData> value) {

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public final class SpanToDiskExporter implements SpanExporter {
2121
private final SignalStorageExporter<SpanData> storageExporter;
2222
private final ExporterCallback<SpanData> callback;
2323
private static final ExporterCallback<SpanData> DEFAULT_CALLBACK = new NoopExporterCallback<>();
24-
private static final Duration DEFAULT_EXPORT_TIMEOUT = Duration.ofSeconds(10);
24+
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(10);
2525

2626
private SpanToDiskExporter(
2727
SignalStorageExporter<SpanData> storageExporter, ExporterCallback<SpanData> callback) {
@@ -52,7 +52,7 @@ public CompletableResultCode shutdown() {
5252
public static final class Builder {
5353
private final SignalStorage.Span storage;
5454
private ExporterCallback<SpanData> callback = DEFAULT_CALLBACK;
55-
private Duration writeTimeout = DEFAULT_EXPORT_TIMEOUT;
55+
private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT;
5656

5757
private Builder(SignalStorage.Span storage) {
5858
this.storage = storage;

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporter.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,12 @@
55

66
package io.opentelemetry.contrib.disk.buffering.internal.exporters;
77

8-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
9-
108
import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback;
119
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
12-
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
1310
import io.opentelemetry.sdk.common.CompletableResultCode;
1411
import java.time.Duration;
1512
import java.util.Collection;
16-
import java.util.concurrent.CompletableFuture;
17-
import java.util.concurrent.ExecutionException;
13+
import java.util.concurrent.TimeUnit;
1814
import java.util.concurrent.TimeoutException;
1915

2016
/** Internal utility for common export to disk operations across all exporters. */
@@ -31,23 +27,26 @@ public SignalStorageExporter(
3127
}
3228

3329
public CompletableResultCode exportToStorage(Collection<T> items) {
34-
CompletableFuture<WriteResult> future = storage.write(items);
35-
try {
36-
WriteResult operation = future.get(writeTimeout.toMillis(), MILLISECONDS);
37-
if (operation.isSuccessful()) {
38-
callback.onExportSuccess(items);
39-
return CompletableResultCode.ofSuccess();
40-
}
30+
CompletableResultCode result =
31+
storage.write(items).join(writeTimeout.toMillis(), TimeUnit.MILLISECONDS);
4132

42-
Throwable error = operation.getError();
33+
if (!result.isDone()) {
34+
TimeoutException timeout =
35+
new TimeoutException("Storage write timed out after " + writeTimeout.toMillis() + "ms");
36+
callback.onExportError(items, timeout);
37+
return CompletableResultCode.ofExceptionalFailure(timeout);
38+
}
39+
40+
if (!result.isSuccess()) {
41+
Throwable error = result.getFailureThrowable();
4342
callback.onExportError(items, error);
4443
if (error != null) {
4544
return CompletableResultCode.ofExceptionalFailure(error);
4645
}
4746
return CompletableResultCode.ofFailure();
48-
} catch (ExecutionException | InterruptedException | TimeoutException e) {
49-
callback.onExportError(items, e);
50-
return CompletableResultCode.ofExceptionalFailure(e);
5147
}
48+
49+
callback.onExportSuccess(items);
50+
return CompletableResultCode.ofSuccess();
5251
}
5352
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
99
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
1010
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
11-
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
11+
import io.opentelemetry.sdk.common.CompletableResultCode;
1212
import java.io.IOException;
1313
import java.util.Collection;
1414
import java.util.Iterator;
15-
import java.util.concurrent.CompletableFuture;
1615
import java.util.concurrent.atomic.AtomicBoolean;
1716
import java.util.logging.Level;
1817
import java.util.logging.Logger;
@@ -46,34 +45,34 @@ public FileSignalStorage(
4645
}
4746

4847
@Override
49-
public CompletableFuture<WriteResult> write(Collection<T> items) {
48+
public CompletableResultCode write(Collection<T> items) {
5049
logger.finer("Intercepting batch.");
5150
try {
5251
serializer.initialize(items);
5352
if (storage.write(serializer)) {
54-
return CompletableFuture.completedFuture(WriteResult.successful());
53+
return CompletableResultCode.ofSuccess();
5554
}
5655
logger.fine("Could not store batch in disk.");
57-
return CompletableFuture.completedFuture(
58-
WriteResult.error(new Exception("Could not store batch in disk for an unknown reason.")));
56+
return CompletableResultCode.ofExceptionalFailure(
57+
new Exception("Could not store batch in disk for an unknown reason."));
5958
} catch (IOException e) {
6059
logger.log(
6160
Level.WARNING,
6261
"An unexpected error happened while attempting to write the data in disk.",
6362
e);
64-
return CompletableFuture.completedFuture(WriteResult.error(e));
63+
return CompletableResultCode.ofExceptionalFailure(e);
6564
} finally {
6665
serializer.reset();
6766
}
6867
}
6968

7069
@Override
71-
public CompletableFuture<WriteResult> clear() {
70+
public CompletableResultCode clear() {
7271
try {
7372
storage.clear();
74-
return CompletableFuture.completedFuture(WriteResult.successful());
73+
return CompletableResultCode.ofSuccess();
7574
} catch (IOException e) {
76-
return CompletableFuture.completedFuture(WriteResult.error(e));
75+
return CompletableResultCode.ofExceptionalFailure(e);
7776
}
7877
}
7978

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@
55

66
package io.opentelemetry.contrib.disk.buffering.storage;
77

8-
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
8+
import io.opentelemetry.sdk.common.CompletableResultCode;
99
import io.opentelemetry.sdk.logs.data.LogRecordData;
1010
import io.opentelemetry.sdk.metrics.data.MetricData;
1111
import io.opentelemetry.sdk.trace.data.SpanData;
1212
import java.io.Closeable;
1313
import java.util.Collection;
14-
import java.util.concurrent.CompletableFuture;
1514

1615
/**
1716
* Allows writing and iterating over written signal items.
@@ -24,16 +23,16 @@ public interface SignalStorage<T> extends Iterable<Collection<T>>, Closeable {
2423
* Stores signal items.
2524
*
2625
* @param items The items to be stored.
27-
* @return A future with {@link WriteResult}.
26+
* @return A {@link CompletableResultCode} representing the outcome of the write operation.
2827
*/
29-
CompletableFuture<WriteResult> write(Collection<T> items);
28+
CompletableResultCode write(Collection<T> items);
3029

3130
/**
3231
* Removes all the previously stored items.
3332
*
34-
* @return A future with {@link WriteResult}.
33+
* @return A {@link CompletableResultCode} representing the outcome of the clear operation.
3534
*/
36-
CompletableFuture<WriteResult> clear();
35+
CompletableResultCode clear();
3736

3837
/**
3938
* Abstraction for Spans. Implementations should use this instead of {@link SignalStorage}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager;
1212
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1313
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
14-
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
1514
import io.opentelemetry.sdk.common.Clock;
15+
import io.opentelemetry.sdk.common.CompletableResultCode;
1616
import io.opentelemetry.sdk.logs.data.LogRecordData;
1717
import java.io.File;
1818
import java.io.IOException;
1919
import java.util.Collection;
2020
import java.util.Iterator;
21-
import java.util.concurrent.CompletableFuture;
2221
import javax.annotation.Nonnull;
2322

2423
public final class FileLogRecordStorage implements SignalStorage.LogRecord {
@@ -45,12 +44,12 @@ private FileLogRecordStorage(FileSignalStorage<LogRecordData> fileSignalStorage)
4544
}
4645

4746
@Override
48-
public CompletableFuture<WriteResult> write(Collection<LogRecordData> items) {
47+
public CompletableResultCode write(Collection<LogRecordData> items) {
4948
return fileSignalStorage.write(items);
5049
}
5150

5251
@Override
53-
public CompletableFuture<WriteResult> clear() {
52+
public CompletableResultCode clear() {
5453
return fileSignalStorage.clear();
5554
}
5655

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager;
1212
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1313
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
14-
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
1514
import io.opentelemetry.sdk.common.Clock;
15+
import io.opentelemetry.sdk.common.CompletableResultCode;
1616
import io.opentelemetry.sdk.metrics.data.MetricData;
1717
import java.io.File;
1818
import java.io.IOException;
1919
import java.util.Collection;
2020
import java.util.Iterator;
21-
import java.util.concurrent.CompletableFuture;
2221
import javax.annotation.Nonnull;
2322

2423
public final class FileMetricStorage implements SignalStorage.Metric {
@@ -45,12 +44,12 @@ private FileMetricStorage(FileSignalStorage<MetricData> fileSignalStorage) {
4544
}
4645

4746
@Override
48-
public CompletableFuture<WriteResult> write(Collection<MetricData> items) {
47+
public CompletableResultCode write(Collection<MetricData> items) {
4948
return fileSignalStorage.write(items);
5049
}
5150

5251
@Override
53-
public CompletableFuture<WriteResult> clear() {
52+
public CompletableResultCode clear() {
5453
return fileSignalStorage.clear();
5554
}
5655

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@
1111
import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager;
1212
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
1313
import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage;
14-
import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult;
1514
import io.opentelemetry.sdk.common.Clock;
15+
import io.opentelemetry.sdk.common.CompletableResultCode;
1616
import io.opentelemetry.sdk.trace.data.SpanData;
1717
import java.io.File;
1818
import java.io.IOException;
1919
import java.util.Collection;
2020
import java.util.Iterator;
21-
import java.util.concurrent.CompletableFuture;
2221
import javax.annotation.Nonnull;
2322

2423
public final class FileSpanStorage implements SignalStorage.Span {
@@ -45,12 +44,12 @@ private FileSpanStorage(FileSignalStorage<SpanData> fileSignalStorage) {
4544
}
4645

4746
@Override
48-
public CompletableFuture<WriteResult> write(Collection<SpanData> items) {
47+
public CompletableResultCode write(Collection<SpanData> items) {
4948
return fileSignalStorage.write(items);
5049
}
5150

5251
@Override
53-
public CompletableFuture<WriteResult> clear() {
52+
public CompletableResultCode clear() {
5453
return fileSignalStorage.clear();
5554
}
5655

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)