diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java index 7b37ee361..c26a383d6 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java @@ -5,11 +5,10 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import java.io.IOException; @@ -19,15 +18,12 @@ public class LogRecordFromDiskExporter implements FromDiskExporter { private final FromDiskExporterImpl delegate; - public static LogRecordFromDiskExporter create( - LogRecordExporter exporter, StorageConfiguration config) throws IOException { + public static LogRecordFromDiskExporter create(LogRecordExporter exporter, Storage storage) + throws IOException { FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder() - .setFolderName(SignalTypes.logs.name()) - .setStorageConfiguration(config) + FromDiskExporterImpl.builder(storage) .setDeserializer(SignalDeserializer.ofLogs()) .setExportFunction(exporter::export) - .setDebugEnabled(config.isDebugEnabled()) .build(); return new LogRecordFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java index 7570aed8e..665e90f76 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java @@ -5,10 +5,9 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; @@ -26,16 +25,12 @@ public class LogRecordToDiskExporter implements LogRecordExporter { * Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage. * * @param delegate - The LogRecordExporter to delegate to if disk writing fails. - * @param config - The StorageConfiguration that specifies how storage is managed. + * @param storage - The Storage instance that specifies how storage is managed. * @return A new LogRecordToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. */ - public static LogRecordToDiskExporter create( - LogRecordExporter delegate, StorageConfiguration config) throws IOException { + public static LogRecordToDiskExporter create(LogRecordExporter delegate, Storage storage) { ToDiskExporter toDisk = - ToDiskExporter.builder() - .setFolderName(SignalTypes.logs.name()) - .setStorageConfiguration(config) + ToDiskExporter.builder(storage) .setSerializer(SignalSerializer.ofLogs()) .setExportFunction(delegate::export) .build(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java index bf652f8f8..8bb4f3dcd 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java @@ -5,11 +5,10 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.io.IOException; @@ -19,15 +18,12 @@ public class MetricFromDiskExporter implements FromDiskExporter { private final FromDiskExporterImpl delegate; - public static MetricFromDiskExporter create(MetricExporter exporter, StorageConfiguration config) + public static MetricFromDiskExporter create(MetricExporter exporter, Storage storage) throws IOException { FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder() - .setFolderName(SignalTypes.metrics.name()) - .setStorageConfiguration(config) + FromDiskExporterImpl.builder(storage) .setDeserializer(SignalDeserializer.ofMetrics()) .setExportFunction(exporter::export) - .setDebugEnabled(config.isDebugEnabled()) .build(); return new MetricFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java index bf2e7066f..83d2fc73c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java @@ -5,10 +5,9 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; @@ -31,16 +30,12 @@ public class MetricToDiskExporter implements MetricExporter { * Creates a new MetricToDiskExporter that will buffer Metric telemetry on disk storage. * * @param delegate - The MetricExporter to delegate to if disk writing fails. - * @param config - The StorageConfiguration that specifies how storage is managed. + * @param storage - The Storage instance that specifies how storage is managed. * @return A new MetricToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. */ - public static MetricToDiskExporter create(MetricExporter delegate, StorageConfiguration config) - throws IOException { + public static MetricToDiskExporter create(MetricExporter delegate, Storage storage) { ToDiskExporter toDisk = - ToDiskExporter.builder() - .setFolderName(SignalTypes.metrics.name()) - .setStorageConfiguration(config) + ToDiskExporter.builder(storage) .setSerializer(SignalSerializer.ofMetrics()) .setExportFunction(delegate::export) .build(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java index c23ac043e..e3c7992ba 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java @@ -5,11 +5,10 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.IOException; @@ -19,15 +18,12 @@ public class SpanFromDiskExporter implements FromDiskExporter { private final FromDiskExporterImpl delegate; - public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfiguration config) + public static SpanFromDiskExporter create(SpanExporter exporter, Storage storage) throws IOException { FromDiskExporterImpl delegate = - FromDiskExporterImpl.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(config) + FromDiskExporterImpl.builder(storage) .setDeserializer(SignalDeserializer.ofSpans()) .setExportFunction(exporter::export) - .setDebugEnabled(config.isDebugEnabled()) .build(); return new SpanFromDiskExporter(delegate); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java index d64a4cd71..d5ca81518 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java @@ -5,10 +5,9 @@ package io.opentelemetry.contrib.disk.buffering; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -27,16 +26,12 @@ public class SpanToDiskExporter implements SpanExporter { * Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage. * * @param delegate - The SpanExporter to delegate to if disk writing fails. - * @param config - The StorageConfiguration that specifies how storage is managed. + * @param storage - The Storage instance that specifies how storage is managed. * @return A new SpanToDiskExporter instance. - * @throws IOException if the delegate ToDiskExporter could not be created. */ - public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config) - throws IOException { + public static SpanToDiskExporter create(SpanExporter delegate, Storage storage) { ToDiskExporter toDisk = - ToDiskExporter.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(config) + ToDiskExporter.builder(storage) .setSerializer(SignalSerializer.ofSpans()) .setExportFunction(delegate::export) .build(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java index eec298469..a91ded1f3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -8,11 +8,8 @@ import static java.util.Collections.emptyList; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; -import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; -import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; import java.util.Collection; @@ -22,36 +19,23 @@ public class FromDiskExporterBuilder { private SignalDeserializer serializer = noopDeserializer(); + private final Storage storage; + private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); - private boolean debugEnabled = false; + public FromDiskExporterBuilder(Storage storage) { + if (storage == null) { + throw new NullPointerException("Storage cannot be null"); + } + this.storage = storage; + } @NotNull private static SignalDeserializer noopDeserializer() { return x -> emptyList(); } - private final StorageBuilder storageBuilder = Storage.builder(); - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setFolderName(String folderName) { - storageBuilder.setFolderName(folderName); - return this; - } - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { - storageBuilder.setStorageConfiguration(configuration); - return this; - } - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setStorageClock(Clock clock) { - storageBuilder.setStorageClock(clock); - return this; - } - @CanIgnoreReturnValue public FromDiskExporterBuilder setDeserializer(SignalDeserializer serializer) { this.serializer = serializer; @@ -65,19 +49,7 @@ public FromDiskExporterBuilder setExportFunction( return this; } - @CanIgnoreReturnValue - public FromDiskExporterBuilder enableDebug() { - return setDebugEnabled(true); - } - - @CanIgnoreReturnValue - public FromDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { - this.debugEnabled = debugEnabled; - return this; - } - public FromDiskExporterImpl build() throws IOException { - Storage storage = storageBuilder.build(); - return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled); + return new FromDiskExporterImpl<>(serializer, exportFunction, storage); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 19ef6fe2c..5ba5c2390 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -32,17 +32,17 @@ public final class FromDiskExporterImpl implements FromDiskExporter FromDiskExporterImpl( SignalDeserializer deserializer, Function, CompletableResultCode> exportFunction, - Storage storage, - boolean debugEnabled) { + Storage storage) { this.deserializer = deserializer; this.exportFunction = exportFunction; this.storage = storage; this.logger = - DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled); + DebugLogger.wrap( + Logger.getLogger(FromDiskExporterImpl.class.getName()), storage.isDebugEnabled()); } - public static FromDiskExporterBuilder builder() { - return new FromDiskExporterBuilder<>(); + public static FromDiskExporterBuilder builder(Storage storage) { + return new FromDiskExporterBuilder<>(storage); } /** diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java index 1a43cb5eb..b54e3cc16 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -25,16 +25,17 @@ public class ToDiskExporter { ToDiskExporter( SignalSerializer serializer, Function, CompletableResultCode> exportFunction, - Storage storage, - boolean debugEnabled) { + Storage storage) { this.serializer = serializer; this.exportFunction = exportFunction; this.storage = storage; - this.logger = DebugLogger.wrap(Logger.getLogger(ToDiskExporter.class.getName()), debugEnabled); + this.logger = + DebugLogger.wrap( + Logger.getLogger(ToDiskExporter.class.getName()), storage.isDebugEnabled()); } - public static ToDiskExporterBuilder builder() { - return new ToDiskExporterBuilder<>(); + public static ToDiskExporterBuilder builder(Storage storage) { + return new ToDiskExporterBuilder<>(storage); } public CompletableResultCode export(Collection data) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 3ac7d2503..069e08986 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -6,13 +6,9 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; -import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; -import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.io.IOException; import java.util.Collection; import java.util.function.Function; @@ -20,42 +16,16 @@ public final class ToDiskExporterBuilder { private SignalSerializer serializer = ts -> new byte[0]; - private final StorageBuilder storageBuilder = Storage.builder(); + private final Storage storage; private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); - private boolean debugEnabled = false; - ToDiskExporterBuilder() {} - - @CanIgnoreReturnValue - public ToDiskExporterBuilder enableDebug() { - return setDebugEnabled(true); - } - - @CanIgnoreReturnValue - public ToDiskExporterBuilder setDebugEnabled(boolean debugEnabled) { - this.debugEnabled = debugEnabled; - return this; - } - - @CanIgnoreReturnValue - public ToDiskExporterBuilder setFolderName(String folderName) { - storageBuilder.setFolderName(folderName); - return this; - } - - @CanIgnoreReturnValue - public ToDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { - validateConfiguration(configuration); - storageBuilder.setStorageConfiguration(configuration); - return this; - } - - @CanIgnoreReturnValue - public ToDiskExporterBuilder setStorageClock(Clock clock) { - storageBuilder.setStorageClock(clock); - return this; + ToDiskExporterBuilder(Storage storage) { + if (storage == null) { + throw new NullPointerException("Storage cannot be null"); + } + this.storage = storage; } @CanIgnoreReturnValue @@ -71,15 +41,7 @@ public ToDiskExporterBuilder setExportFunction( return this; } - public ToDiskExporter build() throws IOException { - Storage storage = storageBuilder.build(); - return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled); - } - - private static void validateConfiguration(StorageConfiguration configuration) { - if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) { - throw new IllegalArgumentException( - "The configured max file age for writing must be lower than the configured min file age for reading"); - } + public ToDiskExporter build() { + return new ToDiskExporter<>(serializer, exportFunction, storage); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java index 9ff0f9410..51322b41e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/models/LogRecordDataImpl.java @@ -6,7 +6,6 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models; import com.google.auto.value.AutoValue; -import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Value; import io.opentelemetry.api.logs.Severity; @@ -51,17 +50,6 @@ public abstract static class Builder { public abstract Builder setSeverityText(String value); - @Deprecated - @CanIgnoreReturnValue - public Builder setBody(io.opentelemetry.sdk.logs.data.Body body) { - if (body.getType() == io.opentelemetry.sdk.logs.data.Body.Type.STRING) { - setBodyValue(Value.of(body.asString())); - } else if (body.getType() == io.opentelemetry.sdk.logs.data.Body.Type.EMPTY) { - setBodyValue(null); - } - return this; - } - public abstract Builder setBodyValue(@Nullable Value value); public abstract Builder setAttributes(Attributes value); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 4ff60cbdc..73a263490 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -14,6 +14,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -24,8 +25,8 @@ public final class Storage implements Closeable { private static final int MAX_ATTEMPTS = 3; private final DebugLogger logger; - private final FolderManager folderManager; + private final boolean debugEnabled; private final AtomicBoolean isClosed = new AtomicBoolean(false); @Nullable private WritableFile writableFile; @Nullable private ReadableFile readableFile; @@ -34,10 +35,15 @@ public Storage(FolderManager folderManager, boolean debugEnabled) { this.folderManager = folderManager; this.logger = DebugLogger.wrap(Logger.getLogger(FromDiskExporterImpl.class.getName()), debugEnabled); + this.debugEnabled = debugEnabled; + } + + public static StorageBuilder builder(SignalTypes types) { + return new StorageBuilder(types); } - public static StorageBuilder builder() { - return new StorageBuilder(); + public boolean isDebugEnabled() { + return debugEnabled; } /** @@ -72,6 +78,14 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { return true; } + public void flush() throws IOException { + if (writableFile != null) { + writableFile.flush(); + } else { + logger.log("No writable file to flush."); + } + } + /** * Attempts to read an item from a ready-to-read file. * diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java index c8b0435ca..d43bc18b2 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java @@ -7,6 +7,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import java.io.File; import java.io.IOException; @@ -17,20 +18,17 @@ public class StorageBuilder { private static final Logger logger = Logger.getLogger(StorageBuilder.class.getName()); - private String folderName = "data"; + private final String folderName; private StorageConfiguration configuration = StorageConfiguration.getDefault(new File(".")); private Clock clock = Clock.getDefault(); - StorageBuilder() {} - - @CanIgnoreReturnValue - public StorageBuilder setFolderName(String folderName) { - this.folderName = folderName; - return this; + StorageBuilder(SignalTypes types) { + folderName = types.name(); } @CanIgnoreReturnValue public StorageBuilder setStorageConfiguration(StorageConfiguration configuration) { + validateConfiguration(configuration); this.configuration = configuration; return this; } @@ -57,4 +55,11 @@ private static File ensureSubdir(File rootDir, String child) throws IOException } throw new IOException("Could not create the subdir: '" + child + "' inside: " + rootDir); } + + private static void validateConfiguration(StorageConfiguration configuration) { + if (configuration.getMinFileAgeForReadMillis() <= configuration.getMaxFileAgeForWriteMillis()) { + throw new IllegalArgumentException( + "The configured max file age for writing must be lower than the configured min file age for reading"); + } + } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java index 519e9da66..6bf082ca5 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFile.java @@ -94,4 +94,8 @@ public synchronized void close() throws IOException { public String toString() { return "WritableFile{" + "file=" + file + '}'; } + + public void flush() throws IOException { + out.flush(); + } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index b2955630e..65c81b842 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -18,6 +18,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; @@ -40,7 +41,7 @@ class FromDiskExporterImplTest { private FromDiskExporterImpl exporter; private final List deserializedData = Collections.emptyList(); @TempDir File rootDir; - private static final String STORAGE_FOLDER_NAME = "testName"; + private static final String STORAGE_FOLDER_NAME = SignalTypes.spans.name(); @BeforeEach void setUp() throws IOException { @@ -49,12 +50,10 @@ void setUp() throws IOException { setUpSerializer(); wrapped = mock(); exporter = - FromDiskExporterImpl.builder() - .setFolderName(STORAGE_FOLDER_NAME) - .setStorageConfiguration(TestData.getDefaultConfiguration(rootDir)) + FromDiskExporterImpl.builder( + TestData.getDefaultStorage(rootDir, SignalTypes.spans, clock)) .setDeserializer(deserializer) .setExportFunction(wrapped::export) - .setStorageClock(clock) .build(); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index c1c42b0bb..b2da05c24 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -53,25 +54,28 @@ public class IntegrationTest { private InMemorySpanExporter memorySpanExporter; - private SpanToDiskExporter spanToDiskExporter; private Tracer tracer; private InMemoryMetricExporter memoryMetricExporter; - private MetricToDiskExporter metricToDiskExporter; private SdkMeterProvider meterProvider; private Meter meter; private InMemoryLogRecordExporter memoryLogRecordExporter; - private LogRecordToDiskExporter logToDiskExporter; private Logger logger; private Clock clock; @TempDir File rootDir; private static final long INITIAL_TIME_IN_MILLIS = 1000; private static final long NOW_NANOS = MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS); private StorageConfiguration storageConfig; + private Storage spanStorage; @BeforeEach void setUp() throws IOException { - storageConfig = StorageConfiguration.getDefault(rootDir); clock = mock(); + storageConfig = StorageConfiguration.getDefault(rootDir); + spanStorage = + Storage.builder(SignalTypes.spans) + .setStorageConfiguration(storageConfig) + .setStorageClock(clock) + .build(); when(clock.now()).thenReturn(NOW_NANOS); @@ -79,14 +83,15 @@ void setUp() throws IOException { memorySpanExporter = InMemorySpanExporter.create(); ToDiskExporter toDiskSpanExporter = buildToDiskExporter(SignalSerializer.ofSpans(), memorySpanExporter::export); - spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); + SpanToDiskExporter spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); // Setting up metrics memoryMetricExporter = InMemoryMetricExporter.create(); ToDiskExporter toDiskMetricExporter = buildToDiskExporter(SignalSerializer.ofMetrics(), memoryMetricExporter::export); - metricToDiskExporter = new MetricToDiskExporter(toDiskMetricExporter, memoryMetricExporter); + MetricToDiskExporter metricToDiskExporter = + new MetricToDiskExporter(toDiskMetricExporter, memoryMetricExporter); meterProvider = createMeterProvider(metricToDiskExporter); meter = meterProvider.get("MetricInstrumentationScope"); @@ -94,36 +99,26 @@ void setUp() throws IOException { memoryLogRecordExporter = InMemoryLogRecordExporter.create(); ToDiskExporter toDiskLogExporter = buildToDiskExporter(SignalSerializer.ofLogs(), memoryLogRecordExporter::export); - logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); + LogRecordToDiskExporter logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); } @NotNull private ToDiskExporter buildToDiskExporter( - SignalSerializer serializer, Function, CompletableResultCode> exporter) - throws IOException { - return ToDiskExporter.builder() - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(storageConfig) + SignalSerializer serializer, Function, CompletableResultCode> exporter) { + return ToDiskExporter.builder(spanStorage) .setSerializer(serializer) .setExportFunction(exporter) - .setStorageClock(clock) .build(); } @NotNull - private FromDiskExporterImpl buildFromDiskExporter( + private static FromDiskExporterImpl buildFromDiskExporter( FromDiskExporterBuilder builder, Function, CompletableResultCode> exportFunction, SignalDeserializer deserializer) throws IOException { - return builder - .setExportFunction(exportFunction) - .setFolderName(SignalTypes.spans.name()) - .setStorageConfiguration(storageConfig) - .setDeserializer(deserializer) - .setStorageClock(clock) - .build(); + return builder.setExportFunction(exportFunction).setDeserializer(deserializer).build(); } @Test @@ -132,7 +127,7 @@ void verifySpansIntegration() throws IOException { span.end(); FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(), + FromDiskExporterImpl.builder(spanStorage), memorySpanExporter::export, SignalDeserializer.ofSpans()); assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); @@ -145,7 +140,7 @@ void verifyMetricsIntegration() throws IOException { FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(), + FromDiskExporterImpl.builder(spanStorage), memoryMetricExporter::export, SignalDeserializer.ofMetrics()); assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); @@ -157,7 +152,7 @@ void verifyLogRecordsIntegration() throws IOException { FromDiskExporterImpl fromDiskExporter = buildFromDiskExporter( - FromDiskExporterImpl.builder(), + FromDiskExporterImpl.builder(spanStorage), memoryLogRecordExporter::export, SignalDeserializer.ofLogs()); assertExporter( diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java index 05eef5a03..ae503ecdf 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -6,7 +6,6 @@ package io.opentelemetry.contrib.disk.buffering; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -20,12 +19,12 @@ import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.data.SpanData; @@ -48,22 +47,24 @@ class SpanFromDiskExporterTest { @SuppressWarnings("unchecked") @Test void fromDisk() throws Exception { - StorageConfiguration config = - StorageConfiguration.builder() - .setRootDir(tempDir) - .setMaxFileAgeForWriteMillis(TimeUnit.HOURS.toMillis(24)) - .setMinFileAgeForReadMillis(0) - .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(24)) - .setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance()) + Clock clock = mock(Clock.class); + long start = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + when(clock.now()).thenReturn(start); + Storage storage = + Storage.builder(SignalTypes.spans) + .setStorageConfiguration(StorageConfiguration.builder().setRootDir(tempDir).build()) + .setStorageClock(clock) .build(); - List spans = writeSomeSpans(config); + List spans = writeSomeSpans(storage); + + when(clock.now()).thenReturn(start + TimeUnit.SECONDS.toNanos(60)); SpanExporter exporter = mock(); ArgumentCaptor> capture = ArgumentCaptor.forClass(Collection.class); when(exporter.export(capture.capture())).thenReturn(CompletableResultCode.ofSuccess()); - SpanFromDiskExporter testClass = SpanFromDiskExporter.create(exporter, config); + SpanFromDiskExporter testClass = SpanFromDiskExporter.create(exporter, storage); boolean result = testClass.exportStoredBatch(30, TimeUnit.SECONDS); assertThat(result).isTrue(); List exportedSpans = (List) capture.getValue(); @@ -79,23 +80,14 @@ void fromDisk() throws Exception { verify(exporter).export(eq(Arrays.asList(expected1, expected2))); } - private static List writeSomeSpans(StorageConfiguration config) throws Exception { + private static List writeSomeSpans(Storage storage) throws Exception { long now = System.currentTimeMillis() * 1_000_000; SpanData span1 = makeSpan1(TraceFlags.getDefault(), now); SpanData span2 = makeSpan2(TraceFlags.getSampled(), now); List spans = Arrays.asList(span1, span2); - SignalSerializer serializer = SignalSerializer.ofSpans(); - File subdir = new File(config.getRootDir(), SignalTypes.spans.name()); - assertTrue(subdir.mkdir()); - - Storage storage = - Storage.builder() - .setStorageConfiguration(config) - .setFolderName(SignalTypes.spans.name()) - .build(); - storage.write(serializer.serialize(spans)); - storage.close(); + storage.write(SignalSerializer.ofSpans().serialize(spans)); + storage.flush(); return spans; } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java deleted file mode 100644 index 288388e03..000000000 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering.internal.exporter; - -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.io.File; -import org.junit.jupiter.api.Test; - -class ToDiskExporterBuilderTest { - - @Test - void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { - StorageConfiguration invalidConfig = - StorageConfiguration.builder() - .setMaxFileAgeForWriteMillis(2) - .setMinFileAgeForReadMillis(1) - .setRootDir(new File(".")) - .build(); - - assertThatThrownBy( - () -> ToDiskExporter.builder().setStorageConfiguration(invalidConfig)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage( - "The configured max file age for writing must be lower than the configured min file age for reading"); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java index 865aa6298..f7b6e3ff6 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java @@ -49,7 +49,7 @@ void setup() { exportedFnSeen = x; return exportFnResultToReturn.get(); }; - toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage, true); + toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage); when(serializer.serialize(records)).thenReturn(serialized); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index d59c3464f..9accaefff 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -6,6 +6,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage; import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.TRY_LATER; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; @@ -15,11 +16,14 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import java.io.File; import java.io.IOException; import java.util.function.Function; import org.junit.jupiter.api.BeforeEach; @@ -222,6 +226,22 @@ void whenClosing_closeWriterAndReaderIfNotNull() throws IOException { verify(readableFile).close(); } + @Test + void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { + StorageConfiguration invalidConfig = + StorageConfiguration.builder() + .setMaxFileAgeForWriteMillis(2) + .setMinFileAgeForReadMillis(1) + .setRootDir(new File(".")) + .build(); + + assertThatThrownBy( + () -> Storage.builder(SignalTypes.logs).setStorageConfiguration(invalidConfig)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "The configured max file age for writing must be lower than the configured min file age for reading"); + } + private static WritableFile createWritableFile() throws IOException { WritableFile mock = mock(); when(mock.append(any())).thenReturn(WritableResult.SUCCEEDED); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java index a9a2003ae..8e66dde04 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java @@ -8,7 +8,10 @@ import io.opentelemetry.contrib.disk.buffering.config.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.config.TemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; +import io.opentelemetry.contrib.disk.buffering.internal.utils.SignalTypes; +import io.opentelemetry.sdk.common.Clock; import java.io.File; +import java.io.IOException; public final class TestData { @@ -23,6 +26,15 @@ public static StorageConfiguration getDefaultConfiguration(File rootDir) { return getConfiguration(fileProvider, rootDir); } + public static Storage getDefaultStorage(File rootDir, SignalTypes types, Clock clock) + throws IOException { + TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); + return Storage.builder(types) + .setStorageConfiguration(getConfiguration(fileProvider, rootDir)) + .setStorageClock(clock) + .build(); + } + public static StorageConfiguration getConfiguration( TemporaryFileProvider fileProvider, File rootDir) { return StorageConfiguration.builder()