Skip to content

Commit 4b01654

Browse files
Disk buffering iterator explicit removal config option (#2560)
Co-authored-by: Jay DeLuca <jaydeluca4@gmail.com>
1 parent 88e8be4 commit 4b01654

16 files changed

Lines changed: 806 additions & 105 deletions

File tree

disk-buffering/README.md

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ The available configuration parameters are the following:
3636
* Max age for file reading. After that time passes, the file will be considered stale and will be
3737
removed when new files are created. No more data will be read from a file past this time. Defaults
3838
to 18 hours.
39+
* Delete items on iteration. Controls whether items are automatically removed from disk as the
40+
iterator advances. Defaults to `true`. See [Deleting data](#deleting-data) for more details.
3941

4042
```java
4143
// Root dir
@@ -105,7 +107,8 @@ Now when creating signals using your `OpenTelemetry` instance, those will get st
105107
### Reading data
106108

107109
In order to read data, we can iterate through our signal storage objects and then forward them to
108-
a network exporter, as shown in the example for spans below.
110+
a network exporter. By default, items are automatically deleted from disk as the iterator advances,
111+
so a simple iteration is all that's needed:
109112

110113
```java
111114
/**
@@ -129,11 +132,52 @@ public boolean exportSpansFromDisk(SpanExporter networkExporter, long timeout) {
129132
}
130133
```
131134

132-
The `File*Storage` iterators delete the previously returned collection when `next()` is called,
133-
assuming that if the next collection is requested, it is because the previous one was successfully
134-
consumed.
135+
### Deleting data
135136

136-
Both the writing and reading processes can run in parallel and they don't overlap
137+
By default, items are automatically deleted from disk as the iterator advances. You can also
138+
clear all data at once by calling `SignalStorage.clear()`.
139+
140+
#### Automatic vs explicit deletion
141+
142+
The default behavior (`deleteItemsOnIteration = true`) automatically removes items from disk during
143+
iteration. This means you don't need to call `Iterator.remove()` since the data is cleaned up as the
144+
iterator advances.
145+
146+
If you need more control (e.g., only deleting items after a successful network export), set
147+
`deleteItemsOnIteration` to `false` in the configuration:
148+
149+
```java
150+
FileStorageConfiguration config = FileStorageConfiguration.builder()
151+
.setDeleteItemsOnIteration(false)
152+
.build();
153+
SignalStorage.Span spanStorage = FileSpanStorage.create(new File(rootDir, "spans"), config);
154+
```
155+
156+
With this setting, items remain on disk until explicitly removed via `Iterator.remove()`:
157+
158+
```java
159+
public boolean exportSpansFromDisk(SpanExporter networkExporter, long timeout) {
160+
Iterator<Collection<SpanData>> spansIterator = spanStorage.iterator();
161+
while (spansIterator.hasNext()) {
162+
CompletableResultCode resultCode = networkExporter.export(spansIterator.next());
163+
resultCode.join(timeout, TimeUnit.MILLISECONDS);
164+
165+
if (resultCode.isSuccess()) {
166+
spansIterator.remove();
167+
} else {
168+
return false;
169+
}
170+
}
171+
return true;
172+
}
173+
```
174+
175+
Note that even with explicit deletion, disk usage is still bounded by the configured max folder size and max file
176+
age, so when there's not enough space available, stale files are automatically purged before new data is written.
177+
178+
### More details on the writing and reading processes
179+
180+
Both the writing and reading processes can run in parallel without overlapping
137181
because each is supposed to happen in different files. We ensure that reader and writer don't
138182
accidentally meet in the same file by using the configurable parameters. These parameters set
139183
non-overlapping time frames for each action to be done on a single file at a time. On top of that,

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FileSignalStorage.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public final class FileSignalStorage<T> implements SignalStorage<T> {
2525
private final Storage<T> storage;
2626
private final SignalSerializer<T> serializer;
2727
private final SignalDeserializer<T> deserializer;
28+
private final boolean deleteItemsOnIteration;
2829
private final Logger logger = Logger.getLogger(FileSignalStorage.class.getName());
2930
private final AtomicBoolean isClosed = new AtomicBoolean(false);
3031
private final Object iteratorLock = new Object();
@@ -34,10 +35,14 @@ public final class FileSignalStorage<T> implements SignalStorage<T> {
3435
private Iterator<Collection<T>> iterator;
3536

3637
public FileSignalStorage(
37-
Storage<T> storage, SignalSerializer<T> serializer, SignalDeserializer<T> deserializer) {
38+
Storage<T> storage,
39+
SignalSerializer<T> serializer,
40+
SignalDeserializer<T> deserializer,
41+
boolean deleteItemsOnIteration) {
3842
this.storage = storage;
3943
this.serializer = serializer;
4044
this.deserializer = deserializer;
45+
this.deleteItemsOnIteration = deleteItemsOnIteration;
4146
}
4247

4348
@Override
@@ -84,7 +89,7 @@ public void close() throws IOException {
8489
public Iterator<Collection<T>> iterator() {
8590
synchronized (iteratorLock) {
8691
if (iterator == null) {
87-
iterator = new StorageIterator<>(storage, deserializer);
92+
iterator = new StorageIterator<>(storage, deserializer, deleteItemsOnIteration);
8893
}
8994
return iterator;
9095
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManager.java

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,21 @@
1515
import java.io.File;
1616
import java.io.IOException;
1717
import java.util.ArrayList;
18+
import java.util.Collections;
1819
import java.util.List;
1920
import java.util.Objects;
21+
import java.util.function.Predicate;
22+
import java.util.logging.Logger;
23+
import java.util.regex.Pattern;
2024
import javax.annotation.Nullable;
2125
import org.jetbrains.annotations.NotNull;
2226

2327
public final class FolderManager implements Closeable {
2428
private final File folder;
2529
private final Clock clock;
2630
private final FileStorageConfiguration configuration;
31+
private final Logger logger = Logger.getLogger(FolderManager.class.getName());
32+
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
2733
@Nullable private ReadableFile currentReadableFile;
2834
@Nullable private WritableFile currentWritableFile;
2935

@@ -46,19 +52,33 @@ public FolderManager(File folder, FileStorageConfiguration configuration, Clock
4652
this.clock = clock;
4753
}
4854

55+
static class CacheFile {
56+
private final File file;
57+
private final long createdTimeMillis;
58+
59+
CacheFile(File file, long createdTimeMillis) {
60+
this.file = file;
61+
this.createdTimeMillis = createdTimeMillis;
62+
}
63+
64+
long getCreatedTimeMillis() {
65+
return createdTimeMillis;
66+
}
67+
}
68+
4969
@Override
5070
public void close() throws IOException {
5171
closeCurrentFiles();
5272
}
5373

5474
@Nullable
55-
public synchronized ReadableFile getReadableFile() throws IOException {
75+
public synchronized ReadableFile getReadableFile(Predicate<CacheFile> excludeFiles)
76+
throws IOException {
5677
currentReadableFile = null;
57-
File readableFile = findReadableFile();
58-
if (readableFile != null) {
78+
CacheFile selectedFile = selectReadableFile(listCacheFiles(excludeFiles));
79+
if (selectedFile != null) {
5980
currentReadableFile =
60-
new ReadableFile(
61-
readableFile, Long.parseLong(readableFile.getName()), clock, configuration);
81+
new ReadableFile(selectedFile.file, selectedFile.createdTimeMillis, clock, configuration);
6282
return currentReadableFile;
6383
}
6484
return null;
@@ -93,29 +113,56 @@ public synchronized void clear() throws IOException {
93113
}
94114
}
95115

116+
private List<CacheFile> listCacheFiles(Predicate<CacheFile> exclude) {
117+
File[] existingFiles = folder.listFiles();
118+
if (existingFiles == null) {
119+
return Collections.emptyList();
120+
}
121+
ArrayList<CacheFile> files = new ArrayList<>();
122+
for (File file : existingFiles) {
123+
CacheFile cacheFile = fileToCacheFile(file);
124+
if (cacheFile != null && !exclude.test(cacheFile)) {
125+
files.add(cacheFile);
126+
}
127+
}
128+
return Collections.unmodifiableList(files);
129+
}
130+
131+
@Nullable
132+
private CacheFile fileToCacheFile(File file) {
133+
String fileName = file.getName();
134+
if (!NUMBER_PATTERN.matcher(fileName).matches()) {
135+
logger.finer(String.format("Invalid cache file name: '%s'", fileName));
136+
return null;
137+
}
138+
return new CacheFile(file, Long.parseLong(fileName));
139+
}
140+
96141
@Nullable
97-
private File findReadableFile() throws IOException {
142+
private CacheFile selectReadableFile(List<CacheFile> files) throws IOException {
143+
if (files.isEmpty()) {
144+
return null;
145+
}
146+
98147
long currentTime = nowMillis(clock);
99-
File[] existingFiles = folder.listFiles();
100-
File oldestFileAvailable = null;
148+
CacheFile oldestFileAvailable = null;
101149
long oldestFileCreationTimeMillis = 0;
102-
if (existingFiles != null) {
103-
for (File existingFile : existingFiles) {
104-
long existingFileCreationTimeMillis = Long.parseLong(existingFile.getName());
105-
if (isReadyToBeRead(currentTime, existingFileCreationTimeMillis)
106-
&& !hasExpiredForReading(currentTime, existingFileCreationTimeMillis)) {
107-
if (oldestFileAvailable == null
108-
|| existingFileCreationTimeMillis < oldestFileCreationTimeMillis) {
109-
oldestFileCreationTimeMillis = existingFileCreationTimeMillis;
110-
oldestFileAvailable = existingFile;
111-
}
150+
for (CacheFile existingFile : files) {
151+
long existingFileCreationTimeMillis = existingFile.createdTimeMillis;
152+
if (isReadyToBeRead(currentTime, existingFileCreationTimeMillis)
153+
&& !hasExpiredForReading(currentTime, existingFileCreationTimeMillis)) {
154+
if (oldestFileAvailable == null
155+
|| existingFileCreationTimeMillis < oldestFileCreationTimeMillis) {
156+
oldestFileCreationTimeMillis = existingFileCreationTimeMillis;
157+
oldestFileAvailable = existingFile;
112158
}
113159
}
114160
}
161+
115162
// Checking if the oldest available file is currently the writable file.
116163
if (oldestFileAvailable != null
117164
&& currentWritableFile != null
118-
&& oldestFileAvailable.equals(currentWritableFile.getFile())) {
165+
&& oldestFileAvailable.file.equals(currentWritableFile.getFile())) {
119166
currentWritableFile.close();
120167
}
121168
return oldestFileAvailable;

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
import java.util.Objects;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.Predicate;
2425
import java.util.logging.Logger;
2526
import javax.annotation.Nullable;
2627

2728
public final class Storage<T> implements Closeable {
2829
private static final int MAX_ATTEMPTS = 3;
2930
private final Logger logger = Logger.getLogger(Storage.class.getName());
3031
private final FolderManager folderManager;
32+
private volatile Predicate<FolderManager.CacheFile> fileExclusion = file -> false;
3133
private final AtomicBoolean isClosed = new AtomicBoolean(false);
3234
private final AtomicBoolean activeReadResultAvailable = new AtomicBoolean(false);
3335
private final AtomicReference<WritableFile> writableFileRef = new AtomicReference<>();
@@ -91,7 +93,11 @@ public ReadableResult<T> readNext(SignalDeserializer<T> deserializer) throws IOE
9193
throw new IllegalStateException(
9294
"You must close any previous ReadableResult before requesting a new one");
9395
}
94-
return doReadNext(deserializer, 1);
96+
ReadableResult<T> result = doReadNext(deserializer, 1);
97+
if (result == null) {
98+
fileExclusion = file -> false;
99+
}
100+
return result;
95101
}
96102

97103
@Nullable
@@ -106,9 +112,13 @@ private ReadableResult<T> doReadNext(SignalDeserializer<T> deserializer, int att
106112
return null;
107113
}
108114
ReadableFile readableFile = readableFileRef.get();
115+
if (readableFile != null && readableFile.isClosed()) {
116+
readableFileRef.set(null);
117+
readableFile = null;
118+
}
109119
if (readableFile == null) {
110120
logger.finer("Obtaining a new readableFile from the folderManager.");
111-
readableFile = folderManager.getReadableFile();
121+
readableFile = folderManager.getReadableFile(Objects.requireNonNull(fileExclusion));
112122
readableFileRef.set(readableFile);
113123
if (readableFile == null) {
114124
logger.fine("Unable to get or create readable file.");
@@ -117,19 +127,27 @@ private ReadableResult<T> doReadNext(SignalDeserializer<T> deserializer, int att
117127
}
118128

119129
logger.finer("Attempting to read data from " + readableFile);
120-
byte[] result = readableFile.readNext();
121-
if (result != null) {
122-
try {
123-
List<T> items = deserializer.deserialize(result);
124-
activeReadResultAvailable.set(true);
125-
return new FileReadResult(items, readableFile);
126-
} catch (DeserializationException e) {
127-
// Data corrupted, clear file.
128-
readableFile.clear();
130+
long currentFileCreatedTime = readableFile.getCreatedTimeMillis();
131+
try {
132+
byte[] result = readableFile.readNext();
133+
if (result != null) {
134+
try {
135+
List<T> items = deserializer.deserialize(result);
136+
activeReadResultAvailable.set(true);
137+
return new FileReadResult(items, readableFile);
138+
} catch (DeserializationException e) {
139+
// Data corrupted, clear file.
140+
readableFile.clear();
141+
}
129142
}
143+
} catch (IOException e) {
144+
// Proto data corrupted, clear file.
145+
readableFile.clear();
130146
}
131147

132-
// Retry with new file
148+
// Search for newer files than the current one.
149+
fileExclusion = file -> file.getCreatedTimeMillis() <= currentFileCreatedTime;
150+
readableFile.close();
133151
readableFileRef.set(null);
134152
return doReadNext(deserializer, ++attemptNumber);
135153
}

disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageIterator.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
final class StorageIterator<T> implements Iterator<Collection<T>> {
2020
private final Storage<T> storage;
2121
private final SignalDeserializer<T> deserializer;
22+
private final boolean deleteOnIteration;
2223
private final Logger logger = Logger.getLogger(StorageIterator.class.getName());
2324

2425
@GuardedBy("this")
@@ -28,9 +29,14 @@ final class StorageIterator<T> implements Iterator<Collection<T>> {
2829
@GuardedBy("this")
2930
private boolean currentResultConsumed = false;
3031

31-
StorageIterator(Storage<T> storage, SignalDeserializer<T> deserializer) {
32+
@GuardedBy("this")
33+
private boolean removeAllowed = false;
34+
35+
StorageIterator(
36+
Storage<T> storage, SignalDeserializer<T> deserializer, boolean deleteOnIteration) {
3237
this.storage = storage;
3338
this.deserializer = deserializer;
39+
this.deleteOnIteration = deleteOnIteration;
3440
}
3541

3642
@Override
@@ -49,13 +55,18 @@ public synchronized Collection<T> next() {
4955
}
5056
if (findNext()) {
5157
currentResultConsumed = true;
58+
removeAllowed = true;
5259
return Objects.requireNonNull(currentResult).getContent();
5360
}
5461
return null;
5562
}
5663

5764
@Override
5865
public synchronized void remove() {
66+
if (!removeAllowed) {
67+
throw new IllegalStateException("next() must be called before remove()");
68+
}
69+
removeAllowed = false;
5970
if (currentResult != null) {
6071
try {
6172
currentResult.delete();
@@ -71,7 +82,9 @@ private synchronized boolean findNext() {
7182
if (!currentResultConsumed) {
7283
return true;
7384
}
74-
currentResult.delete();
85+
if (deleteOnIteration) {
86+
currentResult.delete();
87+
}
7588
currentResult.close();
7689
currentResult = null;
7790
}

0 commit comments

Comments
 (0)