Skip to content

Commit 0d40589

Browse files
CASSANDRA-21134: Direct I/O for background SSTable writes
Adds an opt-in O_DIRECT write path for background SSTable producers, bypassing the OS page cache for data that is unlikely to be re-read soon after being written. Memtable flushes remain buffered.

Enabled via two new YAML knobs:
- background_write_disk_access_mode: standard (default) | direct
- direct_write_buffer_size: 1MiB (default; aligned up to FS block size, auto-grown to chunk_length)

The path is gated by config, table compression being enabled, and an OperationType allowlist in DataComponent. The allowlist is exhaustive: any new OperationType with writesData=true that is not classified will fail static initialization.

Operations on the DIO path: COMPACTION, MAJOR_COMPACTION, TOMBSTONE_COMPACTION, ANTICOMPACTION, GARBAGE_COLLECT, CLEANUP, UPGRADE_SSTABLES, WRITE, STREAM (chunked receiver only).

Operations off the DIO path:
- FLUSH (policy: just-flushed data is hot, keep in page cache)
- SCRUB (correctness: tryAppend needs mark/resetAndTruncate)
- Zero-Copy Streaming (bypasses DataComponent.buildWriter)
- Uncompressed writers (only CompressedSequentialWriter has a DIO subclass in this change)

StartupChecks fails fast if 'direct' is requested on a platform/FS that does not support O_DIRECT.

patch by Sam Lightfoot; reviewed by <reviewers> for CASSANDRA-21134
1 parent c0999f0 commit 0d40589

18 files changed

Lines changed: 2117 additions & 72 deletions

conf/cassandra.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,21 @@ commitlog_disk_access_mode: legacy
693693
# - direct: use direct I/O for compaction reads, bypassing the OS page cache
694694
# compaction_read_disk_access_mode: auto
695695

696+
# Set the disk access mode for writing compressed SSTables during background operations
697+
# (compaction, streaming, cleanup, repair, etc.; scrub always uses buffered I/O). The allowed values are:
698+
# - standard: use buffered I/O (default)
699+
# - direct: use direct I/O, bypassing the OS page cache
700+
# Note: Only applies to compressed tables. Uncompressed tables always use buffered I/O.
701+
# Note: Memtable flushes always use buffered I/O regardless of this setting, as flushed
702+
# data benefits from page cache for subsequent reads.
703+
# background_write_disk_access_mode: standard
704+
705+
# Size of the in-memory staging buffer for Direct IO background writes. Trades off syscall
706+
# frequency against per-flush blocking latency on the compaction thread.
707+
# Aligned up to filesystem block size; auto-expands to fit a single compressed chunk + CRC
708+
# + one block when chunk_length exceeds this value.
709+
# direct_write_buffer_size: 1MiB
710+
696711
# Compression to apply to SSTables as they flush for compressed tables.
697712
# Note that tables without compression enabled do not respect this flag.
698713
#

src/java/org/apache/cassandra/config/Config.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,18 @@ public MemtableOptions()
362362

363363
public DataStorageSpec.IntKibibytesBound compressed_read_ahead_buffer_size = new DataStorageSpec.IntKibibytesBound("256KiB");
364364

365+
// Direct IO for background SSTable writes (compaction, streaming, cleanup, etc.; scrub is excluded)
366+
// When 'direct' is set, background writes bypass the OS page cache using O_DIRECT.
367+
// Memtable flushes always use buffered I/O regardless of this setting.
368+
// Default is 'standard' (buffered I/O) - users must opt in to Direct IO
369+
public DiskAccessMode background_write_disk_access_mode = DiskAccessMode.standard;
370+
371+
// Size of the in-memory staging buffer for Direct IO background writes. Trades off syscall
372+
// frequency against per-flush blocking latency on the compaction thread.
373+
// Aligned up to filesystem block size; auto-expands to fit a single compressed chunk + CRC
374+
// + one block when chunk_length exceeds this value.
375+
public DataStorageSpec.IntKibibytesBound direct_write_buffer_size = new DataStorageSpec.IntKibibytesBound("1MiB");
376+
365377
// fraction of free disk space available for compaction after min free space is subtracted
366378
public volatile Double max_space_usable_for_compactions_in_percentage = .95;
367379

src/java/org/apache/cassandra/config/DatabaseDescriptor.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ public class DatabaseDescriptor
224224

225225
private static DiskAccessMode compactionReadDiskAccessMode;
226226

