Skip to content

Commit 5bba52f

Browse files
CASSANDRA-21134: Fix O_DIRECT short read on preemptive early-open of compaction SSTables
The preemptive openEarly path (SSTableRewriter.maybeReopenEarly) publishes a reader incrementally as the data writer's post-flush listener reports synced offsets, without a sync() call. Under O_DIRECT a chunk's compressed bytes stay in the aligned writeBuffer until its block reaches disk, so reporting the staged uncompressed offset exposes chunks not yet durable and the early reader short- reads past EOF (CorruptBlockException). Report the durable uncompressed offset instead: track each staged chunk's {compressedEnd, uncompressedEnd} and, in the post-flush listener, advance over chunks whose compressed bytes now sit below fchannel.position(). The boundary simply waits for the next block flush. Adds no extra I/O and stays inside the DIO subclass, so both SSTable formats are covered.
1 parent dc58b1b commit 5bba52f

2 files changed

Lines changed: 114 additions & 0 deletions

File tree

src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22+
import java.util.ArrayDeque;
2223
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.function.IntConsumer;
25+
import java.util.function.LongConsumer;
2426

2527
import javax.annotation.Nullable;
2628

@@ -34,6 +36,7 @@
3436

3537
import org.apache.cassandra.config.DatabaseDescriptor;
3638
import org.apache.cassandra.db.compression.CompressionDictionaryManager;
39+
import org.apache.cassandra.io.FSReadError;
3740
import org.apache.cassandra.io.FSWriteError;
3841
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
3942
import org.apache.cassandra.io.util.ChecksumWriter;
@@ -78,6 +81,12 @@ public class DirectCompressedSequentialWriter extends CompressedSequentialWriter
7881

7982
private final int blockSize;
8083

84+
// Chunks staged in writeBuffer but not yet on disk, in write order, each as {compressedEnd, uncompressedEnd}.
85+
// Drained as whole blocks reach disk so the post-flush listener can report a durable offset (see
86+
// setPostFlushListener). A chunk costs one small array per ~chunkLength written — negligible churn.
87+
private final ArrayDeque<long[]> stagedChunkBoundaries = new ArrayDeque<>();
88+
private long durableUncompressedOffset = 0;
89+
8190
public DirectCompressedSequentialWriter(File file,
8291
File offsetsFile,
8392
@Nullable File digestFile,
@@ -159,6 +168,16 @@ protected void syncInternal()
159168
syncDataOnlyInternal();
160169
}
161170

171+
// flushData fires this per chunk with the staged (uncompressed) offset, but under O_DIRECT a chunk is
172+
// only readable once its block reaches disk. Preemptive early-open (markDataSynced) publishes a reader
173+
// as soon as this offset covers its boundary, so reporting the staged offset would expose chunks still
174+
// in writeBuffer. Report the durable offset instead; the boundary simply waits for the next block flush.
175+
@Override
176+
public void setPostFlushListener(LongConsumer postFlush)
177+
{
178+
super.setPostFlushListener(stagedOffset -> postFlush.accept(durableUncompressedOffset()));
179+
}
180+
162181
@Override
163182
protected void writeChunk(ByteBuffer toWrite)
164183
{
@@ -175,6 +194,7 @@ protected void writeChunk(ByteBuffer toWrite)
175194
crcMetadata.appendDirect(toWrite, true);
176195

177196
actualDataSize = chunkOffset + chunkLength + CRC_LENGTH;
197+
stagedChunkBoundaries.add(new long[]{ actualDataSize, uncompressedSize });
178198
}
179199

180200
private void writeToAlignedBuffer(ByteBuffer data)
@@ -264,6 +284,30 @@ private void flushBufferedTailForEarlyOpen()
264284
}
265285
}
266286

