diff --git a/disk-buffering/README.md b/disk-buffering/README.md index 6c78769a61..bfad6fb862 100644 --- a/disk-buffering/README.md +++ b/disk-buffering/README.md @@ -36,6 +36,8 @@ The available configuration parameters are the following: * Max age for file reading. After that time passes, the file will be considered stale and will be removed when new files are created. No more data will be read from a file past this time. Defaults to 18 hours. +* Delete items on iteration. Controls whether items are automatically removed from disk as the + iterator advances. Defaults to `true`. See [Deleting data](#deleting-data) for more details. ```java // Root dir @@ -105,7 +107,8 @@ Now when creating signals using your `OpenTelemetry` instance, those will get st ### Reading data In order to read data, we can iterate through our signal storage objects and then forward them to -a network exporter, as shown in the example for spans below. +a network exporter. By default, items are automatically deleted from disk as the iterator advances, +so a simple iteration is all that's needed: ```java /** @@ -129,11 +132,52 @@ public boolean exportSpansFromDisk(SpanExporter networkExporter, long timeout) { } ``` -The `File*Storage` iterators delete the previously returned collection when `next()` is called, -assuming that if the next collection is requested is because the previous one was successfully -consumed. +### Deleting data -Both the writing and reading processes can run in parallel and they don't overlap +By default, items are automatically deleted from disk as the iterator advances. You can also +clear all data at once by calling `SignalStorage.clear()`. + +#### Automatic vs explicit deletion + +The default behavior (`deleteItemsOnIteration = true`) automatically removes items from disk during +iteration. This means you don't need to call `Iterator.remove()` since the data is cleaned up as the +iterator advances. + +If you need more control (e.g., only deleting items after a successful network export), set +`deleteItemsOnIteration` to `false` in the configuration: + +```java +FileStorageConfiguration config = FileStorageConfiguration.builder() + .setDeleteItemsOnIteration(false) + .build(); +SignalStorage.Span spanStorage = FileSpanStorage.create(new File(rootDir, "spans"), config); +``` + +With this setting, items remain on disk until explicitly removed via `Iterator.remove()`: + +```java +public boolean exportSpansFromDisk(SpanExporter networkExporter, long timeout) { + Iterator> spansIterator = spanStorage.iterator(); + while (spansIterator.hasNext()) { + CompletableResultCode resultCode = networkExporter.export(spansIterator.next()); + resultCode.join(timeout, TimeUnit.MILLISECONDS); + + if (resultCode.isSuccess()) { + spansIterator.remove(); + } else { + return false; + } + } + return true; +} +``` + +Note that even with explicit deletion, disk usage is still bounded by the configured max folder size and max file +age, so stale files are automatically purged when there's not enough space available before new data is written. + +### More details on the writing and reading processes + +Both the writing and reading processes can run in parallel as they won't overlap because each is supposed to happen in different files. We ensure that reader and writer don't accidentally meet in the same file by using the configurable parameters. These parameters set non-overlapping time frames for each action to be done on a single file at a time. On top of that, diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java index 743dee7573..2ec344e386 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java @@ -25,6 +25,7 @@ public final class FileSignalStorage implements SignalStorage { private final Storage storage; private final SignalSerializer serializer; private final SignalDeserializer deserializer; + private final boolean deleteItemsOnIteration; private final Logger logger = Logger.getLogger(FileSignalStorage.class.getName()); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Object iteratorLock = new Object(); @@ -34,10 +35,14 @@ public final class FileSignalStorage implements SignalStorage { private Iterator> iterator; public FileSignalStorage( - Storage storage, SignalSerializer serializer, SignalDeserializer deserializer) { + Storage storage, + SignalSerializer serializer, + SignalDeserializer deserializer, + boolean deleteItemsOnIteration) { this.storage = storage; this.serializer = serializer; this.deserializer = deserializer; + this.deleteItemsOnIteration = deleteItemsOnIteration; } @Override @@ -84,7 +89,7 @@ public void close() throws IOException { public Iterator> iterator() { synchronized (iteratorLock) { if (iterator == null) { - iterator = new StorageIterator<>(storage, deserializer); + iterator = new StorageIterator<>(storage, deserializer, deleteItemsOnIteration); } return iterator; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java index d93751fa15..64a982d7d9 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java @@ -15,8 +15,12 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.function.Predicate; +import java.util.logging.Logger; +import java.util.regex.Pattern; import javax.annotation.Nullable; import org.jetbrains.annotations.NotNull; @@ -24,6 +28,8 @@ public final class FolderManager implements Closeable { private final File folder; private final Clock clock; private final FileStorageConfiguration configuration; + private final Logger logger = Logger.getLogger(FolderManager.class.getName()); + private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); @Nullable private ReadableFile currentReadableFile; @Nullable private WritableFile currentWritableFile; @@ -46,19 +52,33 @@ public FolderManager(File folder, FileStorageConfiguration configuration, Clock this.clock = clock; } + static class CacheFile { + private final File file; + private final long createdTimeMillis; + + CacheFile(File file, long createdTimeMillis) { + this.file = file; + this.createdTimeMillis = createdTimeMillis; + } + + long getCreatedTimeMillis() { + return createdTimeMillis; + } + } + @Override public void close() throws IOException { closeCurrentFiles(); } @Nullable - public synchronized ReadableFile getReadableFile() throws IOException { + public synchronized ReadableFile getReadableFile(Predicate excludeFiles) + throws IOException { currentReadableFile = null; - File readableFile = findReadableFile(); - if (readableFile != null) { + CacheFile selectedFile = selectReadableFile(listCacheFiles(excludeFiles)); + if (selectedFile != null) { currentReadableFile = - new ReadableFile( - readableFile, Long.parseLong(readableFile.getName()), clock, configuration); + new ReadableFile(selectedFile.file, selectedFile.createdTimeMillis, clock, configuration); return currentReadableFile; } return null; @@ -93,29 +113,56 @@ public synchronized void clear() throws IOException { } } + private List listCacheFiles(Predicate exclude) { + File[] existingFiles = folder.listFiles(); + if (existingFiles == null) { + return Collections.emptyList(); + } + ArrayList files = new ArrayList<>(); + for (File file : existingFiles) { + CacheFile cacheFile = fileToCacheFile(file); + if (cacheFile != null && !exclude.test(cacheFile)) { + files.add(cacheFile); + } + } + return Collections.unmodifiableList(files); + } + + @Nullable + private CacheFile fileToCacheFile(File file) { + String fileName = file.getName(); + if (!NUMBER_PATTERN.matcher(fileName).matches()) { + logger.finer(String.format("Invalid cache file name: '%s'", fileName)); + return null; + } + return new CacheFile(file, Long.parseLong(fileName)); + } + @Nullable - private File findReadableFile() throws IOException { + private CacheFile selectReadableFile(List files) throws IOException { + if (files.isEmpty()) { + return null; + } + long currentTime = nowMillis(clock); - File[] existingFiles = folder.listFiles(); - File oldestFileAvailable = null; + CacheFile oldestFileAvailable = null; long oldestFileCreationTimeMillis = 0; - if (existingFiles != null) { - for (File existingFile : existingFiles) { - long existingFileCreationTimeMillis = Long.parseLong(existingFile.getName()); - if (isReadyToBeRead(currentTime, existingFileCreationTimeMillis) - && !hasExpiredForReading(currentTime, existingFileCreationTimeMillis)) { - if (oldestFileAvailable == null - || existingFileCreationTimeMillis < oldestFileCreationTimeMillis) { - oldestFileCreationTimeMillis = existingFileCreationTimeMillis; - oldestFileAvailable = existingFile; - } + for (CacheFile existingFile : files) { + long existingFileCreationTimeMillis = existingFile.createdTimeMillis; + if (isReadyToBeRead(currentTime, existingFileCreationTimeMillis) + && !hasExpiredForReading(currentTime, existingFileCreationTimeMillis)) { + if (oldestFileAvailable == null + || existingFileCreationTimeMillis < oldestFileCreationTimeMillis) { + oldestFileCreationTimeMillis = existingFileCreationTimeMillis; + oldestFileAvailable = existingFile; } } } + // Checking if the oldest available file is currently the writable file. if (oldestFileAvailable != null && currentWritableFile != null - && oldestFileAvailable.equals(currentWritableFile.getFile())) { + && oldestFileAvailable.file.equals(currentWritableFile.getFile())) { currentWritableFile.close(); } return oldestFileAvailable; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 6b2ee05f7b..02ae1ad37f 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -28,6 +29,7 @@ public final class Storage implements Closeable { private static final int MAX_ATTEMPTS = 3; private final Logger logger = Logger.getLogger(Storage.class.getName()); private final FolderManager folderManager; + private volatile Predicate fileExclusion = file -> false; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean activeReadResultAvailable = new AtomicBoolean(false); private final AtomicReference writableFileRef = new AtomicReference<>(); @@ -91,7 +93,11 @@ public ReadableResult readNext(SignalDeserializer deserializer) throws IOE throw new IllegalStateException( "You must close any previous ReadableResult before requesting a new one"); } - return doReadNext(deserializer, 1); + ReadableResult result = doReadNext(deserializer, 1); + if (result == null) { + fileExclusion = file -> false; + } + return result; } @Nullable @@ -106,9 +112,13 @@ private ReadableResult doReadNext(SignalDeserializer deserializer, int att return null; } ReadableFile readableFile = readableFileRef.get(); + if (readableFile != null && readableFile.isClosed()) { + readableFileRef.set(null); + readableFile = null; + } if (readableFile == null) { logger.finer("Obtaining a new readableFile from the folderManager."); - readableFile = folderManager.getReadableFile(); + readableFile = folderManager.getReadableFile(Objects.requireNonNull(fileExclusion)); readableFileRef.set(readableFile); if (readableFile == null) { logger.fine("Unable to get or create readable file."); @@ -117,19 +127,27 @@ private ReadableResult doReadNext(SignalDeserializer deserializer, int att } logger.finer("Attempting to read data from " + readableFile); - byte[] result = readableFile.readNext(); - if (result != null) { - try { - List items = deserializer.deserialize(result); - activeReadResultAvailable.set(true); - return new FileReadResult(items, readableFile); - } catch (DeserializationException e) { - // Data corrupted, clear file. - readableFile.clear(); + long currentFileCreatedTime = readableFile.getCreatedTimeMillis(); + try { + byte[] result = readableFile.readNext(); + if (result != null) { + try { + List items = deserializer.deserialize(result); + activeReadResultAvailable.set(true); + return new FileReadResult(items, readableFile); + } catch (DeserializationException e) { + // Data corrupted, clear file. + readableFile.clear(); + } } + } catch (IOException e) { + // Proto data corrupted, clear file. + readableFile.clear(); } - // Retry with new file + // Search for newer files than the current one. + fileExclusion = file -> file.getCreatedTimeMillis() <= currentFileCreatedTime; + readableFile.close(); readableFileRef.set(null); return doReadNext(deserializer, ++attemptNumber); } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIterator.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIterator.java index 871238b03b..cb65c9b1c7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIterator.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIterator.java @@ -19,6 +19,7 @@ final class StorageIterator implements Iterator> { private final Storage storage; private final SignalDeserializer deserializer; + private final boolean deleteOnIteration; private final Logger logger = Logger.getLogger(StorageIterator.class.getName()); @GuardedBy("this") @@ -28,9 +29,14 @@ final class StorageIterator implements Iterator> { @GuardedBy("this") private boolean currentResultConsumed = false; - StorageIterator(Storage storage, SignalDeserializer deserializer) { + @GuardedBy("this") + private boolean removeAllowed = false; + + StorageIterator( + Storage storage, SignalDeserializer deserializer, boolean deleteOnIteration) { this.storage = storage; this.deserializer = deserializer; + this.deleteOnIteration = deleteOnIteration; } @Override @@ -49,6 +55,7 @@ public synchronized Collection next() { } if (findNext()) { currentResultConsumed = true; + removeAllowed = true; return Objects.requireNonNull(currentResult).getContent(); } return null; @@ -56,6 +63,10 @@ public synchronized Collection next() { @Override public synchronized void remove() { + if (!removeAllowed) { + throw new IllegalStateException("next() must be called before remove()"); + } + removeAllowed = false; if (currentResult != null) { try { currentResult.delete(); @@ -71,7 +82,9 @@ private synchronized boolean findNext() { if (!currentResultConsumed) { return true; } - currentResult.delete(); + if (deleteOnIteration) { + currentResult.delete(); + } currentResult.close(); currentResult = null; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java index 59429d187c..247d045f32 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java @@ -15,16 +15,12 @@ import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; import javax.annotation.Nonnull; import javax.annotation.Nullable; /** - * Reads from a file and updates it in parallel in order to avoid re-reading the same items later. - * The way it does so is by creating a temporary file where all the contents are added during the - * instantiation of this class. Then, the contents are read from the temporary file, after an item - * has been read from the temporary file, the original file gets updated to remove the recently read - * data. + * Reads items sequentially from a cache file. Items can be explicitly removed after reading via + * {@link #removeTopItem()}. If not removed, items remain on disk for future reads. * *

More information on the overall storage process in the CONTRIBUTING.md file. */ @@ -33,6 +29,7 @@ public final class ReadableFile implements FileOperations { private final FileStream fileStream; private final StreamReader reader; private final Clock clock; + private final long createdTimeMillis; private final long expireTimeMillis; private final AtomicBoolean isClosed = new AtomicBoolean(false); @@ -56,15 +53,13 @@ public ReadableFile( throws IOException { this.file = file; this.clock = clock; + this.createdTimeMillis = createdTimeMillis; expireTimeMillis = createdTimeMillis + configuration.getMaxFileAgeForReadMillis(); fileStream = FileStream.create(file); reader = readerFactory.create(fileStream); } - /** - * Reads the next line available in the file and provides it to a {@link Function processing} - * which will determine whether to remove the provided line or not. - */ + /** Reads the next item available in the file. Returns null if no more items or file is closed. */ @Nullable public synchronized byte[] readNext() throws IOException { if (isClosed.get()) { @@ -76,7 +71,7 @@ public synchronized byte[] readNext() throws IOException { } byte[] resultBytes = reader.readNext(); if (resultBytes == null) { - clear(); + close(); return null; } return resultBytes; @@ -87,6 +82,13 @@ public synchronized boolean hasExpired() { return nowMillis(clock) >= expireTimeMillis; } + public synchronized long getCreatedTimeMillis() { + if (isClosed.get()) { + throw new IllegalStateException("File is closed"); + } + return createdTimeMillis; + } + @Override public synchronized boolean isClosed() { return isClosed.get(); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java index d638c118b7..574160c1fe 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/DelimitedProtoStreamReader.java @@ -32,7 +32,7 @@ public byte[] readNext() throws IOException { offset += readCt; } while (readCt != -1 && offset < itemSize); if (offset != itemSize) { - return null; // unable to read the whole item correctly + throw new IOException("Unable to read the whole item correctly"); } return bytes; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java index 71d5d884bb..18653136b2 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileLogRecordStorage.java @@ -33,7 +33,11 @@ public static FileLogRecordStorage create( Storage storage = new Storage<>(FolderManager.create(destinationDir, configuration, Clock.getDefault())); return new FileLogRecordStorage( - new FileSignalStorage<>(storage, SignalSerializer.ofLogs(), SignalDeserializer.ofLogs())); + new FileSignalStorage<>( + storage, + SignalSerializer.ofLogs(), + SignalDeserializer.ofLogs(), + configuration.getDeleteItemsOnIteration())); } private FileLogRecordStorage(FileSignalStorage fileSignalStorage) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java index 8f8b41508e..17fc5c3b3b 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileMetricStorage.java @@ -34,7 +34,10 @@ public static FileMetricStorage create( new Storage<>(FolderManager.create(destinationDir, configuration, Clock.getDefault())); return new FileMetricStorage( new FileSignalStorage<>( - storage, SignalSerializer.ofMetrics(), SignalDeserializer.ofMetrics())); + storage, + SignalSerializer.ofMetrics(), + SignalDeserializer.ofMetrics(), + configuration.getDeleteItemsOnIteration())); } private FileMetricStorage(FileSignalStorage fileSignalStorage) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java index b8cfa8996c..e45d25d662 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileSpanStorage.java @@ -33,7 +33,11 @@ public static FileSpanStorage create( Storage storage = new Storage<>(FolderManager.create(destinationDir, configuration, Clock.getDefault())); return new FileSpanStorage( - new FileSignalStorage<>(storage, SignalSerializer.ofSpans(), SignalDeserializer.ofSpans())); + new FileSignalStorage<>( + storage, + SignalSerializer.ofSpans(), + SignalDeserializer.ofSpans(), + configuration.getDeleteItemsOnIteration())); } private FileSpanStorage(FileSignalStorage fileSignalStorage) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileStorageConfiguration.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileStorageConfiguration.java index 0a34c12b48..27e21b07cb 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileStorageConfiguration.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/storage/impl/FileStorageConfiguration.java @@ -43,6 +43,13 @@ public abstract class FileStorageConfiguration { */ public abstract int getMaxFolderSize(); + /** + * Whether to automatically delete items from disk during iteration. When true (the default), + * items are removed from disk as the iterator advances. When false, items remain on disk until + * explicitly removed via {@link java.util.Iterator#remove()}. + */ + public abstract boolean getDeleteItemsOnIteration(); + public static FileStorageConfiguration getDefault() { return builder().build(); } @@ -53,7 +60,8 @@ public static Builder builder() { .setMaxFolderSize(10 * 1024 * 1024) // 10MB .setMaxFileAgeForWriteMillis(SECONDS.toMillis(30)) .setMinFileAgeForReadMillis(SECONDS.toMillis(33)) - .setMaxFileAgeForReadMillis(HOURS.toMillis(18)); + .setMaxFileAgeForReadMillis(HOURS.toMillis(18)) + .setDeleteItemsOnIteration(true); } @AutoValue.Builder @@ -68,6 +76,8 @@ public abstract static class Builder { public abstract Builder setMaxFolderSize(int value); + public abstract Builder setDeleteItemsOnIteration(boolean value); + abstract FileStorageConfiguration autoBuild(); public final FileStorageConfiguration build() { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index d30466430c..b93c9bfda5 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -8,6 +8,7 @@ import static java.lang.Thread.sleep; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -39,6 +40,8 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -48,6 +51,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +@SuppressWarnings("unchecked") @ExtendWith(MockitoExtension.class) class IntegrationTest { private Tracer tracer; @@ -64,31 +68,37 @@ class IntegrationTest { @Mock private ExporterCallback logCallback; @Mock private ExporterCallback metricCallback; @TempDir private File rootDir; + private File spansDir; + private File logsDir; + private File metricsDir; private static final long DELAY_BEFORE_READING_MILLIS = 500; + private static final long MAX_WRITING_TIME_MILLIS = 200; @BeforeEach void setUp() { - FileStorageConfiguration storageConfig = + initStorage( FileStorageConfiguration.builder() - .setMaxFileAgeForWriteMillis(DELAY_BEFORE_READING_MILLIS - 1) + .setMaxFileAgeForWriteMillis(MAX_WRITING_TIME_MILLIS) .setMinFileAgeForReadMillis(DELAY_BEFORE_READING_MILLIS) - .build(); + .build()); + } - // Setting up spans - spanStorage = FileSpanStorage.create(new File(rootDir, "spans"), storageConfig); + private void initStorage(FileStorageConfiguration storageConfig) { + spansDir = new File(rootDir, "spans"); + spanStorage = FileSpanStorage.create(spansDir, storageConfig); spanToDiskExporter = SpanToDiskExporter.builder(spanStorage).setExporterCallback(spanCallback).build(); tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); - // Setting up metrics - metricStorage = FileMetricStorage.create(new File(rootDir, "metrics"), storageConfig); + metricsDir = new File(rootDir, "metrics"); + metricStorage = FileMetricStorage.create(metricsDir, storageConfig); metricToDiskExporter = MetricToDiskExporter.builder(metricStorage).setExporterCallback(metricCallback).build(); meterProvider = createMeterProvider(metricToDiskExporter); meter = meterProvider.get("MetricInstrumentationScope"); - // Setting up logs - logStorage = FileLogRecordStorage.create(new File(rootDir, "logs"), storageConfig); + logsDir = new File(rootDir, "logs"); + logStorage = FileLogRecordStorage.create(logsDir, storageConfig); logToDiskExporter = LogRecordToDiskExporter.builder(logStorage).setExporterCallback(logCallback).build(); logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); @@ -118,28 +128,67 @@ void tearDown() throws IOException { } @Test - void verifyIntegration() throws InterruptedException { - // Creating span - Span span = tracer.spanBuilder("Span name").startSpan(); - span.end(); - verify(spanCallback).onExportSuccess(anyCollection()); - verifyNoMoreInteractions(spanCallback); + void verifyIntegration_defaultAutoDelete() throws InterruptedException { + // Writing to first file + createSpan(); + createLog(); + createMetric(); - // Creating log - logger.logRecordBuilder().setBody("Log body").emit(); - verify(logCallback).onExportSuccess(anyCollection()); - verifyNoMoreInteractions(spanCallback); + // Waiting to write on second file + sleep(MAX_WRITING_TIME_MILLIS); - // Creating metric - meter.counterBuilder("counter").build().add(1); - meterProvider.forceFlush(); - verify(metricCallback).onExportSuccess(anyCollection()); - verifyNoMoreInteractions(spanCallback); + // Writing to second file + createSpan(); + createLog(); + createMetric(); + + // Waiting for read time + sleep(DELAY_BEFORE_READING_MILLIS); + + // Read (default: items auto-deleted during iteration) + List storedSpans = new ArrayList<>(); + List storedLogs = new ArrayList<>(); + List storedMetrics = new ArrayList<>(); + spanStorage.forEach(storedSpans::addAll); + logStorage.forEach(storedLogs::addAll); + metricStorage.forEach(storedMetrics::addAll); + + assertThat(storedSpans).hasSize(2); + assertThat(storedLogs).hasSize(2); + assertThat(storedMetrics).hasSize(2); + + // Data is auto-deleted from disk + assertDirectoryFileCount(spansDir, 0); + assertDirectoryFileCount(logsDir, 0); + assertDirectoryFileCount(metricsDir, 0); + } + + @Test + void verifyIntegration_withoutAutoDelete() throws InterruptedException { + initStorage( + FileStorageConfiguration.builder() + .setMaxFileAgeForWriteMillis(MAX_WRITING_TIME_MILLIS) + .setMinFileAgeForReadMillis(DELAY_BEFORE_READING_MILLIS) + .setDeleteItemsOnIteration(false) + .build()); + + // Writing to first file + createSpan(); + createLog(); + createMetric(); + + // Waiting to write on second file + sleep(MAX_WRITING_TIME_MILLIS); + + // Writing to second file + createSpan(); + createLog(); + createMetric(); // Waiting for read time sleep(DELAY_BEFORE_READING_MILLIS); - // Read + // Read (items not auto-deleted) List storedSpans = new ArrayList<>(); List storedLogs = new ArrayList<>(); List storedMetrics = new ArrayList<>(); @@ -147,9 +196,97 @@ void verifyIntegration() throws InterruptedException { logStorage.forEach(storedLogs::addAll); metricStorage.forEach(storedMetrics::addAll); - assertThat(storedSpans).hasSize(1); - assertThat(storedLogs).hasSize(1); - assertThat(storedMetrics).hasSize(1); + assertThat(storedSpans).hasSize(2); + assertThat(storedLogs).hasSize(2); + assertThat(storedMetrics).hasSize(2); + + // Data stays on disk + assertDirectoryFileCount(spansDir, 2); + assertDirectoryFileCount(logsDir, 2); + assertDirectoryFileCount(metricsDir, 2); + } + + @Test + void verifyIntegration_withoutAutoDelete_explicitRemove() throws InterruptedException { + initStorage( + FileStorageConfiguration.builder() + .setMaxFileAgeForWriteMillis(MAX_WRITING_TIME_MILLIS) + .setMinFileAgeForReadMillis(DELAY_BEFORE_READING_MILLIS) + .setDeleteItemsOnIteration(false) + .build()); + + // Writing to first file + createSpan(); + createLog(); + createMetric(); + + // Waiting to write on second file + sleep(MAX_WRITING_TIME_MILLIS); + + // Writing to second file + createSpan(); + createLog(); + createMetric(); + + // Waiting for read time + sleep(DELAY_BEFORE_READING_MILLIS); + + // Read with explicit removal + List storedSpans = new ArrayList<>(); + List storedLogs = new ArrayList<>(); + List storedMetrics = new ArrayList<>(); + Iterator> spanIterator = spanStorage.iterator(); + while (spanIterator.hasNext()) { + storedSpans.addAll(spanIterator.next()); + spanIterator.remove(); + } + Iterator> logIterator = logStorage.iterator(); + while (logIterator.hasNext()) { + storedLogs.addAll(logIterator.next()); + logIterator.remove(); + } + Iterator> metricIterator = metricStorage.iterator(); + while (metricIterator.hasNext()) { + storedMetrics.addAll(metricIterator.next()); + metricIterator.remove(); + } + + assertThat(storedSpans).hasSize(2); + assertThat(storedLogs).hasSize(2); + assertThat(storedMetrics).hasSize(2); + + // Data explicitly cleared + assertDirectoryFileCount(spansDir, 0); + assertDirectoryFileCount(logsDir, 0); + assertDirectoryFileCount(metricsDir, 0); + } + + private void createSpan() { + Span span = tracer.spanBuilder("Span name").startSpan(); + span.end(); + verify(spanCallback).onExportSuccess(anyCollection()); + verifyNoMoreInteractions(spanCallback); + clearInvocations(spanCallback); + } + + private void createLog() { + logger.logRecordBuilder().setBody("Log body").emit(); + verify(logCallback).onExportSuccess(anyCollection()); + verifyNoMoreInteractions(logCallback); + clearInvocations(logCallback); + } + + private void createMetric() { + meter.counterBuilder("counter").build().add(1); + meterProvider.forceFlush(); + verify(metricCallback).onExportSuccess(anyCollection()); + verifyNoMoreInteractions(metricCallback); + clearInvocations(metricCallback); + } + + private static void assertDirectoryFileCount(File directory, int fileCount) { + assertThat(directory).isDirectory(); + assertThat(directory.listFiles()).hasSize(fileCount); } private static SdkTracerProvider createTracerProvider(SpanExporter exporter) { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java index 044e7be9ba..16162349bf 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.function.Predicate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -98,7 +99,7 @@ void closeCurrentlyWritableFile_whenItIsReadyToBeRead_andNoOtherReadableFilesAre when(clock.now()) .thenReturn(MILLISECONDS.toNanos(createdFileTime + MIN_FILE_AGE_FOR_READ_MILLIS)); - ReadableFile readableFile = folderManager.getReadableFile(); + ReadableFile readableFile = getReadableFile(); assertThat(readableFile.getFile()).isEqualTo(writableFile.getFile()); assertThat(writableFile.isClosed()).isTrue(); @@ -117,7 +118,7 @@ void closeCurrentlyWritableFile_whenItIsReadyToBeRead_andNoOtherReadableFilesAre fillWithBytes(existingFile3, MAX_FILE_SIZE); when(clock.now()).thenReturn(MILLISECONDS.toNanos(1000L + MIN_FILE_AGE_FOR_READ_MILLIS)); - ReadableFile readableFile = folderManager.getReadableFile(); + ReadableFile readableFile = getReadableFile(); assertThat(readableFile.getFile()).isEqualTo(existingFile1); folderManager.createWritableFile(); @@ -175,7 +176,7 @@ void closeExpiredReadableFileInUseIfAny_whenPurgingExpiredForReadFiles_whenCreat createFiles(expiredReadableFile, expiredWritableFile, expiredReadableFileBeingRead); when(clock.now()).thenReturn(MILLISECONDS.toNanos(900 + MIN_FILE_AGE_FOR_READ_MILLIS)); - ReadableFile readableFile = folderManager.getReadableFile(); + ReadableFile readableFile = getReadableFile(); assertThat(readableFile.getFile()).isEqualTo(expiredReadableFileBeingRead); when(clock.now()).thenReturn(MILLISECONDS.toNanos(11_500L)); @@ -199,40 +200,66 @@ void provideFileForRead_afterItsMinFileAgeForReadTimePassed() throws IOException File readableFile = new File(rootDir, String.valueOf(readableFileCreationTime)); createFiles(writableFile, readableFile); - ReadableFile file = folderManager.getReadableFile(); + ReadableFile file = getReadableFile(); assertThat(file.getFile()).isEqualTo(readableFile); } @Test void provideOldestFileForRead_whenMultipleReadableFilesAreAvailable() throws IOException { - long newerReadableFileCreationTime = 1000; - long olderReadableFileCreationTime = 900; + long firstReadableFileTimestamp = 900; + long secondReadableFileTimestamp = 1000; + long thirdReadableFileTimestamp = 1500; long currentTime = - MILLISECONDS.toNanos(newerReadableFileCreationTime + MIN_FILE_AGE_FOR_READ_MILLIS); + MILLISECONDS.toNanos(thirdReadableFileTimestamp + MIN_FILE_AGE_FOR_READ_MILLIS); when(clock.now()).thenReturn(currentTime); File writableFile = new File(rootDir, String.valueOf(currentTime)); - File readableFileOlder = new File(rootDir, String.valueOf(olderReadableFileCreationTime)); - File readableFileNewer = new File(rootDir, String.valueOf(newerReadableFileCreationTime)); - createFiles(writableFile, readableFileNewer, readableFileOlder); + File firstReadableFile = new File(rootDir, String.valueOf(firstReadableFileTimestamp)); + File secondReadableFile = new File(rootDir, String.valueOf(secondReadableFileTimestamp)); + File thirdReadableFile = new File(rootDir, String.valueOf(thirdReadableFileTimestamp)); + createFiles(writableFile, firstReadableFile, secondReadableFile, thirdReadableFile); - ReadableFile file = folderManager.getReadableFile(); + ReadableFile file = getReadableFile(); - assertThat(file.getFile()).isEqualTo(readableFileOlder); + assertThat(file.getFile()).isEqualTo(firstReadableFile); + } + + @Test + void provideOldestFileForRead_withCustomFilter() throws IOException { + long firstReadableFileTimestamp = 900; + long secondReadableFileTimestamp = 1000; + long thirdReadableFileTimestamp = 1500; + long currentTime = + MILLISECONDS.toNanos(thirdReadableFileTimestamp + MIN_FILE_AGE_FOR_READ_MILLIS); + when(clock.now()).thenReturn(currentTime); + File writableFile = new File(rootDir, String.valueOf(currentTime)); + File firstReadableFile = new File(rootDir, String.valueOf(firstReadableFileTimestamp)); + File secondReadableFile = new File(rootDir, String.valueOf(secondReadableFileTimestamp)); + File thirdReadableFile = new File(rootDir, String.valueOf(thirdReadableFileTimestamp)); + createFiles(writableFile, firstReadableFile, secondReadableFile, thirdReadableFile); + + ReadableFile file = + getReadableFile( + it -> { + // Exclude the oldest file so that the next oldest is selected. + return it.getCreatedTimeMillis() <= firstReadableFileTimestamp; + }); + + assertThat(file.getFile()).isEqualTo(secondReadableFile); } @Test void provideNullFileForRead_whenNoFilesAreAvailable() throws IOException { - assertThat(folderManager.getReadableFile()).isNull(); + assertThat(getReadableFile()).isNull(); } @Test - void provideNullFileForRead_whenOnlyReadableFilesAreAvailable() throws IOException { + void provideNullFileForRead_whenOnlyWritableFilesAreAvailable() throws IOException { long currentTime = 1000; File writableFile = new File(rootDir, String.valueOf(currentTime)); createFiles(writableFile); - assertThat(folderManager.getReadableFile()).isNull(); + assertThat(getReadableFile()).isNull(); } @Test @@ -243,7 +270,16 @@ void provideNullFileForRead_whenReadableFilesAreExpired() throws IOException { createFiles(expiredReadableFile1, expiredReadableFile2); when(clock.now()).thenReturn(creationReferenceTime + MAX_FILE_AGE_FOR_READ_MILLIS); - assertThat(folderManager.getReadableFile()).isNull(); + assertThat(getReadableFile()).isNull(); + } + + private ReadableFile getReadableFile() throws IOException { + return getReadableFile(file -> false); + } + + private ReadableFile getReadableFile(Predicate exclude) + throws IOException { + return folderManager.getReadableFile(exclude); } private static void fillWithBytes(File file, int size) throws IOException { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIteratorTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIteratorTest.java new file mode 100644 index 0000000000..bf30769632 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIteratorTest.java @@ -0,0 +1,283 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.storage; + +import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.FIRST_LOG_RECORD; +import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.MAX_FILE_AGE_FOR_WRITE_MILLIS; +import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.MIN_FILE_AGE_FOR_READ_MILLIS; +import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.SECOND_LOG_RECORD; +import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.THIRD_LOG_RECORD; +import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.getConfiguration; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class StorageIteratorTest { + @TempDir private File destinationDir; + private Storage storage; + private SignalSerializer serializer; + private AtomicLong currentTimeMillis; + private static final SignalDeserializer DESERIALIZER = SignalDeserializer.ofLogs(); + + @BeforeEach + void setUp() { + currentTimeMillis = new AtomicLong(0); + serializer = SignalSerializer.ofLogs(); + FolderManager folderManager = + FolderManager.create(destinationDir, getConfiguration(), new TestClock()); + storage = new Storage<>(folderManager); + } + + @AfterEach + void tearDown() throws IOException { + storage.close(); + } + + // -- Tests with deleteOnIteration=false (explicit removal mode) -- + + @Test + void explicitMode_removeBeforeNext_throwsIllegalStateException() throws IOException { + writeItem(FIRST_LOG_RECORD); + forwardToReadTime(); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + assertThat(iterator.hasNext()).isTrue(); + + assertThatThrownBy(iterator::remove) + .isInstanceOf(IllegalStateException.class) + .hasMessage("next() must be called before remove()"); + } + + @Test + void explicitMode_doubleRemove_throwsIllegalStateException() throws IOException { + writeItem(FIRST_LOG_RECORD); + forwardToReadTime(); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + iterator.next(); + iterator.remove(); + + assertThatThrownBy(iterator::remove) + .isInstanceOf(IllegalStateException.class) + .hasMessage("next() must be called before remove()"); + } + + @Test + void explicitMode_iterateWithRemove_deletesFiles() throws IOException { + writeItem(FIRST_LOG_RECORD); + forwardToReadTime(); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + List items = new ArrayList<>(); + while (iterator.hasNext()) { + items.addAll(iterator.next()); + iterator.remove(); + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD); + assertThat(destinationDir.list()).isEmpty(); + } + + @Test + void explicitMode_iterateWithoutRemove_preservesFiles() throws IOException { + writeItem(FIRST_LOG_RECORD); + forwardToReadTime(); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + List items = new ArrayList<>(); + while (iterator.hasNext()) { + items.addAll(iterator.next()); + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD); + assertThat(destinationDir.list()).hasSize(1); + } + + @Test + void explicitMode_iterateAcrossMultipleFiles_withRemove() throws IOException { + long firstWriteTime = 1000; + long secondWriteTime = firstWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + + currentTimeMillis.set(firstWriteTime); + writeItem(FIRST_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime); + writeItem(SECOND_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime + MIN_FILE_AGE_FOR_READ_MILLIS); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + List items = new ArrayList<>(); + while (iterator.hasNext()) { + items.addAll(iterator.next()); + iterator.remove(); + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD, SECOND_LOG_RECORD); + assertThat(destinationDir.list()).isEmpty(); + } + + @Test + void explicitMode_iterateAcrossMultipleFiles_withoutRemove() throws IOException { + long firstWriteTime = 1000; + long secondWriteTime = firstWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + + currentTimeMillis.set(firstWriteTime); + writeItem(FIRST_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime); + writeItem(SECOND_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime + MIN_FILE_AGE_FOR_READ_MILLIS); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + List items = new ArrayList<>(); + while (iterator.hasNext()) { + items.addAll(iterator.next()); + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD, SECOND_LOG_RECORD); + assertThat(destinationDir.list()).hasSize(2); + } + + @Test + void explicitMode_selectiveRemove_onlyDeletesRemovedItems() throws IOException { + long firstWriteTime = 1000; + long secondWriteTime = firstWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + long thirdWriteTime = secondWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + + currentTimeMillis.set(firstWriteTime); + writeItem(FIRST_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime); + writeItem(SECOND_LOG_RECORD); + + currentTimeMillis.set(thirdWriteTime); + writeItem(THIRD_LOG_RECORD); + + currentTimeMillis.set(thirdWriteTime + MIN_FILE_AGE_FOR_READ_MILLIS); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, false); + List items = new ArrayList<>(); + int index = 0; + while (iterator.hasNext()) { + items.addAll(iterator.next()); + if (index == 0 || index == 2) { + iterator.remove(); + } + index++; + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD, SECOND_LOG_RECORD, THIRD_LOG_RECORD); + assertThat(destinationDir.list()).hasSize(1); + assertThat(destinationDir.list()).containsExactly(String.valueOf(secondWriteTime)); + } + + // -- Tests with deleteOnIteration=true (auto-delete mode) -- + + @Test + void autoDeleteMode_iterateDeletesFiles() throws IOException { + writeItem(FIRST_LOG_RECORD); + forwardToReadTime(); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, true); + List items = new ArrayList<>(); + while (iterator.hasNext()) { + items.addAll(iterator.next()); + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD); + assertThat(destinationDir.list()).isEmpty(); + } + + @Test + void autoDeleteMode_iterateAcrossMultipleFiles_deletesAll() throws IOException { + long firstWriteTime = 1000; + long secondWriteTime = firstWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + + currentTimeMillis.set(firstWriteTime); + writeItem(FIRST_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime); + writeItem(SECOND_LOG_RECORD); + + currentTimeMillis.set(secondWriteTime + MIN_FILE_AGE_FOR_READ_MILLIS); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, true); + List items = new ArrayList<>(); + while (iterator.hasNext()) { + items.addAll(iterator.next()); + } + + assertThat(items).containsExactly(FIRST_LOG_RECORD, SECOND_LOG_RECORD); + assertThat(destinationDir.list()).isEmpty(); + } + + @Test + void autoDeleteMode_removeBeforeNext_stillThrows() throws IOException { + writeItem(FIRST_LOG_RECORD); + forwardToReadTime(); + + Iterator> iterator = + new StorageIterator<>(storage, DESERIALIZER, true); + assertThat(iterator.hasNext()).isTrue(); + + assertThatThrownBy(iterator::remove) + .isInstanceOf(IllegalStateException.class) + .hasMessage("next() must be called before remove()"); + } + + private void writeItem(LogRecordData item) throws IOException { + serializer.initialize(Collections.singletonList(item)); + try { + storage.write(serializer); + } finally { + serializer.reset(); + } + } + + private void forwardToReadTime() { + currentTimeMillis.set(currentTimeMillis.get() + MIN_FILE_AGE_FOR_READ_MILLIS); + } + + private class TestClock implements Clock { + @Override + public long now() { + return TimeUnit.MILLISECONDS.toNanos(currentTimeMillis.get()); + } + + @Override + public long nanoTime() { + return 0; + } + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index 927e6f3ecf..4283308266 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -84,10 +84,10 @@ void writeAndRead() throws IOException { .hasMessage("You must close any previous ReadableResult before requesting a new one"); } - // Read again when no more data is available (delete file) + // Read again when no more data is available readResult2.close(); assertThat(storage.readNext(DESERIALIZER)).isNull(); - assertThat(destinationDir.list()).isEmpty(); + assertThat(destinationDir.list()).hasSize(1); } @Test @@ -170,6 +170,87 @@ void whenNoMoreLinesToRead_lookForNewFileToRead() throws IOException { result2.close(); } + @Test + void whenDeleteClosesFile_nextReadRecoversGracefully() throws IOException { + long firstFileWriteTime = 1000; + long secondFileWriteTime = firstFileWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + currentTimeMillis.set(firstFileWriteTime); + assertThat(write(Collections.singletonList(FIRST_LOG_RECORD))).isTrue(); + + currentTimeMillis.set(secondFileWriteTime); + assertThat(write(Collections.singletonList(SECOND_LOG_RECORD))).isTrue(); + + currentTimeMillis.set(secondFileWriteTime + MIN_FILE_AGE_FOR_READ_MILLIS); + + // Read first item and delete it (this empties and closes the first file) + ReadableResult result = storage.readNext(DESERIALIZER); + assertThat(result).isNotNull(); + assertThat(result.getContent()).containsExactly(FIRST_LOG_RECORD); + result.delete(); + result.close(); + + // Next read should recover and find the second file + ReadableResult result2 = storage.readNext(DESERIALIZER); + assertThat(result2).isNotNull(); + assertThat(result2.getContent()).containsExactly(SECOND_LOG_RECORD); + result2.close(); + } + + @Test + void whenReadingMultipleFilesWithoutDeleting_advancesCorrectly() throws IOException { + long firstFileWriteTime = 1000; + long secondFileWriteTime = firstFileWriteTime + MAX_FILE_AGE_FOR_WRITE_MILLIS + 1; + currentTimeMillis.set(firstFileWriteTime); + assertThat(write(Collections.singletonList(FIRST_LOG_RECORD))).isTrue(); + + currentTimeMillis.set(secondFileWriteTime); + assertThat(write(Collections.singletonList(SECOND_LOG_RECORD))).isTrue(); + + currentTimeMillis.set(secondFileWriteTime + MIN_FILE_AGE_FOR_READ_MILLIS); + + // Read first item without deleting + ReadableResult result = storage.readNext(DESERIALIZER); + assertThat(result).isNotNull(); + assertThat(result.getContent()).containsExactly(FIRST_LOG_RECORD); + result.close(); + + // Read second item (reader advances past first file since it's exhausted) + ReadableResult result2 = storage.readNext(DESERIALIZER); + assertThat(result2).isNotNull(); + assertThat(result2.getContent()).containsExactly(SECOND_LOG_RECORD); + result2.close(); + + // No more data + assertThat(storage.readNext(DESERIALIZER)).isNull(); + + // Both files remain on disk + assertThat(destinationDir.list()).hasSize(2); + } + + @Test + void afterReadSessionEnds_canReReadUnDeletedFiles() throws IOException { + assertThat(write(Collections.singletonList(FIRST_LOG_RECORD))).isTrue(); + forwardToReadTime(); + + // First session: read without deleting + ReadableResult result = storage.readNext(DESERIALIZER); + assertThat(result).isNotNull(); + assertThat(result.getContent()).containsExactly(FIRST_LOG_RECORD); + result.close(); + + // Exhaust the session (readNext returns null, resetting fileExclusion) + assertThat(storage.readNext(DESERIALIZER)).isNull(); + + // Second session: should be able to re-read the same file + ReadableResult result2 = storage.readNext(DESERIALIZER); + assertThat(result2).isNotNull(); + assertThat(result2.getContent()).containsExactly(FIRST_LOG_RECORD); + result2.close(); + + // File still on disk + assertThat(destinationDir.list()).hasSize(1); + } + @Test void deleteFilesWithCorruptedData() throws IOException { // Add files with invalid data diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index 89a1c03471..ae08733e70 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -12,6 +12,7 @@ import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.getConfiguration; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -88,8 +89,7 @@ void whenReadingLastLine_deleteOriginalFile_and_close() throws IOException { } @Test - void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContentStatus() - throws IOException { + void whenNoMoreLinesAvailableToRead_close_and_returnNoContentStatus() throws IOException { File emptyFile = new File(dir, "emptyFile"); if (!emptyFile.createNewFile()) { fail("Could not create file for tests"); @@ -101,7 +101,7 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent assertThat(emptyReadableFile.readNext()).isNull(); assertThat(emptyReadableFile.isClosed()).isTrue(); - assertThat(emptyFile.exists()).isFalse(); + assertThat(emptyFile.exists()).isTrue(); } @Test @@ -120,6 +120,20 @@ void whenReadingAfterClosed_returnNull() throws IOException { assertThat(readableFile.readNext()).isNull(); } + @Test + void getCreatedTimeMillis_returnsValueWhenOpen() { + assertThat(readableFile.getCreatedTimeMillis()).isEqualTo(CREATED_TIME_MILLIS); + } + + @Test + void getCreatedTimeMillis_throwsWhenClosed() throws IOException { + readableFile.close(); + + assertThatThrownBy(() -> readableFile.getCreatedTimeMillis()) + .isInstanceOf(IllegalStateException.class) + .hasMessage("File is closed"); + } + private static List getRemainingDataAndClose(ReadableFile readableFile) throws IOException { List result = new ArrayList<>();