Skip to content

Commit 7d468f8

Browse files
CASSANDRA-21134: Address review feedback for background write Direct I/O
- Reuse ChecksumWriter for O_DIRECT compressed writes via DirectChecksumWriter
- Centralize Direct I/O write paths and drop redundant background-write support validation
- Rely on startup checks for engagement; remove per-operation log and its reflective test seam
- Exercise isDirectIOSupported path checks in DatabaseDescriptorTest
1 parent 31ec958 commit 7d468f8

16 files changed

Lines changed: 722 additions & 304 deletions

File tree

conf/cassandra.yaml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -697,8 +697,8 @@ commitlog_disk_access_mode: legacy
697697
# (compaction, streaming, cleanup, repair, etc.). The allowed values are:
698698
# - standard: use buffered I/O (default)
699699
# - direct: use direct I/O, bypassing the OS page cache
700-
# Note: Only applies to compressed tables. Uncompressed tables always use buffered I/O.
701-
# Note: Memtable flushes always use buffered I/O regardless of this setting, as flushed
700+
# Only applies to compressed tables. Uncompressed tables always use buffered I/O.
701+
# Memtable flushes always use buffered I/O regardless of this setting, as flushed
702702
# data benefits from page cache for subsequent reads.
703703
# background_write_disk_access_mode: standard
704704

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 25 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -3429,11 +3429,31 @@ public static DataStorageSpec.IntKibibytesBound getDirectWriteBufferSize()
34293429
return conf.direct_write_buffer_size;
34303430
}
34313431

3432+
/**
3433+
* The directories that receive O_DIRECT writes (commit log and/or data directories). Startup checks that must reason about O_DIRECT
3434+
* write targets (e.g. the kernel-bug 1057843 check) derive their path set from here.
3435+
*/
3436+
public static Set<Path> getDirectIOWritePaths()
3437+
{
3438+
Set<Path> paths = new HashSet<>();
3439+
3440+
if (getCommitLogWriteDiskAccessMode() == DiskAccessMode.direct)
3441+
paths.add(new File(getCommitLogLocation()).toPath());
3442+
3443+
if (getBackgroundWriteDiskAccessMode() == DiskAccessMode.direct)
3444+
for (String dataDir : getAllDataFileLocations())
3445+
paths.add(new File(dataDir).toPath());
3446+
3447+
return paths;
3448+
}
3449+
34323450
@VisibleForTesting
34333451
public static void initializeBackgroundWriteDiskAccessMode()
34343452
{
34353453
DiskAccessMode providedMode = conf.background_write_disk_access_mode;
34363454

3455+
// 'auto' is a forward-looking placeholder: it resolves to standard until Direct I/O is a safe
3456+
// default, at which point it will resolve to direct (mirroring commitlog_disk_access_mode).
34373457
if (providedMode == DiskAccessMode.auto)
34383458
{
34393459
providedMode = DiskAccessMode.standard;
@@ -3448,38 +3468,12 @@ public static void initializeBackgroundWriteDiskAccessMode()
34483468
throw new ConfigurationException("direct_write_buffer_size must be > 0 when background_write_disk_access_mode is 'direct'. " +
34493469
"Got: " + conf.direct_write_buffer_size, false);
34503470

3471+
// Create the data directories up front (as we do for the commit log) so the kernel-bug 1057843
3472+
// startup check can stat each O_DIRECT write target. Direct I/O support is validated separately
3473+
// by the directio_support startup check.
34513474
if (!toolInitialized)
3452-
{
3453-
List<String> unsupportedLocations = new ArrayList<>();
3454-
3455-
for (String dataDir : conf.data_file_directories)
3456-
{
3457-
try
3458-
{
3459-
File dataDirFile = new File(dataDir);
3460-
PathUtils.createDirectoriesIfNotExists(dataDirFile.toPath());
3461-
3462-
if (!FileUtils.isDirectIOSupported(dataDirFile))
3463-
{
3464-
unsupportedLocations.add(dataDir);
3465-
}
3466-
}
3467-
catch (RuntimeException e)
3468-
{
3469-
logger.warn("Unable to determine Direct IO support for data directory {}: {}", dataDir, e.getMessage());
3470-
unsupportedLocations.add(dataDir + " (check failed: " + e.getMessage() + ")");
3471-
}
3472-
}
3473-
3474-
if (!unsupportedLocations.isEmpty())
3475-
{
3476-
throw new ConfigurationException(
3477-
String.format("background_write_disk_access_mode is set to 'direct', but the following data directories " +
3478-
"do not support Direct I/O: %s. Either change background_write_disk_access_mode to 'standard' " +
3479-
"in cassandra.yaml, or ensure all data directories are on filesystems that support Direct I/O.",
3480-
unsupportedLocations), false);
3481-
}
3482-
}
3475+
for (String dataDir : getAllDataFileLocations())
3476+
PathUtils.createDirectoriesIfNotExists(new File(dataDir).toPath());
34833477
}
34843478
else if (providedMode != DiskAccessMode.standard)
34853479
{

src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@
4747

4848
public class CompressedSequentialWriter extends SequentialWriter
4949
{
50-
private final ChecksumWriter crcMetadata;
50+
protected final ChecksumWriter crcMetadata;
5151

5252
// holds offset in the file where current chunk should be written
5353
// changed only by flush() method where data buffer gets compressed and stored to the file
@@ -153,7 +153,16 @@ public CompressedSequentialWriter(File file,
153153
metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsFile, compressionDictionary);
154154

155155
this.sstableMetadataCollector = sstableMetadataCollector;
156-
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(this.channel)));
156+
crcMetadata = createChecksumWriter();
157+
}
158+
159+
/**
160+
* Creates the {@link ChecksumWriter} for the chunk and full-file checksums. Invoked from the constructor,
161+
* so overrides must not read subclass fields.
162+
*/
163+
protected ChecksumWriter createChecksumWriter()
164+
{
165+
return new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
157166
}
158167