287+
// Advance over chunks whose compressed bytes now sit entirely below the on-disk boundary
288+
// (fchannel.position() — only whole blocks are written). Chunks flush in order, so a single drain
289+
// suffices and the offset is monotonic.
290+
private long durableUncompressedOffset()
291+
{
292+
long onDisk;
293+
try
294+
{
295+
onDisk = fchannel.position();
296+
}
297+
catch (IOException e)
298+
{
299+
throw new FSReadError(e, getPath());
300+
}
301+
302+
long[] chunk;
303+
while ((chunk = stagedChunkBoundaries.peek()) != null && chunk[0] <= onDisk)
304+
{
305+
durableUncompressedOffset = chunk[1];
306+
stagedChunkBoundaries.poll();
307+
}
308+
return durableUncompressedOffset;
309+
}
310+
267311
private void flushFinalWithPadding()
268312
{
269313
int logicalPos = writeBuffer.position();

test/unit/org/apache/cassandra/io/compress/DirectCompressedSequentialWriterTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -867,6 +867,76 @@ private static void assertEarlyOpenReadsBack(byte[] payload, CompressionParams p
867867
}
868868
}
869869

870+
/**
871+
* Covers the preemptive early-open read path (SSTableRewriter.maybeReopenEarly): unlike openFinalEarly
872+
* it never calls sync(). The partition index advances its readable boundary off the writer's post-flush
873+
* offset, then a reader is published over that boundary mid-compaction. Under O_DIRECT only whole blocks
874+
* reach disk during flushData, so the writer must report the durable offset, not the staged one —
875+
* otherwise the reader short-reads chunks still parked in writeBuffer. testEarlyOpenAfterSyncReadsBackData
876+
* does not cover this — its sync() flushes the tail; this window has no sync at all.
877+
*/
878+
@Test
879+
public void testPreemptiveOpenReadsBackSyncedData() throws IOException
880+
{
881+
// Many small chunks that compress far below one block: nothing reaches disk before finish, so the
882+
// writer must report 0 synced (the prod system/IndexInfo shape) rather than the staged length.
883+
assertSyncedOffsetIsReadable(compressible(DEFAULT_CHUNK_LENGTH * 4 + 137), CompressionParams.lz4());
884+
885+
// Several MiB of incompressible data forces real block flushes mid-write, so the synced boundary
886+
// must advance past zero (preemptive open still works) yet never cross the still-buffered tail.
887+
long synced = assertSyncedOffsetIsReadable(incompressible(4 << 20), CompressionParams.lz4());
888+
assertTrue("expected the synced boundary to advance for multi-MiB data", synced > 0);
889+
}
890+
891+
// Mirrors how a preemptive early-open reader is bounded: whatever offset the writer reports as synced
892+
// (via the post-flush listener that drives PartitionIndexBuilder.markDataSynced) must be fully readable
893+
// from disk, with no sync()/finish() in between. Returns that synced offset.
894+
private static long assertSyncedOffsetIsReadable(byte[] payload, CompressionParams params) throws IOException
895+
{
896+
File dataFile = FileUtils.createTempFile("preemptive_open_direct", ".db");
897+
File metadataFile = new File(dataFile.absolutePath() + ".metadata");
898+
try
899+
{
900+
MetadataCollector collector = newCollector();
901+
try (DirectCompressedSequentialWriter writer = new DirectCompressedSequentialWriter(
902+
dataFile, metadataFile, null, SequentialWriterOption.DEFAULT, params, collector, null))
903+
{
904+
long[] syncedOffset = { 0 };
905+
writer.setPostFlushListener(offset -> syncedOffset[0] = offset);
906+
907+
writer.write(payload);
908+
909+
int readable = (int) syncedOffset[0];
910+
if (readable > 0)
911+
{
912+
try (CompressionMetadata md = writer.open(readable);
913+
FileHandle fh = new FileHandle.Builder(dataFile).withCompressionMetadata(md).complete();
914+
RandomAccessReader reader = fh.createReader())
915+
{
916+
byte[] readBack = new byte[readable];
917+
reader.readFully(readBack);
918+
assertArrayEquals("preemptive-open read-back mismatch", Arrays.copyOf(payload, readable), readBack);
919+
}
920+
}
921+
922+
writer.finish();
923+
return syncedOffset[0];
924+
}
925+
}
926+
finally
927+
{
928+
dataFile.tryDelete();
929+
metadataFile.tryDelete();
930+
}
931+
}
932+
933+
private static byte[] incompressible(int size)
934+
{
935+
byte[] data = new byte[size];
936+
new Random(42).nextBytes(data);
937+
return data;
938+
}
939+
870940
private void testWriteAndRead(String testName, int dataSize, CompressionParams params) throws IOException
871941
{
872942
File dataFile = FileUtils.createTempFile(testName + "_direct", ".db");

0 commit comments

Comments
 (0)