-
Notifications
You must be signed in to change notification settings - Fork 178
Disk buffering iterator explicit removal config option #2560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fb5f066
5f46a84
f2e250a
8f86310
fd9fa9f
052794f
d332e0a
24339d9
c472f74
5a27d75
24cc173
2a9ad80
6b3314e
ce56bef
daae2bc
7be2529
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,13 +21,15 @@ | |
| import java.util.Objects; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.function.Predicate; | ||
| import java.util.logging.Logger; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| public final class Storage<T> implements Closeable { | ||
| private static final int MAX_ATTEMPTS = 3; | ||
| private final Logger logger = Logger.getLogger(Storage.class.getName()); | ||
| private final FolderManager folderManager; | ||
| private volatile Predicate<FolderManager.CacheFile> fileExclusion = file -> false; | ||
| private final AtomicBoolean isClosed = new AtomicBoolean(false); | ||
| private final AtomicBoolean activeReadResultAvailable = new AtomicBoolean(false); | ||
| private final AtomicReference<WritableFile> writableFileRef = new AtomicReference<>(); | ||
|
|
@@ -91,7 +93,11 @@ public ReadableResult<T> readNext(SignalDeserializer<T> deserializer) throws IOE | |
| throw new IllegalStateException( | ||
| "You must close any previous ReadableResult before requesting a new one"); | ||
| } | ||
| return doReadNext(deserializer, 1); | ||
| ReadableResult<T> result = doReadNext(deserializer, 1); | ||
| if (result == null) { | ||
| fileExclusion = file -> false; | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| @Nullable | ||
|
|
@@ -106,9 +112,13 @@ private ReadableResult<T> doReadNext(SignalDeserializer<T> deserializer, int att | |
| return null; | ||
| } | ||
| ReadableFile readableFile = readableFileRef.get(); | ||
| if (readableFile != null && readableFile.isClosed()) { | ||
| readableFileRef.set(null); | ||
| readableFile = null; | ||
| } | ||
|
LikeTheSalad marked this conversation as resolved.
|
||
| if (readableFile == null) { | ||
| logger.finer("Obtaining a new readableFile from the folderManager."); | ||
| readableFile = folderManager.getReadableFile(); | ||
| readableFile = folderManager.getReadableFile(Objects.requireNonNull(fileExclusion)); | ||
| readableFileRef.set(readableFile); | ||
| if (readableFile == null) { | ||
| logger.fine("Unable to get or create readable file."); | ||
|
|
@@ -117,19 +127,27 @@ private ReadableResult<T> doReadNext(SignalDeserializer<T> deserializer, int att | |
| } | ||
|
|
||
| logger.finer("Attempting to read data from " + readableFile); | ||
| byte[] result = readableFile.readNext(); | ||
| if (result != null) { | ||
| try { | ||
| List<T> items = deserializer.deserialize(result); | ||
| activeReadResultAvailable.set(true); | ||
| return new FileReadResult(items, readableFile); | ||
| } catch (DeserializationException e) { | ||
| // Data corrupted, clear file. | ||
| readableFile.clear(); | ||
| long currentFileCreatedTime = readableFile.getCreatedTimeMillis(); | ||
| try { | ||
| byte[] result = readableFile.readNext(); | ||
| if (result != null) { | ||
| try { | ||
| List<T> items = deserializer.deserialize(result); | ||
| activeReadResultAvailable.set(true); | ||
| return new FileReadResult(items, readableFile); | ||
| } catch (DeserializationException e) { | ||
| // Data corrupted, clear file. | ||
| readableFile.clear(); | ||
| } | ||
| } | ||
| } catch (IOException e) { | ||
| // Proto data corrupted, clear file. | ||
| readableFile.clear(); | ||
| } | ||
|
|
||
| // Retry with new file | ||
| // Search for newer files than the current one. | ||
| fileExclusion = file -> file.getCreatedTimeMillis() <= currentFileCreatedTime; | ||
| readableFile.close(); | ||
| readableFileRef.set(null); | ||
| return doReadNext(deserializer, ++attemptNumber); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not related, but loops are better than recursion as they don't pollute the stack.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's been ages since I wrote this part. I honestly don't remember why I chose recursion, but I think it's because on a happy path, it should only be executed once, so the recursion would only be present on exceptions, hence an iteration was not the expected, common behavior. I think I understand what you're saying, but so far it doesn't seem to cause an issue in practice, and it also doesn't seem to be a trivial change, so it wouldn't fall under my current set of priorities. Still, if you're interested in finding an alternative solution, feel free to open a PR, and I'll take a look 👍 |
||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