159168
@Override

src/java/org/apache/cassandra/io/compress/DirectCompressedSequentialWriter.java

Lines changed: 64 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,8 @@
1919

2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22-
import java.nio.charset.StandardCharsets;
2322
import java.util.concurrent.atomic.AtomicBoolean;
24-
import java.util.zip.CRC32;
23+
import java.util.function.IntConsumer;
2524

2625
import javax.annotation.Nullable;
2726

@@ -37,9 +36,9 @@
3736
import org.apache.cassandra.db.compression.CompressionDictionaryManager;
3837
import org.apache.cassandra.io.FSWriteError;
3938
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
39+
import org.apache.cassandra.io.util.ChecksumWriter;
4040
import org.apache.cassandra.io.util.DataPosition;
4141
import org.apache.cassandra.io.util.File;
42-
import org.apache.cassandra.io.util.FileOutputStreamPlus;
4342
import org.apache.cassandra.io.util.FileUtils;
4443
import org.apache.cassandra.io.util.SequentialWriter;
4544
import org.apache.cassandra.io.util.SequentialWriterOption;
@@ -75,14 +74,9 @@ public class DirectCompressedSequentialWriter extends CompressedSequentialWriter
7574
// fit contiguously. flushCompleteBlocks aligns down to blockSize, leaving up to
7675
// (blockSize - 1) bytes carried over via compact(); the floor accounts for that.
7776
private ByteBuffer writeBuffer;
78-
private int writeBufferPosition = 0;
7977
private long actualDataSize = 0;
8078

8179
private final int blockSize;
82-
// ChecksumWriter writes CRCs directly to the channel, bypassing writeBuffer; track checksums ourselves.
83-
private final CRC32 fullFileChecksum = new CRC32();
84-
private final CRC32 chunkChecksum = new CRC32();
85-
private final ByteBuffer crcBuffer = ByteBuffer.allocate(CRC_LENGTH);
8680

8781
public DirectCompressedSequentialWriter(File file,
8882
File offsetsFile,
@@ -130,6 +124,14 @@ public DirectCompressedSequentialWriter(File file,
130124
}
131125
}
132126

127+
// Invoked from the superclass constructor, before writeBuffer exists; safe because we only capture the
128+
// writeCrcToAlignedBuffer reference, which isn't called until the first flush.
129+
@Override
130+
protected ChecksumWriter createChecksumWriter()
131+
{
132+
return new DirectChecksumWriter(this::writeCrcToAlignedBuffer);
133+
}
134+
133135
// Parent reads fchannel.position(), which lags by writeBuffer contents under O_DIRECT.
134136
// getEstimatedOnDiskBytesWritten is intentionally NOT overridden: parent returns chunkOffset,
135137
// which already represents the eventual on-disk size — correct under DIO.
@@ -155,16 +157,11 @@ protected void writeChunk(ByteBuffer toWrite)
155157

156158
int chunkLength = toWrite.remaining();
157159

158-
chunkChecksum.reset();
159-
chunkChecksum.update(toWrite);
160-
int crcValue = (int) chunkChecksum.getValue();
161-
toWrite.rewind();
162-
163160
writeToAlignedBuffer(toWrite);
164-
writeCrcToAlignedBuffer(crcValue);
165161