227+
private static DiskAccessMode backgroundWriteDiskAccessMode;
228+
227229
private static AbstractCryptoProvider cryptoProvider;
228230
private static IAuthenticator authenticator;
229231
private static IAuthorizer authorizer;
@@ -897,6 +899,10 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m
897899
if (conf.hints_directory.equals(conf.saved_caches_directory))
898900
throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false);
899901

902+
initializeBackgroundWriteDiskAccessMode();
903+
if (backgroundWriteDiskAccessMode != conf.background_write_disk_access_mode)
904+
logger.info("background_write_disk_access_mode resolved to: {}", backgroundWriteDiskAccessMode);
905+
900906
if (conf.memtable_flush_writers == 0)
901907
{
902908
conf.memtable_flush_writers = conf.data_file_directories.length == 1 ? 2 : 1;
@@ -3406,6 +3412,79 @@ public static void initializeCommitLogDiskAccessMode()
34063412
commitLogWriteDiskAccessMode = accessModeDirectIoPair.left;
34073413
}
34083414

3415+
public static DiskAccessMode getBackgroundWriteDiskAccessMode()
3416+
{
3417+
return backgroundWriteDiskAccessMode;
3418+
}
3419+
3420+
@VisibleForTesting
3421+
public static void setBackgroundWriteDiskAccessMode(DiskAccessMode diskAccessMode)
3422+
{
3423+
backgroundWriteDiskAccessMode = diskAccessMode;
3424+
conf.background_write_disk_access_mode = diskAccessMode;
3425+
}
3426+
3427+
public static DataStorageSpec.IntKibibytesBound getDirectWriteBufferSize()
3428+
{
3429+
return conf.direct_write_buffer_size;
3430+
}
3431+
3432+
@VisibleForTesting
3433+
public static void initializeBackgroundWriteDiskAccessMode()
3434+
{
3435+
DiskAccessMode providedMode = conf.background_write_disk_access_mode;
3436+
3437+
if (providedMode == DiskAccessMode.auto)
3438+
{
3439+
providedMode = DiskAccessMode.standard;
3440+
}
3441+
3442+
if (providedMode == DiskAccessMode.direct)
3443+
{
3444+
// DataStorageSpec already rejects negatives at parse time; zero is the remaining
3445+
// nonsense value. The writer's Math.max would silently coerce it to minRequiredSize,
3446+
// which masks a likely operator mistake — fail fast instead.
3447+
if (conf.direct_write_buffer_size.toBytes() <= 0)
3448+
throw new ConfigurationException("direct_write_buffer_size must be > 0 when background_write_disk_access_mode is 'direct'. " +
3449+
"Got: " + conf.direct_write_buffer_size, false);
3450+
3451+
if (!toolInitialized)
3452+
{
3453+
List<String> unsupportedLocations = new ArrayList<>();
3454+
3455+
for (String dataDir : conf.data_file_directories)
3456+
{
3457+
try
3458+
{
3459+
File dataDirFile = new File(dataDir);
3460+
PathUtils.createDirectoriesIfNotExists(dataDirFile.toPath());
3461+
3462+
if (!FileUtils.isDirectIOSupported(dataDirFile))
3463+
{
3464+
unsupportedLocations.add(dataDir);
3465+
}
3466+
}
3467+
catch (RuntimeException e)
3468+
{
3469+
logger.warn("Unable to determine Direct IO support for data directory {}: {}", dataDir, e.getMessage());
3470+
unsupportedLocations.add(dataDir + " (check failed: " + e.getMessage() + ")");
3471+
}
3472+
}
3473+
3474+
if (!unsupportedLocations.isEmpty())
3475+
{
3476+
throw new ConfigurationException(
3477+
String.format("background_write_disk_access_mode is set to 'direct', but the following data directories " +
3478+
"do not support Direct I/O: %s. Either change background_write_disk_access_mode to 'standard' " +
3479+
"in cassandra.yaml, or ensure all data directories are on filesystems that support Direct I/O.",
3480+
unsupportedLocations), false);
3481+
}
3482+
}
3483+
}
3484+
3485+
backgroundWriteDiskAccessMode = providedMode;
3486+
}
3487+
34093488
public static String getSavedCachesLocation()
34103489
{
34113490
return conf.saved_caches_directory;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.io;
19+
20+
/**
21+
* Classifies an operation's eligibility for a direct-IO (O_DIRECT) data path, encoding both
22+
* the answer and the rationale class. Consumers maintain their own per-operation classification
23+
* and apply this alongside their own gates (e.g. compression, configuration mode);
24+
* {@link #SUPPORTED} is necessary but not sufficient.
25+
*/
26+
public enum DirectIoSupport
27+
{
28+
/**
29+
* Eligible for the direct-IO data path.
30+
*/
31+
SUPPORTED,
32+
33+
/**
34+
* The direct-IO path is mechanically incompatible with this operation. Removing this
35+
* exclusion requires code changes, not policy.
36+
*/
37+
UNSUPPORTED_CORRECTNESS,
38+
39+
/**
40+
* Direct IO would work but is deliberately disabled for performance or cache-residency
41+
* reasons. Removing this exclusion requires re-evaluating the policy, not code changes.
42+
*/
43+
UNSUPPORTED_POLICY;
44+
45+
public boolean isSupported()
46+
{
47+
return this == SUPPORTED;
48+
}
49+
}

src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.IOException;
2323
import java.nio.ByteBuffer;
2424
import java.nio.channels.Channels;
25+
import java.nio.file.OpenOption;
2526
import java.util.Optional;
2627
import java.util.zip.CRC32;
2728

@@ -50,66 +51,76 @@ public class CompressedSequentialWriter extends SequentialWriter
5051

5152
// holds offset in the file where current chunk should be written
5253
// changed only by flush() method where data buffer gets compressed and stored to the file
53-
private long chunkOffset = 0;
54+
protected long chunkOffset = 0;
5455

5556
// index file writer (random I/O)
56-
private final CompressionMetadata.Writer metadataWriter;
57+
protected final CompressionMetadata.Writer metadataWriter;
5758
private final ICompressor compressor;
5859

5960
// used to store compressed data
6061
private ByteBuffer compressed;
6162

6263
// holds a number of already written chunks
63-
private int chunkCount = 0;
64+
protected int chunkCount = 0;
6465

65-
private long uncompressedSize = 0, compressedSize = 0;
66+
protected long uncompressedSize = 0;
67+
protected long compressedSize = 0;
6668

67-
private final MetadataCollector sstableMetadataCollector;
69+
protected final MetadataCollector sstableMetadataCollector;
6870
private final CompressionDictionaryManager compressionDictionaryManager;
6971

7072
private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4);
71-
private final Optional<File> digestFile;
73+
protected final Optional<File> digestFile;
7274

