Skip to content

Commit 6a1961b

Browse files
authored
[To dev/1.3] Fix that WALBuffer waits for flush instead of file-roll (#17628) (#17633)
* Fix that WALBuffer waits for flush instead of file-roll (#17628) (cherry picked from commit cfbcc24) * fix test compilation
1 parent 9b5973e commit 6a1961b

4 files changed

Lines changed: 427 additions & 13 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public interface IWALBuffer extends AutoCloseable {
5151
*
5252
* @throws InterruptedException when interrupted by the flush thread
5353
*/
54-
void waitForFlush() throws InterruptedException;
54+
void waitForRollFile() throws InterruptedException;
5555

5656
/**
5757
* Wait for next flush operation done, if the predicate == true after entering a locked
@@ -60,14 +60,14 @@ public interface IWALBuffer extends AutoCloseable {
6060
* @param waitPredicate the condition which should be satisfied before waiting.
6161
* @throws InterruptedException when interrupted by the flush thread
6262
*/
63-
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException;
63+
public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws InterruptedException;
6464

6565
/**
6666
* Wait for next flush operation done.
6767
*
6868
* @throws InterruptedException when interrupted by the flush thread
6969
*/
70-
boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
70+
boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException;
7171

7272
/** Return true when all wal entries all consumed and flushed. */
7373
boolean isAllWALEntriesConsumed();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class WALBuffer extends AbstractWALBuffer {
8888
private final Lock buffersLock = new ReentrantLock();
8989
// condition to guarantee correctness of switching buffers
9090
private final Condition idleBufferReadyCondition = buffersLock.newCondition();
91+
private final Condition rollLogWriterCondition = buffersLock.newCondition();
9192
// last writer position when fsync is called, help record each entry's position
9293
private long lastFsyncPosition;
9394

@@ -168,6 +169,13 @@ private int getCompressedByteBufferSize(int size) {
168169
protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException {
169170
File file = super.rollLogWriter(searchIndex, fileStatus);
170171
currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer);
172+
buffersLock.lock();
173+
try {
174+
// notify WALReader that new file is generated, and it can read new file
175+
rollLogWriterCondition.signalAll();
176+
} finally {
177+
buffersLock.unlock();
178+
}
171179
return file;
172180
}
173181

@@ -640,7 +648,7 @@ private void switchSyncingBufferToIdle() {
640648
}
641649

642650
@Override
643-
public void waitForFlush() throws InterruptedException {
651+
public void waitForRollFile() throws InterruptedException {
644652
buffersLock.lock();
645653
try {
646654
idleBufferReadyCondition.await();
@@ -650,22 +658,22 @@ public void waitForFlush() throws InterruptedException {
650658
}
651659

652660
@Override
653-
public void waitForFlush(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
661+
public void waitForRollFile(Predicate<WALBuffer> waitPredicate) throws InterruptedException {
654662
buffersLock.lock();
655663
try {
656664
if (waitPredicate.test(this)) {
657-
idleBufferReadyCondition.await();
665+
rollLogWriterCondition.await();
658666
}
659667
} finally {
660668
buffersLock.unlock();
661669
}
662670
}
663671

664672
@Override
665-
public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
673+
public boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException {
666674
buffersLock.lock();
667675
try {
668-
return idleBufferReadyCondition.await(time, unit);
676+
return rollLogWriterCondition.await(time, unit);
669677
} finally {
670678
buffersLock.unlock();
671679
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public class WALNode implements IWALNode {
9292
// no iot consensus, all insert nodes can be safely deleted
9393
public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE;
9494
// timeout threshold when waiting for next wal entry
95-
private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
95+
public static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30;
9696
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
9797

9898
// unique identifier of this WALNode
@@ -733,7 +733,7 @@ public void waitForNextReady() throws InterruptedException {
733733
while (!hasNext()) {
734734
if (!walFileRolled) {
735735
boolean timeout =
736-
!buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
736+
!buffer.waitForRollFile(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
737737
if (timeout) {
738738
bufferLastSearchIndex = buffer.getCurrentSearchIndex();
739739
logger.info(
@@ -746,7 +746,7 @@ public void waitForNextReady() throws InterruptedException {
746746
} else {
747747
// only wait when the search index of the buffer remains the same as the previous check
748748
long finalBufferLastSearchIndex = bufferLastSearchIndex;
749-
buffer.waitForFlush(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
749+
buffer.waitForRollFile(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex);
750750
}
751751
}
752752
}
@@ -755,8 +755,8 @@ public void waitForNextReady() throws InterruptedException {
755755
public void waitForNextReady(long time, TimeUnit unit)
756756
throws InterruptedException, TimeoutException {
757757
if (!hasNext()) {
758-
boolean timeout = !buffer.waitForFlush(time, unit);
759-
if (timeout || !hasNext()) {
758+
boolean timeout = !buffer.waitForRollFile(time, unit);
759+
if (timeout && !hasNext()) {
760760
throw new TimeoutException();
761761
}
762762
}

0 commit comments

Comments
 (0)