162+
// writeToAlignedBuffer drained toWrite; rewind so appendDirect can re-read it for the CRC.
166163
toWrite.rewind();
167-
updateFullChecksum(toWrite, crcValue);
164+
crcMetadata.appendDirect(toWrite, true);
168165

169166
actualDataSize = chunkOffset + chunkLength + CRC_LENGTH;
170167
}
@@ -174,51 +171,57 @@ private void writeToAlignedBuffer(ByteBuffer data)
174171
int dataLength = data.remaining();
175172

176173
// Buffer is sized for worst-case chunk + CRC + blockSize, so a flush always frees enough room.
177-
if (writeBufferPosition + dataLength > writeBuffer.capacity())
174+
if (writeBuffer.position() + dataLength > writeBuffer.capacity())
178175
flushCompleteBlocks();
179176

180-
writeBuffer.position(writeBufferPosition);
181177
writeBuffer.put(data);
182-
writeBufferPosition = writeBuffer.position();
183178
}
184179

185180
private void writeCrcToAlignedBuffer(int crcValue)
186181
{
187182
// After flush, leftover < blockSize, so there's always room for the CRC trailer.
188-
if (writeBufferPosition + CRC_LENGTH > writeBuffer.capacity())
183+
if (writeBuffer.position() + CRC_LENGTH > writeBuffer.capacity())
189184
flushCompleteBlocks();
190185

191-
writeBuffer.position(writeBufferPosition);
192186
writeBuffer.putInt(crcValue);
193-
writeBufferPosition = writeBuffer.position();
187+
}
188+
189+
// FileChannel.write may write fewer bytes than requested, and a partial write would leave the file short
190+
// and desync the callers' leftover/truncate bookkeeping.
191+
private void writeAlignedBlocks(int flushLimit) throws IOException
192+
{
193+
writeBuffer.position(0);
194+
writeBuffer.limit(flushLimit);
195+
while (writeBuffer.hasRemaining())
196+
fchannel.write(writeBuffer);
194197
}
195198

