Skip to content

Commit a8fb1c9

Browse files
CASSANDRA-21134: Fix O_DIRECT short read on early-open of compaction SSTables
openFinalEarly() publishes a reader right after sync() but before commit. Under O_DIRECT the inherited sync flushes only whole blocks, so the last chunk's sub-block tail stays buffered and the early reader short-reads past EOF (CorruptBlockException). Override syncInternal() to also flush the buffered tail, restoring the channel and buffer cursors afterwards so the authoritative pad+truncate at commit still produces a byte-identical file.
1 parent 7d468f8 commit a8fb1c9

2 files changed

Lines changed: 92 additions & 0 deletions

File tree

src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -148,6 +148,17 @@ protected void seekToChunkStart()
148148
// resetAndTruncate (the parent's reason for this seek) is unsupported under DIO.
149149
}
150150

151+
// openFinalEarly() publishes a reader right after sync(), before commit. The inherited sync flushes
152+
// only whole blocks, so under O_DIRECT the last chunk's sub-block tail is still buffered and that
153+
// reader short-reads past EOF — flush the tail here too.
// Call order: regular flush first, then the buffered tail, then the data sync, so that the tail has
// already been written to the channel when the sync runs.
154+
@Override
155+
protected void syncInternal()
156+
{
157+
doFlush(0); // NOTE(review): presumably the inherited flush step, which writes whole blocks only — confirm against the parent
158+
flushBufferedTailForEarlyOpen(); // also write the still-buffered tail, then put the channel/buffer cursors back
159+
syncDataOnlyInternal(); // NOTE(review): presumably forces the written data to disk — confirm against the parent
160+
}
161+
151162
@Override
152163
protected void writeChunk(ByteBuffer toWrite)
153164
{
@@ -229,6 +240,30 @@ private void flushCompleteBlocks()
229240
}
230241
}
231242

243+
// Restoring the channel and buffer cursors afterwards is what keeps the writer untouched, so the
243+
// authoritative pad+truncate at commit still produces the identical file. The over-block padding is
244+
// safe: reads are bounded by compressedFileLength and commit overwrites or truncates it away.
//
// Steps: remember the buffer cursor and the channel position, write the buffered bytes out in
// block-aligned form, then restore both cursors. No-op when the buffer cursor is at 0.
// Any IOException raised while doing so is rethrown as an FSWriteError carrying this file's path.
245+
private void flushBufferedTailForEarlyOpen()
246+
{
247+
int logicalPos = writeBuffer.position(); // buffer cursor to restore once the tail has been written
248+
if (logicalPos == 0) // cursor at the start of the buffer: nothing to flush
249+
return;
250+
251+
try
252+
{
253+
long resumeChannelPos = fchannel.position(); // file offset at which later writes must resume
254+
// NOTE(review): BitUtil.align presumably rounds logicalPos up to a multiple of blockSize, and
// writeAlignedBlocks presumably writes that many bytes from the buffer — confirm both helpers.
writeAlignedBlocks(BitUtil.align(logicalPos, blockSize));
255+
256+
fchannel.position(resumeChannelPos); // rewind the channel to where it was before the tail write
257+
writeBuffer.limit(writeBuffer.capacity()); // reset the limit first: a buffer position may not exceed its limit
258+
writeBuffer.position(logicalPos); // buffered bytes are once again pending, as before the call
259+
}
260+
catch (IOException e)
261+
{
262+
throw new FSWriteError(e, getPath()); // keep the cause and attach the file path
263+
}
264+
}
266+
232267
private void flushFinalWithPadding()
233268
{
234269
int logicalPos = writeBuffer.position();

test/unit/org/apache/cassandra/io/compress/DirectCompressedSequentialWriterTest.java

Lines changed: 57 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -810,6 +810,63 @@ public void testOnDiskFilePointerTracksActualDataSize() throws IOException
810810
}
811811
}
812812

813+
/**
814+
* Covers the early-open read path: openFinalEarly() publishes a reader after sync() but before
815+
* commit, and under O_DIRECT the last chunk's sub-block tail is still buffered then, so without the
816+
* sync() tail-flush the reader short-reads past EOF. The other tests miss this window — they all read
817+
* post-finish, once the tail is flushed. Tiny sub-block outputs (e.g. system/IndexInfo) are the prod shape.
818+
*/
819+
@Test
820+
public void testEarlyOpenAfterSyncReadsBackData() throws IOException
821+
{
822+
for (int size : new int[]{ 1, 64, 298, 1024, 4096, DEFAULT_CHUNK_LENGTH * 3 + 137 })
823+
assertEarlyOpenReadsBack(compressible(size), CompressionParams.lz4());
824+
}
825+
826+
// Compresses well below one block, so after sync() the whole chunk+CRC is still buffered and the
827+
// on-disk file is empty — the worst case for the early-open read.
828+
private static byte[] compressible(int size)
829+
{
830+
byte[] data = new byte[size];
831+
for (int i = 0; i < size; i++)
832+
data[i] = (byte) (i % 7);
833+
return data;
834+
}
835+
836+
private static void assertEarlyOpenReadsBack(byte[] payload, CompressionParams params) throws IOException
837+
{
838+
File dataFile = FileUtils.createTempFile("early_open_direct", ".db");
839+
File metadataFile = new File(dataFile.absolutePath() + ".metadata");
840+
try
841+
{
842+
MetadataCollector collector = newCollector();
843+
try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter(
844+
dataFile, metadataFile, null, SequentialWriterOption.DEFAULT, params, collector, null))
845+
{
846+
writer.write(payload);
847+
848+
// Mirror openFinalEarly: sync(), then open(0) == openDataFile(NO_LENGTH_OVERRIDE).
849+
writer.sync();
850+
try (CompressionMetadata md = writer.open(0);
851+
FileHandle fh = new FileHandle.Builder(dataFile).withCompressionMetadata(md).complete();
852+
RandomAccessReader reader = fh.createReader())
853+
{
854+
assertEquals("early-open length mismatch, size=" + payload.length, payload.length, reader.length());
855+
byte[] readBack = new byte[payload.length];
856+
reader.readFully(readBack);
857+
assertArrayEquals("early-open read-back mismatch, size=" + payload.length, payload, readBack);
858+
}
859+
860+
writer.finish();
861+
}
862+
}
863+
finally
864+
{
865+
dataFile.tryDelete();
866+
metadataFile.tryDelete();
867+
}
868+
}
869+
813870
private void testWriteAndRead(String testName, int dataSize, CompressionParams params) throws IOException
814871
{
815872
File dataFile = FileUtils.createTempFile(testName + "_direct", ".db");

0 commit comments

Comments
 (0)