7375
private final int maxCompressedLength;
7476
private final boolean isDictionaryEnabled;
7577

78+
private static ByteBuffer allocateBuffer(CompressionParams parameters)
79+
{
80+
return parameters.getSstableCompressor().preferredBufferType().allocate(parameters.chunkLength());
81+
}
82+
83+
private static SequentialWriterOption buildOption(SequentialWriterOption option, CompressionParams parameters)
84+
{
85+
return SequentialWriterOption.newBuilder()
86+
.bufferSize(parameters.chunkLength())
87+
.bufferType(parameters.getSstableCompressor().preferredBufferType())
88+
.finishOnClose(option.finishOnClose())
89+
.build();
90+
}
91+
7692
public CompressedSequentialWriter(File file,
7793
File offsetsFile,
78-
File digestFile,
94+
@Nullable File digestFile,
7995
SequentialWriterOption option,
8096
CompressionParams parameters,
8197
MetadataCollector sstableMetadataCollector)
8298
{
8399
this(file, offsetsFile, digestFile, option, parameters, sstableMetadataCollector, null);
84100
}
85101

86-
87102
/**
88-
* Create CompressedSequentialWriter without digest file.
103+
* Create CompressedSequentialWriter with optional compression dictionary and channel options.
89104
*
90105
* @param file File to write
91106
* @param offsetsFile File to write compression metadata
92-
* @param digestFile File to write digest
107+
* @param digestFile File to write digest, or null if not needed
93108
* @param option Write option (buffer size and type will be set the same as compression params)
94109
* @param parameters Compression parameters
95110
* @param sstableMetadataCollector Metadata collector
96111
* @param compressionDictionaryManager manages compression dictionary; null if absent
112+
* @param extraOpenOptions additional options to pass to FileChannel.open (e.g., ExtendedOpenOption.DIRECT)
97113
*/
98114
public CompressedSequentialWriter(File file,
99115
File offsetsFile,
100-
File digestFile,
116+
@Nullable File digestFile,
101117
SequentialWriterOption option,
102118
CompressionParams parameters,
103119
MetadataCollector sstableMetadataCollector,
104-
@Nullable CompressionDictionaryManager compressionDictionaryManager)
120+
@Nullable CompressionDictionaryManager compressionDictionaryManager,
121+
OpenOption... extraOpenOptions)
105122
{
106-
super(file, SequentialWriterOption.newBuilder()
107-
.bufferSize(option.bufferSize())
108-
.bufferType(option.bufferType())
109-
.bufferSize(parameters.chunkLength())
110-
.bufferType(parameters.getSstableCompressor().preferredBufferType())
111-
.finishOnClose(option.finishOnClose())
112-
.build());
123+
super(file, allocateBuffer(parameters), buildOption(option, parameters), true, extraOpenOptions);
113124
ICompressor compressor = parameters.getSstableCompressor();
114125
this.digestFile = Optional.ofNullable(digestFile);
115126

@@ -142,7 +153,7 @@ public CompressedSequentialWriter(File file,
142153
metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsFile, compressionDictionary);
143154

144155
this.sstableMetadataCollector = sstableMetadataCollector;
145-
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
156+
crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(this.channel)));
146157
}
147158

