diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/LogRecordToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/LogRecordToDiskExporter.java index 6ed7ae2b4..fbdf11bd3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/LogRecordToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/LogRecordToDiskExporter.java @@ -22,7 +22,7 @@ public final class LogRecordToDiskExporter implements LogRecordExporter { private final ExporterCallback callback; private static final ExporterCallback DEFAULT_CALLBACK = new NoopExporterCallback<>(); - private static final Duration DEFAULT_EXPORT_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(10); private LogRecordToDiskExporter( SignalStorageExporter storageExporter, @@ -54,7 +54,7 @@ public CompletableResultCode shutdown() { public static final class Builder { private final SignalStorage.LogRecord storage; private ExporterCallback callback = DEFAULT_CALLBACK; - private Duration writeTimeout = DEFAULT_EXPORT_TIMEOUT; + private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT; @CanIgnoreReturnValue public Builder setExporterCallback(ExporterCallback value) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/MetricToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/MetricToDiskExporter.java index fe7a86abf..3145b7249 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/MetricToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/MetricToDiskExporter.java @@ -25,7 +25,7 @@ public final class MetricToDiskExporter implements MetricExporter { private final AggregationTemporalitySelector aggregationTemporalitySelector; private final ExporterCallback callback; private static final ExporterCallback DEFAULT_CALLBACK = new NoopExporterCallback<>(); - private static final Duration DEFAULT_EXPORT_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(10); private MetricToDiskExporter( SignalStorageExporter storageExporter, @@ -66,7 +66,7 @@ public static final class Builder { private AggregationTemporalitySelector aggregationTemporalitySelector = AggregationTemporalitySelector.alwaysCumulative(); private ExporterCallback callback = DEFAULT_CALLBACK; - private Duration writeTimeout = DEFAULT_EXPORT_TIMEOUT; + private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT; @CanIgnoreReturnValue public Builder setExporterCallback(ExporterCallback value) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java index 9558a2767..1caa39892 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/exporters/SpanToDiskExporter.java @@ -21,7 +21,7 @@ public final class SpanToDiskExporter implements SpanExporter { private final SignalStorageExporter storageExporter; private final ExporterCallback callback; private static final ExporterCallback DEFAULT_CALLBACK = new NoopExporterCallback<>(); - private static final Duration DEFAULT_EXPORT_TIMEOUT = Duration.ofSeconds(10); + private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(10); private SpanToDiskExporter( SignalStorageExporter storageExporter, ExporterCallback callback) { @@ -52,7 +52,7 @@ public CompletableResultCode shutdown() { public static final class Builder { private final SignalStorage.Span storage; private ExporterCallback callback = DEFAULT_CALLBACK; - private Duration writeTimeout = DEFAULT_EXPORT_TIMEOUT; + private Duration writeTimeout = DEFAULT_WRITE_TIMEOUT; private Builder(SignalStorage.Span storage) { this.storage = storage; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporter.java index 22ba6b61c..45412b9f3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporter.java @@ -5,16 +5,12 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporters; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback; import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; import io.opentelemetry.sdk.common.CompletableResultCode; import java.time.Duration; import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** Internal utility for common export to disk operations across all exporters. */ @@ -31,23 +27,26 @@ public SignalStorageExporter( } public CompletableResultCode exportToStorage(Collection items) { - CompletableFuture future = storage.write(items); - try { - WriteResult operation = future.get(writeTimeout.toMillis(), MILLISECONDS); - if (operation.isSuccessful()) { - callback.onExportSuccess(items); - return CompletableResultCode.ofSuccess(); - } + CompletableResultCode result = + storage.write(items).join(writeTimeout.toMillis(), TimeUnit.MILLISECONDS); - Throwable error = operation.getError(); + if (!result.isDone()) { + TimeoutException timeout = + new TimeoutException("Storage write timed out after " + writeTimeout.toMillis() + "ms"); + callback.onExportError(items, timeout); + return CompletableResultCode.ofExceptionalFailure(timeout); + } + + if (!result.isSuccess()) { + Throwable error = result.getFailureThrowable(); callback.onExportError(items, error); if (error != null) { return CompletableResultCode.ofExceptionalFailure(error); } return CompletableResultCode.ofFailure(); - } catch (ExecutionException | InterruptedException | TimeoutException e) { - callback.onExportError(items, e); - return CompletableResultCode.ofExceptionalFailure(e); } + + callback.onExportSuccess(items); + return CompletableResultCode.ofSuccess(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java index 2ec344e38..7c0417f1e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java @@ -8,11 +8,10 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; +import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,34 +45,34 @@ public FileSignalStorage( } @Override - public CompletableFuture write(Collection items) { + public CompletableResultCode write(Collection items) { logger.finer("Intercepting batch."); try { serializer.initialize(items); if (storage.write(serializer)) { - return CompletableFuture.completedFuture(WriteResult.successful()); + return CompletableResultCode.ofSuccess(); } logger.fine("Could not store batch in disk."); - return CompletableFuture.completedFuture( - WriteResult.error(new Exception("Could not store batch in disk for an unknown reason."))); + return CompletableResultCode.ofExceptionalFailure( + new Exception("Could not store batch in disk for an unknown reason.")); } catch (IOException e) { logger.log( Level.WARNING, "An unexpected error happened while attempting to write the data in disk.", e); - return CompletableFuture.completedFuture(WriteResult.error(e)); + return CompletableResultCode.ofExceptionalFailure(e); } finally { serializer.reset(); } } @Override - public CompletableFuture clear() { + public CompletableResultCode clear() { try { storage.clear(); - return CompletableFuture.completedFuture(WriteResult.successful()); + return CompletableResultCode.ofSuccess(); } catch (IOException e) { - return CompletableFuture.completedFuture(WriteResult.error(e)); + return CompletableResultCode.ofExceptionalFailure(e); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java index e8daf391a..fb5d5ea78 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/SignalStorage.java @@ -5,13 +5,12 @@ package io.opentelemetry.contrib.disk.buffering.storage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.Closeable; import java.util.Collection; -import java.util.concurrent.CompletableFuture; /** * Allows writing and iterating over written signal items. @@ -24,16 +23,16 @@ public interface SignalStorage extends Iterable>, Closeable { * Stores signal items. * * @param items The items to be stored. - * @return A future with {@link WriteResult}. + * @return A {@link CompletableResultCode} representing the outcome of the write operation. */ - CompletableFuture write(Collection items); + CompletableResultCode write(Collection items); /** * Removes all the previously stored items. * - * @return A future with {@link WriteResult}. + * @return A {@link CompletableResultCode} representing the outcome of the clear operation. */ - CompletableFuture clear(); + CompletableResultCode clear(); /** * Abstraction for Spans. Implementations should use this instead of {@link SignalStorage} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java index 18653136b..895af745e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java @@ -11,14 +11,13 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; public final class FileLogRecordStorage implements SignalStorage.LogRecord { @@ -45,12 +44,12 @@ private FileLogRecordStorage(FileSignalStorage fileSignalStorage) } @Override - public CompletableFuture write(Collection items) { + public CompletableResultCode write(Collection items) { return fileSignalStorage.write(items); } @Override - public CompletableFuture clear() { + public CompletableResultCode clear() { return fileSignalStorage.clear(); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java index 17fc5c3b3..046ed1c67 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java @@ -11,14 +11,13 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; public final class FileMetricStorage implements SignalStorage.Metric { @@ -45,12 +44,12 @@ private FileMetricStorage(FileSignalStorage fileSignalStorage) { } @Override - public CompletableFuture write(Collection items) { + public CompletableResultCode write(Collection items) { return fileSignalStorage.write(items); } @Override - public CompletableFuture clear() { + public CompletableResultCode clear() { return fileSignalStorage.clear(); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java index e45d25d66..9679bfdae 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java @@ -11,14 +11,13 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; public final class FileSpanStorage implements SignalStorage.Span { @@ -45,12 +44,12 @@ private FileSpanStorage(FileSignalStorage fileSignalStorage) { } @Override - public CompletableFuture write(Collection items) { + public CompletableResultCode write(Collection items) { return fileSignalStorage.write(items); } @Override - public CompletableFuture clear() { + public CompletableResultCode clear() { return fileSignalStorage.clear(); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java deleted file mode 100644 index ff693c74b..000000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/DefaultWriteResult.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.storage.result; - -import javax.annotation.Nullable; - -final class DefaultWriteResult implements WriteResult { - private final boolean successful; - @Nullable private final Throwable error; - - DefaultWriteResult(boolean successful, @Nullable Throwable error) { - this.successful = successful; - this.error = error; - } - - @Override - public boolean isSuccessful() { - return successful; - } - - @Nullable - @Override - public Throwable getError() { - return error; - } -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/WriteResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/WriteResult.java deleted file mode 100644 index 85c08ed37..000000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/result/WriteResult.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.storage.result; - -import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import javax.annotation.Nullable; - -/** The result of a {@link SignalStorage} write operation. */ -public interface WriteResult { - /** - * Whether the operation succeeded or not. - * - * @return `true` if the items have been successfully stored, `false` otherwise. - */ - boolean isSuccessful(); - - /** - * Provides details of why the operation failed. - * - * @return The error (if any) for the failed operation. It must be null for successful operations. - */ - @Nullable - Throwable getError(); - - static WriteResult successful() { - return new DefaultWriteResult(/* successful= */ true, null); - } - - static WriteResult error(@Nullable Throwable t) { - return new DefaultWriteResult(/* successful= */ false, t); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporterTest.java index 30bfe310a..acfa6b61f 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/SignalStorageExporterTest.java @@ -15,7 +15,6 @@ import io.opentelemetry.contrib.disk.buffering.exporters.callback.ExporterCallback; import io.opentelemetry.contrib.disk.buffering.storage.SignalStorage; -import io.opentelemetry.contrib.disk.buffering.storage.result.WriteResult; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.time.Duration; @@ -25,7 +24,6 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; import javax.annotation.Nonnull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -78,8 +76,7 @@ void verifyExportToStorage_failure() { SpanData item1 = mock(); // Without exception - when(storage.write(anyCollection())) - .thenReturn(CompletableFuture.completedFuture(WriteResult.error(null))); + when(storage.write(anyCollection())).thenReturn(CompletableResultCode.ofFailure()); List items = Collections.singletonList(item1); CompletableResultCode resultCode = storageExporter.exportToStorage(items); @@ -93,7 +90,7 @@ void verifyExportToStorage_failure() { clearInvocations(callback); Exception exception = new Exception(); when(storage.write(anyCollection())) - .thenReturn(CompletableFuture.completedFuture(WriteResult.error(exception))); + .thenReturn(CompletableResultCode.ofExceptionalFailure(exception)); resultCode = storageExporter.exportToStorage(items); @@ -107,15 +104,15 @@ private static class TestSpanStorage implements SignalStorage.Span { private final List> storedItems = new ArrayList<>(); @Override - public CompletableFuture write(Collection items) { + public CompletableResultCode write(Collection items) { storedItems.add(items); - return getSuccessfulFuture(); + return CompletableResultCode.ofSuccess(); } @Override - public CompletableFuture clear() { + public CompletableResultCode clear() { storedItems.clear(); - return getSuccessfulFuture(); + return CompletableResultCode.ofSuccess(); } @Override @@ -126,10 +123,5 @@ public void close() {} public Iterator> iterator() { return storedItems.iterator(); } - - @Nonnull - private static CompletableFuture getSuccessfulFuture() { - return CompletableFuture.completedFuture(WriteResult.successful()); - } } }