Skip to content

Commit 6d3ba38

Browse files
authored
[server] Keep flushedLogOffset monotonic to fix row count after failover (#3427)
* [server] Keep flushedLogOffset monotonic to fix row count after failover * add a ut test
1 parent 02e40ec commit 6d3ba38

2 files changed

Lines changed: 38 additions & 1 deletion

File tree

fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,9 @@ public void flush(long exclusiveUpToLogOffset, FatalErrorHandler fatalErrorHandl
687687
} else {
688688
try {
689689
int rowCountDiff = kvPreWriteBuffer.flush(exclusiveUpToLogOffset);
690-
flushedLogOffset = exclusiveUpToLogOffset;
690+
if (exclusiveUpToLogOffset > flushedLogOffset) {
691+
flushedLogOffset = exclusiveUpToLogOffset;
692+
}
691693
if (rowCount != ROW_COUNT_DISABLED) {
692694
// row count is enabled, we update the row count after flush.
693695
long currentRowCount = rowCount;

fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.fluss.server.kv.rowmerger.RowMerger;
6060
import org.apache.fluss.server.kv.scan.OpenScanResult;
6161
import org.apache.fluss.server.kv.scan.ScannerContext;
62+
import org.apache.fluss.server.kv.snapshot.TabletState;
6263
import org.apache.fluss.server.log.FetchIsolation;
6364
import org.apache.fluss.server.log.LogAppendInfo;
6465
import org.apache.fluss.server.log.LogTablet;
@@ -1861,6 +1862,40 @@ void testRowCountWithMixedOperations() throws Exception {
18611862
kvTablet.close();
18621863
}
18631864

1865+
@Test
1866+
void testFlushDoesNotRegressFlushedLogOffset() throws Exception {
1867+
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
1868+
1869+
List<KvRecord> inserts = new ArrayList<>();
1870+
for (int i = 1; i <= 5; i++) {
1871+
inserts.add(kvRecordFactory.ofRecord("key" + i, new Object[] {i, "val" + i}));
1872+
}
1873+
kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(inserts), null);
1874+
1875+
List<KvRecord> deletes = new ArrayList<>();
1876+
for (int i = 1; i <= 2; i++) {
1877+
deletes.add(kvRecordFactory.ofRecord("key" + i, null));
1878+
}
1879+
kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(deletes), null);
1880+
1881+
long highOffset = logTablet.localLogEndOffset();
1882+
kvTablet.flush(highOffset, NOPErrorHandler.INSTANCE);
1883+
TabletState stateAfterHighFlush = kvTablet.getTabletState();
1884+
assertThat(stateAfterHighFlush.getFlushedLogOffset()).isEqualTo(highOffset);
1885+
assertThat(stateAfterHighFlush.getRowCount()).isEqualTo(3L);
1886+
assertThat(kvTablet.getRowCount()).isEqualTo(3);
1887+
1888+
long lowOffset = highOffset - 2;
1889+
kvTablet.flush(lowOffset, NOPErrorHandler.INSTANCE);
1890+
TabletState stateAfterLowFlush = kvTablet.getTabletState();
1891+
1892+
assertThat(stateAfterLowFlush.getFlushedLogOffset()).isEqualTo(highOffset);
1893+
assertThat(stateAfterLowFlush.getRowCount()).isEqualTo(3L);
1894+
assertThat(kvTablet.getRowCount()).isEqualTo(3);
1895+
1896+
kvTablet.close();
1897+
}
1898+
18641899
@Test
18651900
void testOpenScan_emptyBucket_returnsNullContext() throws Exception {
18661901
initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());

0 commit comments

Comments
 (0)