Skip to content

Commit 56dc6de

Browse files
authored
[server] Fix TabletServer fails to restart after unclean shutdown due to EOFException in log recovery (#2942)
1 parent d4cd1a2 commit 56dc6de

3 files changed

Lines changed: 179 additions & 148 deletions

File tree

fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
2424
import org.apache.fluss.exception.LogStorageException;
2525
import org.apache.fluss.metadata.LogFormat;
26-
import org.apache.fluss.server.exception.CorruptIndexException;
2726
import org.apache.fluss.utils.FlussPaths;
2827
import org.apache.fluss.utils.types.Tuple2;
2928

@@ -137,6 +136,7 @@ public LoadedLogOffsets load() throws IOException {
137136
*/
138137
private Tuple2<Long, Long> recoverLog() throws IOException {
139138
if (!isCleanShutdown) {
139+
long recoverLogStart = System.currentTimeMillis();
140140
List<LogSegment> unflushed =
141141
logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
142142
int numUnflushed = unflushed.size();
@@ -153,50 +153,57 @@ private Tuple2<Long, Long> recoverLog() throws IOException {
153153
numUnflushed,
154154
logSegments.getTableBucket());
155155

156+
int truncatedBytes = -1;
156157
try {
157-
segment.sanityCheck();
158-
} catch (NoSuchFileException | CorruptIndexException e) {
158+
truncatedBytes = recoverSegment(segment);
159+
} catch (InvalidOffsetException e) {
160+
long startOffset = segment.getBaseOffset();
159161
LOG.warn(
160-
"Found invalid index file corresponding log file {} for bucket {}, "
161-
+ "recovering segment and rebuilding index files...",
162-
segment.getFileLogRecords().file().getAbsoluteFile(),
162+
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
163+
+ "and creating an empty one with starting offset {}",
163164
logSegments.getTableBucket(),
164-
e);
165-
166-
int truncatedBytes = -1;
167-
try {
168-
truncatedBytes = recoverSegment(segment);
169-
} catch (InvalidOffsetException invalidOffsetException) {
170-
long startOffset = segment.getBaseOffset();
171-
LOG.warn(
172-
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
173-
+ "and creating an empty one with starting offset {}",
174-
logSegments.getTableBucket(),
175-
startOffset);
176-
truncatedBytes = segment.truncateTo(startOffset);
177-
}
165+
startOffset);
166+
truncatedBytes = segment.truncateTo(startOffset);
167+
}
178168

179-
if (truncatedBytes > 0) {
180-
// we had an invalid message, delete all remaining log
181-
LOG.warn(
182-
"Corruption found in segment {} for bucket {}, truncating to offset {}",
183-
segment.getBaseOffset(),
184-
logSegments.getTableBucket(),
185-
segment.readNextOffset());
186-
removeAndDeleteSegments(unflushedIter);
187-
truncated = true;
188-
}
169+
if (truncatedBytes > 0) {
170+
// we had an invalid message, delete all remaining log
171+
LOG.warn(
172+
"Corruption found in segment {} for bucket {}, truncating to offset {}",
173+
segment.getBaseOffset(),
174+
logSegments.getTableBucket(),
175+
segment.readNextOffset());
176+
removeAndDeleteSegments(unflushedIter);
177+
truncated = true;
178+
} else {
179+
numFlushed += 1;
189180
}
190-
numFlushed += 1;
191181
}
182+
long recoverLogEnd = System.currentTimeMillis();
183+
LOG.info(
184+
"Log recovery completed for bucket {} in {} ms",
185+
logSegments.getTableBucket(),
186+
recoverLogEnd - recoverLogStart);
192187
}
193188

194-
// TODO truncate log to recover maybe unflush segments.
195189
if (logSegments.isEmpty()) {
190+
// TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
196191
logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
197192
}
193+
198194
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
199-
return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
195+
196+
// Update the recovery point if there was a clean shutdown and did not perform any changes
197+
// to the segment. Otherwise, we just ensure that the recovery point is not ahead of the log
198+
// end offset. To ensure correctness and to make it easier to reason about, it's best to
199+
// only advance the recovery point when the log is flushed. If we advanced the recovery
200+
// point here, we could skip recovery for unflushed segments if the server crashed after we
201+
// checkpointed the recovery point and before we flush the segment.
202+
if (isCleanShutdown) {
203+
return Tuple2.of(logEndOffset, logEndOffset);
204+
} else {
205+
return Tuple2.of(Math.min(recoveryPointCheckpoint, logEndOffset), logEndOffset);
206+
}
200207
}
201208