148159
@Override
@@ -178,7 +189,9 @@ public void flush()
178189
@Override
179190
protected void flushData()
180191
{
181-
seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
192+
// resetAndTruncate leaves fchannel.position() past EOF after its verification reads + truncate;
193+
// re-seek so the next chunk lands at chunkOffset. No-op under linear writes.
194+
seekToChunkStart();
182195

183196
try
184197
{
@@ -216,32 +229,36 @@ protected void flushData()
216229
}
217230
compressedSize += compressedLength;
218231

232+
// write an offset of the newly written chunk to the index file
233+
metadataWriter.addOffset(chunkOffset);
234+
chunkCount++;
235+
236+
// write out the compressed data and checksum
237+
toWrite.flip();
238+
writeChunk(toWrite);
239+
lastFlushOffset = uncompressedSize;
240+
241+
if (toWrite == buffer)
242+
buffer.position(uncompressedLength);
243+
244+
// next chunk should be written right after current + length of the checksum (int)
245+
chunkOffset += compressedLength + 4;
246+
if (runPostFlush != null)
247+
runPostFlush.accept(getLastFlushOffset());
248+
}
249+
250+
protected void writeChunk(ByteBuffer toWrite)
251+
{
219252
try
220253
{
221-
// write an offset of the newly written chunk to the index file
222-
metadataWriter.addOffset(chunkOffset);
223-
chunkCount++;
224-
225-
// write out the compressed data
226-
toWrite.flip();
227254
channel.write(toWrite);
228-
229-
// write corresponding checksum
230255
toWrite.rewind();
231256
crcMetadata.appendDirect(toWrite, true);
232-
lastFlushOffset = uncompressedSize;
233257
}
234258
catch (IOException e)
235259
{
236260
throw new FSWriteError(e, getPath());
237261
}
238-
if (toWrite == buffer)
239-
buffer.position(uncompressedLength);
240-
241-
// next chunk should be written right after current + length of the checksum (int)
242-
chunkOffset += compressedLength + 4;
243-
if (runPostFlush != null)
244-
runPostFlush.accept(getLastFlushOffset());
245262
}
246263

247264
public CompressionMetadata open(long overrideLength)
@@ -358,10 +375,16 @@ private void truncate(long toFileSize, long toBufferOffset)
358375
}
359376
}
360377

378+
protected void writeDigestFile()
379+
{
380+
digestFile.ifPresent(crcMetadata::writeFullChecksum);
381+
}
382+
361383
/**
362384
* Seek to the offset where next compressed data chunk should be stored.
385+
* Subclasses may override if they manage their own channel.
363386
*/
364-
private void seekToChunkStart()
387+
protected void seekToChunkStart()
365388
{
366389
if (getOnDiskFilePointer() != chunkOffset)
367390
{
@@ -429,7 +452,7 @@ protected Throwable doAbort(Throwable accumulate)
429452
protected void doPrepare()
430453
{
431454
syncInternal();
432-
digestFile.ifPresent(crcMetadata::writeFullChecksum);
455+
writeDigestFile();
433456
sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
434457
metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
435458
}

0 commit comments

Comments
 (0)