Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions disk-buffering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ The available configuration parameters are the following:
* Max age for file reading. After that time passes, the file will be considered stale and will be
removed when new files are created. No more data will be read from a file past this time. Defaults
to 18 hours.
* Delete items on iteration. Controls whether items are automatically removed from disk as the
iterator advances. Defaults to `true`. See [Deleting data](#deleting-data) for more details.

```java
// Root dir
Expand Down Expand Up @@ -105,7 +107,8 @@ Now when creating signals using your `OpenTelemetry` instance, those will get st
### Reading data

In order to read data, we can iterate through our signal storage objects and then forward them to
a network exporter, as shown in the example for spans below.
a network exporter. By default, items are automatically deleted from disk as the iterator advances,
so a simple iteration is all that's needed:

```java
/**
Expand All @@ -129,11 +132,52 @@ public boolean exportSpansFromDisk(SpanExporter networkExporter, long timeout) {
}
```

The `File*Storage` iterators delete the previously returned collection when `next()` is called,
assuming that if the next collection is requested is because the previous one was successfully
consumed.
### Deleting data

Both the writing and reading processes can run in parallel and they don't overlap
By default, items are automatically deleted from disk as the iterator advances. You can also
clear all data at once by calling `SignalStorage.clear()`.

#### Automatic vs explicit deletion

The default behavior (`deleteItemsOnIteration = true`) automatically removes items from disk during
iteration. This means you don't need to call `Iterator.remove()` since the data is cleaned up as the
iterator advances.

If you need more control (e.g., only deleting items after a successful network export), set
`deleteItemsOnIteration` to `false` in the configuration:

```java
FileStorageConfiguration config = FileStorageConfiguration.builder()
.setDeleteItemsOnIteration(false)
.build();
SignalStorage.Span spanStorage = FileSpanStorage.create(new File(rootDir, "spans"), config);
```

With this setting, items remain on disk until explicitly removed via `Iterator.remove()`:

```java
public boolean exportSpansFromDisk(SpanExporter networkExporter, long timeout) {
Iterator<Collection<SpanData>> spansIterator = spanStorage.iterator();
while (spansIterator.hasNext()) {
CompletableResultCode resultCode = networkExporter.export(spansIterator.next());
resultCode.join(timeout, TimeUnit.MILLISECONDS);

if (resultCode.isSuccess()) {
spansIterator.remove();
} else {
return false;
}
}
return true;
}
```

Note that even with explicit deletion, disk usage is still bounded by the configured max folder size and max file
age, so stale files are automatically purged when there's not enough space available before new data is written.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


### More details on the writing and reading processes

Both the writing and reading processes can run in parallel as they won't overlap
because each is supposed to happen in different files. We ensure that reader and writer don't
accidentally meet in the same file by using the configurable parameters. These parameters set
non-overlapping time frames for each action to be done on a single file at a time. On top of that,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class FileSignalStorage<T> implements SignalStorage<T> {
private final Storage<T> storage;
private final SignalSerializer<T> serializer;
private final SignalDeserializer<T> deserializer;
private final boolean deleteItemsOnIteration;
private final Logger logger = Logger.getLogger(FileSignalStorage.class.getName());
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Object iteratorLock = new Object();
Expand All @@ -34,10 +35,14 @@ public final class FileSignalStorage<T> implements SignalStorage<T> {
private Iterator<Collection<T>> iterator;

public FileSignalStorage(
Storage<T> storage, SignalSerializer<T> serializer, SignalDeserializer<T> deserializer) {
Storage<T> storage,
SignalSerializer<T> serializer,
SignalDeserializer<T> deserializer,
boolean deleteItemsOnIteration) {
this.storage = storage;
this.serializer = serializer;
this.deserializer = deserializer;
this.deleteItemsOnIteration = deleteItemsOnIteration;
}

@Override
Expand Down Expand Up @@ -84,7 +89,7 @@ public void close() throws IOException {
public Iterator<Collection<T>> iterator() {
synchronized (iteratorLock) {
if (iterator == null) {
iterator = new StorageIterator<>(storage, deserializer);
iterator = new StorageIterator<>(storage, deserializer, deleteItemsOnIteration);
}
return iterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.jetbrains.annotations.NotNull;

public final class FolderManager implements Closeable {
private final File folder;
private final Clock clock;
private final FileStorageConfiguration configuration;
private final Logger logger = Logger.getLogger(FolderManager.class.getName());
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
@Nullable private ReadableFile currentReadableFile;
@Nullable private WritableFile currentWritableFile;

Expand All @@ -46,19 +52,33 @@ public FolderManager(File folder, FileStorageConfiguration configuration, Clock
this.clock = clock;
}

static class CacheFile {
private final File file;
private final long createdTimeMillis;

Comment thread
LikeTheSalad marked this conversation as resolved.
CacheFile(File file, long createdTimeMillis) {
this.file = file;
this.createdTimeMillis = createdTimeMillis;
}

long getCreatedTimeMillis() {
return createdTimeMillis;
}
}

@Override
public void close() throws IOException {
closeCurrentFiles();
}

@Nullable
public synchronized ReadableFile getReadableFile() throws IOException {
public synchronized ReadableFile getReadableFile(Predicate<CacheFile> excludeFiles)
throws IOException {
currentReadableFile = null;
File readableFile = findReadableFile();
if (readableFile != null) {
CacheFile selectedFile = selectReadableFile(listCacheFiles(excludeFiles));
if (selectedFile != null) {
currentReadableFile =
new ReadableFile(
readableFile, Long.parseLong(readableFile.getName()), clock, configuration);
new ReadableFile(selectedFile.file, selectedFile.createdTimeMillis, clock, configuration);
return currentReadableFile;
}
return null;
Expand Down Expand Up @@ -93,29 +113,56 @@ public synchronized void clear() throws IOException {
}
}

private List<CacheFile> listCacheFiles(Predicate<CacheFile> exclude) {
File[] existingFiles = folder.listFiles();
if (existingFiles == null) {
return Collections.emptyList();
}
ArrayList<CacheFile> files = new ArrayList<>();
for (File file : existingFiles) {
CacheFile cacheFile = fileToCacheFile(file);
if (cacheFile != null && !exclude.test(cacheFile)) {
files.add(cacheFile);
}
}
return Collections.unmodifiableList(files);
}

@Nullable
private CacheFile fileToCacheFile(File file) {
String fileName = file.getName();
if (!NUMBER_PATTERN.matcher(fileName).matches()) {
logger.finer(String.format("Invalid cache file name: '%s'", fileName));
return null;
}
return new CacheFile(file, Long.parseLong(fileName));
}

@Nullable
private File findReadableFile() throws IOException {
private CacheFile selectReadableFile(List<CacheFile> files) throws IOException {
if (files.isEmpty()) {
return null;
}

long currentTime = nowMillis(clock);
File[] existingFiles = folder.listFiles();
File oldestFileAvailable = null;
CacheFile oldestFileAvailable = null;
long oldestFileCreationTimeMillis = 0;
if (existingFiles != null) {
for (File existingFile : existingFiles) {
long existingFileCreationTimeMillis = Long.parseLong(existingFile.getName());
if (isReadyToBeRead(currentTime, existingFileCreationTimeMillis)
&& !hasExpiredForReading(currentTime, existingFileCreationTimeMillis)) {
if (oldestFileAvailable == null
|| existingFileCreationTimeMillis < oldestFileCreationTimeMillis) {
oldestFileCreationTimeMillis = existingFileCreationTimeMillis;
oldestFileAvailable = existingFile;
}
for (CacheFile existingFile : files) {
long existingFileCreationTimeMillis = existingFile.createdTimeMillis;
if (isReadyToBeRead(currentTime, existingFileCreationTimeMillis)
&& !hasExpiredForReading(currentTime, existingFileCreationTimeMillis)) {
if (oldestFileAvailable == null
|| existingFileCreationTimeMillis < oldestFileCreationTimeMillis) {
oldestFileCreationTimeMillis = existingFileCreationTimeMillis;
oldestFileAvailable = existingFile;
}
}
}

// Checking if the oldest available file is currently the writable file.
if (oldestFileAvailable != null
&& currentWritableFile != null
&& oldestFileAvailable.equals(currentWritableFile.getFile())) {
&& oldestFileAvailable.file.equals(currentWritableFile.getFile())) {
currentWritableFile.close();
}
return oldestFileAvailable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.logging.Logger;
import javax.annotation.Nullable;

public final class Storage<T> implements Closeable {
private static final int MAX_ATTEMPTS = 3;
private final Logger logger = Logger.getLogger(Storage.class.getName());
private final FolderManager folderManager;
private volatile Predicate<FolderManager.CacheFile> fileExclusion = file -> false;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicBoolean activeReadResultAvailable = new AtomicBoolean(false);
private final AtomicReference<WritableFile> writableFileRef = new AtomicReference<>();
Expand Down Expand Up @@ -91,7 +93,11 @@ public ReadableResult<T> readNext(SignalDeserializer<T> deserializer) throws IOE
throw new IllegalStateException(
"You must close any previous ReadableResult before requesting a new one");
}
return doReadNext(deserializer, 1);
ReadableResult<T> result = doReadNext(deserializer, 1);
if (result == null) {
fileExclusion = file -> false;
}
return result;
}

@Nullable
Expand All @@ -106,9 +112,13 @@ private ReadableResult<T> doReadNext(SignalDeserializer<T> deserializer, int att
return null;
}
ReadableFile readableFile = readableFileRef.get();
if (readableFile != null && readableFile.isClosed()) {
readableFileRef.set(null);
readableFile = null;
}
Comment thread
LikeTheSalad marked this conversation as resolved.
if (readableFile == null) {
logger.finer("Obtaining a new readableFile from the folderManager.");
readableFile = folderManager.getReadableFile();
readableFile = folderManager.getReadableFile(Objects.requireNonNull(fileExclusion));
readableFileRef.set(readableFile);
if (readableFile == null) {
logger.fine("Unable to get or create readable file.");
Expand All @@ -117,19 +127,27 @@ private ReadableResult<T> doReadNext(SignalDeserializer<T> deserializer, int att
}

logger.finer("Attempting to read data from " + readableFile);
byte[] result = readableFile.readNext();
if (result != null) {
try {
List<T> items = deserializer.deserialize(result);
activeReadResultAvailable.set(true);
return new FileReadResult(items, readableFile);
} catch (DeserializationException e) {
// Data corrupted, clear file.
readableFile.clear();
long currentFileCreatedTime = readableFile.getCreatedTimeMillis();
try {
byte[] result = readableFile.readNext();
if (result != null) {
try {
List<T> items = deserializer.deserialize(result);
activeReadResultAvailable.set(true);
return new FileReadResult(items, readableFile);
} catch (DeserializationException e) {
// Data corrupted, clear file.
readableFile.clear();
}
}
} catch (IOException e) {
// Proto data corrupted, clear file.
readableFile.clear();
}

// Retry with new file
// Search for newer files than the current one.
fileExclusion = file -> file.getCreatedTimeMillis() <= currentFileCreatedTime;
readableFile.close();
readableFileRef.set(null);
return doReadNext(deserializer, ++attemptNumber);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related, but loops are better than recursion as they don't pollute the stack.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been ages since I wrote this part. I honestly don't remember why I chose recursion, but I think it's because on a happy path, it should only be executed once, so the recursion would only be present on exceptions, hence an iteration was not the expected, common behavior. I think I understand what you're saying, but so far it doesn't seem to cause an issue in practice, and it also doesn't seem to be a trivial change, so it wouldn't fall under my current set of priorities. Still, if you're interested in finding an alternative solution, feel free to open a PR, and I'll take a look 👍

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
final class StorageIterator<T> implements Iterator<Collection<T>> {
private final Storage<T> storage;
private final SignalDeserializer<T> deserializer;
private final boolean deleteOnIteration;
private final Logger logger = Logger.getLogger(StorageIterator.class.getName());

@GuardedBy("this")
Expand All @@ -28,9 +29,14 @@ final class StorageIterator<T> implements Iterator<Collection<T>> {
@GuardedBy("this")
private boolean currentResultConsumed = false;

StorageIterator(Storage<T> storage, SignalDeserializer<T> deserializer) {
@GuardedBy("this")
private boolean removeAllowed = false;

StorageIterator(
Storage<T> storage, SignalDeserializer<T> deserializer, boolean deleteOnIteration) {
this.storage = storage;
this.deserializer = deserializer;
this.deleteOnIteration = deleteOnIteration;
}

@Override
Expand All @@ -49,13 +55,18 @@ public synchronized Collection<T> next() {
}
if (findNext()) {
currentResultConsumed = true;
removeAllowed = true;
return Objects.requireNonNull(currentResult).getContent();
}
return null;
}

@Override
public synchronized void remove() {
if (!removeAllowed) {
throw new IllegalStateException("next() must be called before remove()");
}
removeAllowed = false;
if (currentResult != null) {
try {
currentResult.delete();
Expand All @@ -71,7 +82,9 @@ private synchronized boolean findNext() {
if (!currentResultConsumed) {
return true;
}
currentResult.delete();
if (deleteOnIteration) {
currentResult.delete();
}
currentResult.close();
currentResult = null;
}
Expand Down
Loading
Loading