202209
/**
@@ -294,6 +301,20 @@ private void loadSegmentFiles() throws IOException {
294301
long baseOffset = FlussPaths.offsetFromFile(file);
295302
LogSegment segment =
296303
LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
304+
305+
try {
306+
segment.sanityCheck();
307+
} catch (NoSuchFileException e) {
308+
if (isCleanShutdown
309+
|| segment.getBaseOffset() < recoveryPointCheckpoint) {
310+
LOG.error(
311+
"Could not find offset index file corresponding to log file {} "
312+
+ "for bucket {}, recovering segment and rebuilding index files...",
313+
segment.getFileLogRecords().file().getAbsoluteFile(),
314+
logSegments.getTableBucket());
315+
}
316+
recoverSegment(segment);
317+
}
297318
logSegments.add(segment);
298319
}
299320
}

fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.config.ConfigOptions;
2222
import org.apache.fluss.config.Configuration;
23+
import org.apache.fluss.exception.CorruptMessageException;
2324
import org.apache.fluss.exception.CorruptRecordException;
2425
import org.apache.fluss.exception.InvalidColumnProjectionException;
2526
import org.apache.fluss.exception.InvalidRecordException;
@@ -188,15 +189,17 @@ public void sanityCheck() throws IOException {
188189
+ lazyOffsetIndex.file().getAbsolutePath()
189190
+ " does not exist.");
190191
}
191-
lazyOffsetIndex.get().sanityCheck();
192192

193193
if (!lazyTimeIndex.file().exists()) {
194194
throw new NoSuchFileException(
195195
"Time index file "
196196
+ lazyTimeIndex.file().getAbsolutePath()
197197
+ " does not exist.");
198198
}
199-
lazyTimeIndex.get().sanityCheck();
199+
200+
// Sanity checks for time index and offset index are skipped because
201+
// we will recover the segments above the recovery point in recoverLog()
202+
// in any case so sanity checking them here is redundant.
200203
}
201204

202205
/**
@@ -324,41 +327,45 @@ public int recover() throws IOException {
324327

325328
// The max timestamp is exposed at the batch level, so no need to iterate the
326329
// records
327-
if (batch.commitTimestamp() > maxTimestampSoFar()) {
330+
if (batch.commitTimestamp() > maxTimestampAndStartOffsetSoFar.timestamp) {
328331
maxTimestampAndStartOffsetSoFar =
329332
new TimestampOffset(batch.commitTimestamp(), batch.baseLogOffset());
330333
}
331334

332335
if (validBytes - lastIndexEntry > indexIntervalBytes) {
333336
offsetIndex().append(batch.lastLogOffset(), validBytes);
334-
timeIndex().maybeAppend(maxTimestampSoFar(), startOffsetOfMaxTimestampSoFar());
337+
timeIndex()
338+
.maybeAppend(
339+
maxTimestampAndStartOffsetSoFar.timestamp,
340+
maxTimestampAndStartOffsetSoFar.offset);
335341
lastIndexEntry = validBytes;
336342
}
337343

338344
// TODO Adding assign partition leader epoch follow KIP-101
339345

340346
validBytes += batch.sizeInBytes();
341347
}
342-
} catch (CorruptRecordException | InvalidRecordException e) {
348+
} catch (CorruptRecordException
349+
| InvalidRecordException
350+
| CorruptMessageException
351+
| IllegalArgumentException
352+
| IndexOutOfBoundsException e) {
353+
// Data corruption detected during recovery: CRC mismatch, invalid record format,
354+
// or truncated batch from an interrupted write. Truncate from this point onward.
343355
LOG.warn(
344-
"Found invalid messages in log segment "
345-
+ fileLogRecords.file().getAbsolutePath()
346-
+ " at byte offset "
347-
+ validBytes
348-
+ ": "
349-
+ e.getMessage()
350-
+ ". "
351-
+ e.getCause());
356+
"Found invalid messages in log segment {} at byte offset {}: {}. {}",
357+
fileLogRecords.file().getAbsolutePath(),
358+
validBytes,
359+
e.getMessage(),
360+
e.getCause());
352361
}
353362

354363
int truncated = fileLogRecords.sizeInBytes() - validBytes;
355364
if (truncated > 0) {
356365
LOG.debug(
357-
"Truncated "
358-
+ truncated
359-
+ " invalid bytes at the end of segment "
360-
+ fileLogRecords.file().getAbsolutePath()
361-
+ " during recovery");
366+
"Truncated {} invalid bytes at the end of segment {} during recovery",
367+
truncated,
368+
fileLogRecords.file().getAbsolutePath());
362369
}
363370
fileLogRecords.truncateTo(validBytes);
364371
offsetIndex().trimToValidSize();

0 commit comments

Comments
 (0)