Skip to content

Commit cfbcc24

Browse files
authored
Fix that WALBuffer waits for flush instead of file-roll (#17628)
1 parent f92e9c0 commit cfbcc24

5 files changed

Lines changed: 446 additions & 13 deletions

File tree

.idea/icon.png

-6.58 KB
Binary file not shown.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public interface IWALBuffer extends AutoCloseable {
5151
*
5252
* @throws InterruptedException when interrupted by the flush thread
5353
*/
54-
void waitForFlush() throws InterruptedException;
54+
void waitForRollFile() throws InterruptedException;
5555

5656
/**
5757
* Wait for next flush operation done, if the predicate == true after entering a locked
@@ -60,14 +60,14 @@ public interface IWALBuffer extends AutoCloseable {
6060
* @param waitPredicate the condition which should be satisfied before waiting.
6161
* @throws InterruptedException when interrupted by the flush thread
6262
*/
63-
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException;
63+
public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws InterruptedException;
6464

6565
/**
6666
* Wait for next flush operation done.
6767
*
6868
* @throws InterruptedException when interrupted by the flush thread
6969
*/
70-
boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
70+
boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException;
7171

7272
/** Return true when all wal entries all consumed and flushed. */
7373
boolean isAllWALEntriesConsumed();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public class WALBuffer extends AbstractWALBuffer {
9090
private final Lock buffersLock = new ReentrantLock();
9191
// condition to guarantee correctness of switching buffers
9292
private final Condition idleBufferReadyCondition = buffersLock.newCondition();
93+
private final Condition rollLogWriterCondition = buffersLock.newCondition();
9394
// last writer position when fsync is called, help record each entry's position
9495
private long lastFsyncPosition;
9596

@@ -170,6 +171,13 @@ private int getCompressedByteBufferSize(int size) {
170171
protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
171172
File file = super.rollLogWriter(searchIndex, fileStatus);
172173
currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer);
174+
buffersLock.lock();
175+
try {
176+
// notify WALReader that new file is generated, and it can read new file
177+
rollLogWriterCondition.signalAll();
178+
} finally {
179+
buffersLock.unlock();
180+
}
173181
return file;
174182
}
175183

@@ -656,7 +664,7 @@ private void switchSyncingBufferToIdle() {
656664
}
657665

658666
@Override
659-
public void waitForFlush() throws InterruptedException {
667+
public void waitForRollFile() throws InterruptedException {
660668
buffersLock.lock();
661669
try {
662670
idleBufferReadyCondition.await();
@@ -666,22 +674,22 @@ public void waitForFlush() throws InterruptedException {
666674
}
667675

668676
@Override
669-
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
677+
public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
670678
buffersLock.lock();
671679
try {
672680
if (waitPredicate.test(this)) {
673-
idleBufferReadyCondition.await();
681+
rollLogWriterCondition.await();
674682
}
675683
} finally {
676684
buffersLock.unlock();
677685
}
678686
}
679687

680688
@Override
681-
public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
689+
public boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException {
682690
buffersLock.lock();
683691
try {
684-
return idleBufferReadyCondition.await(time, unit);
692+
return rollLogWriterCondition.await(time, unit);
685693
} finally {
686694
buffersLock.unlock();
687695
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public class WALNode implements IWALNode {
9696
// no iot consensus, all insert nodes can be safely deleted
9797
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
9898
// timeout threshold when waiting for next wal entry
99-
private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
99+
public static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
100100
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
101101

102102
// unique identifier of this WALNode
@@ -792,7 +792,7 @@ public void waitForNextReady() throws InterruptedException {
792792
while (!hasNext()) {
793793
if (!walFileRolled) {
794794
boolean timeout =
795-
!buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
795+
!buffer.waitForRollFile(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
796796
if (timeout) {
797797
bufferLastSearchIndex = buffer.getCurrentSearchIndex();
798798
logger.info(
@@ -805,7 +805,7 @@ public void waitForNextReady() throws InterruptedException {
805805
} else {
806806
// only wait when the search index of the buffer remains the same as the previous check
807807
long finalBufferLastSearchIndex = bufferLastSearchIndex;
808-
buffer.waitForFlush(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
808+
buffer.waitForRollFile(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
809809
}
810810
}
811811
}
@@ -814,8 +814,8 @@ public void waitForNextReady() throws InterruptedException {
814814
public void waitForNextReady(long time, TimeUnit unit)
815815
throws InterruptedException, TimeoutException {
816816
if (!hasNext()) {
817-
boolean timeout = !buffer.waitForFlush(time, unit);
818-
if (timeout || !hasNext()) {
817+
boolean timeout = !buffer.waitForRollFile(time, unit);
818+
if (timeout && !hasNext()) {
819819
throw new TimeoutException();
820820
}
821821
}

0 commit comments

Comments
 (0)