196199
private void flushCompleteBlocks()
197200
{
201+
// writeAlignedBlocks rewinds position() to 0 to drain, so snapshot the cursor first.
202+
int logicalPos = writeBuffer.position();
203+
198204
// Align down: O_DIRECT cannot write partial blocks
199-
int flushLimit = writeBufferPosition & -blockSize;
205+
int flushLimit = logicalPos & -blockSize;
200206

201207
if (flushLimit == 0)
202208
return;
203209

204210
try
205211
{
206-
writeBuffer.position(0);
207-
writeBuffer.limit(flushLimit);
208-
fchannel.write(writeBuffer);
212+
writeAlignedBlocks(flushLimit);
209213

210-
int leftover = writeBufferPosition - flushLimit;
214+
int leftover = logicalPos - flushLimit;
211215
if (leftover > 0)
212216
{
213-
writeBuffer.limit(writeBufferPosition);
217+
writeBuffer.limit(logicalPos);
214218
writeBuffer.position(flushLimit);
215219
writeBuffer.compact();
216220
}
217221
else
218222
{
219223
writeBuffer.clear();
220224
}
221-
writeBufferPosition = leftover;
222225
}
223226
catch (IOException e)
224227
{
@@ -228,19 +231,18 @@ private void flushCompleteBlocks()
228231

229232
private void flushFinalWithPadding()
230233
{
231-
if (writeBufferPosition == 0)
234+
int logicalPos = writeBuffer.position();
235+
236+
if (logicalPos == 0)
232237
return;
233238

234239
try
235240
{
236-
int flushLimit = BitUtil.align(writeBufferPosition, blockSize);
241+
int flushLimit = BitUtil.align(logicalPos, blockSize);
237242

238-
writeBuffer.position(writeBufferPosition);
239-
ByteBufferUtil.writeZeroes(writeBuffer, flushLimit - writeBufferPosition);
243+
ByteBufferUtil.writeZeroes(writeBuffer, flushLimit - logicalPos);
240244

241-
writeBuffer.position(0);
242-
writeBuffer.limit(flushLimit);
243-
fchannel.write(writeBuffer);
245+
writeAlignedBlocks(flushLimit);
244246

245247
// O_DIRECT required padding; truncate back to actual data size.
246248
fchannel.truncate(actualDataSize);
@@ -251,34 +253,6 @@ private void flushFinalWithPadding()
251253
}
252254
}
253255

254-
private void updateFullChecksum(ByteBuffer data, int crcValue)
255-
{
256-
fullFileChecksum.update(data);
257-
258-
// Include CRC bytes in the full-file checksum to match ChecksumWriter.appendDirect(..., true).
259-
crcBuffer.clear();
260-
crcBuffer.putInt(crcValue);
261-
crcBuffer.flip();
262-
fullFileChecksum.update(crcBuffer);
263-
}
264-
265-
@Override
266-
protected void writeDigestFile()
267-
{
268-
digestFile.ifPresent(file -> {
269-
try (FileOutputStreamPlus fos = new FileOutputStreamPlus(file))
270-
{
271-
fos.write(String.valueOf(fullFileChecksum.getValue()).getBytes(StandardCharsets.UTF_8));
272-
fos.flush();
273-
fos.getChannel().force(true);
274-
}
275-
catch (IOException e)
276-
{
277-
throw new FSWriteError(e, file);
278-
}
279-
});
280-
}
281-
282256
// Gated out for SCRUB in DataComponent.buildWriter; these throws are a canary if the gate is bypassed.
283257
@Override
284258
public DataPosition mark()
@@ -332,4 +306,31 @@ protected SequentialWriter.TransactionalProxy txnProxy()
332306
{
333307
return new DirectTransactionalProxy();
334308
}
309+
310+
/**
311+
* Sends the per-chunk CRC int into the block-aligned writeBuffer rather than straight to the channel, so
312+
* the O_DIRECT writer reuses ChecksumWriter's checksum bookkeeping instead of duplicating it. Only the CRC
313+
* trailer flows through here; {@link #writeChunk} places the chunk data in the buffer.
314+
*/
315+
private static final class DirectChecksumWriter extends ChecksumWriter
316+
{
317+
private final IntConsumer alignedSink;
318+
319+
DirectChecksumWriter(IntConsumer alignedSink)
320+
{
321+
this.alignedSink = alignedSink;
322+
}
323+
324+
@Override
325+
protected void writeIncrementalInt(int value)
326+
{
327+
alignedSink.accept(value);
328+
}
329+
330+
@Override
331+
public void writeChunkSize(int length)
332+
{
333+
throw new UnsupportedOperationException("writeChunkSize is unused on the compressed O_DIRECT path");
334+
}
335+
}
335336
}

src/java/org/apache/cassandra/io/sstable/format/DataComponent.java

Lines changed: 3 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -19,11 +19,8 @@
1919
package org.apache.cassandra.io.sstable.format;
2020

2121
import java.util.EnumMap;
22-
import java.util.Set;
23-
import java.util.concurrent.ConcurrentHashMap;
2422

25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
23+
import com.google.common.annotations.VisibleForTesting;
2724

2825
import org.apache.cassandra.config.Config.DiskAccessMode;
2926
import org.apache.cassandra.config.Config.FlushCompression;
@@ -49,12 +46,8 @@
4946

5047
public class DataComponent
5148
{
52-
private static final Logger logger = LoggerFactory.getLogger(DataComponent.class);
53-
5449
private static final EnumMap<OperationType, DirectIoSupport> DIRECT_WRITE_SUPPORT = buildDirectWriteSupport();
5550

56-
private static final Set<OperationType> directWriteLogged = ConcurrentHashMap.newKeySet();
57-
5851
private static EnumMap<OperationType, DirectIoSupport> buildDirectWriteSupport()
5952
{
6053
EnumMap<OperationType, DirectIoSupport> m = new EnumMap<>(OperationType.class);
@@ -97,8 +90,6 @@ public static SequentialWriter buildWriter(Descriptor descriptor,
9790
if (DatabaseDescriptor.getBackgroundWriteDiskAccessMode() == DiskAccessMode.direct
9891
&& isDirectWriteSupported(operationType))
9992
{
100-
if (directWriteLogged.add(operationType))
101-
logger.info("Direct I/O writer activated for {}", operationType);
10293
return new DirectCompressedSequentialWriter(descriptor.fileFor(Components.DATA),
10394
descriptor.fileFor(Components.COMPRESSION_INFO),
10495
descriptor.fileFor(Components.DIGEST),
@@ -168,7 +159,8 @@ private static CompressionParams buildCompressionParams(TableMetadata metadata,
168159
return compressionParams;
169160
}
170161

171-
private static boolean isDirectWriteSupported(OperationType operationType)
162+
@VisibleForTesting
163+
static boolean isDirectWriteSupported(OperationType operationType)
172164
{
173165
DirectIoSupport support = DIRECT_WRITE_SUPPORT.get(operationType);
174166
if (support == null)

0 commit comments

Comments
 (0)