1919
2020import java .io .IOException ;
2121import java .nio .ByteBuffer ;
22- import java .nio .charset .StandardCharsets ;
2322import java .util .concurrent .atomic .AtomicBoolean ;
24- import java .util .zip . CRC32 ;
23+ import java .util .function . IntConsumer ;
2524
2625import javax .annotation .Nullable ;
2726
3736import org .apache .cassandra .db .compression .CompressionDictionaryManager ;
3837import org .apache .cassandra .io .FSWriteError ;
3938import org .apache .cassandra .io .sstable .metadata .MetadataCollector ;
39+ import org .apache .cassandra .io .util .ChecksumWriter ;
4040import org .apache .cassandra .io .util .DataPosition ;
4141import org .apache .cassandra .io .util .File ;
42- import org .apache .cassandra .io .util .FileOutputStreamPlus ;
4342import org .apache .cassandra .io .util .FileUtils ;
4443import org .apache .cassandra .io .util .SequentialWriter ;
4544import org .apache .cassandra .io .util .SequentialWriterOption ;
@@ -75,14 +74,9 @@ public class DirectCompressedSequentialWriter extends CompressedSequentialWriter
7574 // fit contiguously. flushCompleteBlocks aligns down to blockSize, leaving up to
7675 // (blockSize - 1) bytes carried over via compact(); the floor accounts for that.
7776 private ByteBuffer writeBuffer ;
78- private int writeBufferPosition = 0 ;
7977 private long actualDataSize = 0 ;
8078
8179 private final int blockSize ;
82- // ChecksumWriter writes CRCs directly to the channel, bypassing writeBuffer; track checksums ourselves.
83- private final CRC32 fullFileChecksum = new CRC32 ();
84- private final CRC32 chunkChecksum = new CRC32 ();
85- private final ByteBuffer crcBuffer = ByteBuffer .allocate (CRC_LENGTH );
8680
8781 public DirectCompressedSequentialWriter (File file ,
8882 File offsetsFile ,
@@ -130,6 +124,14 @@ public DirectCompressedSequentialWriter(File file,
130124 }
131125 }
132126
127+ // Invoked from super()'s constructor before writeBuffer exists; safe because we only capture the
128+ // writeCrcToAlignedBuffer reference, which isn't called until the first flush.
129+ @ Override
130+ protected ChecksumWriter createChecksumWriter ()
131+ {
132+ return new DirectChecksumWriter (this ::writeCrcToAlignedBuffer );
133+ }
134+
133135 // Parent reads fchannel.position(), which lags by writeBuffer contents under O_DIRECT.
134136 // getEstimatedOnDiskBytesWritten is intentionally NOT overridden: parent returns chunkOffset,
135137 // which already represents the eventual on-disk size — correct under DIO.
@@ -155,16 +157,11 @@ protected void writeChunk(ByteBuffer toWrite)
155157
156158 int chunkLength = toWrite .remaining ();
157159
158- chunkChecksum .reset ();
159- chunkChecksum .update (toWrite );
160- int crcValue = (int ) chunkChecksum .getValue ();
161- toWrite .rewind ();
162-
163160 writeToAlignedBuffer (toWrite );
164- writeCrcToAlignedBuffer (crcValue );
165161
162+ // writeToAlignedBuffer drained toWrite; rewind so appendDirect can re-read it for the CRC.
166163 toWrite .rewind ();
167- updateFullChecksum (toWrite , crcValue );
164+ crcMetadata . appendDirect (toWrite , true );
168165
169166 actualDataSize = chunkOffset + chunkLength + CRC_LENGTH ;
170167 }
@@ -174,51 +171,57 @@ private void writeToAlignedBuffer(ByteBuffer data)
174171 int dataLength = data .remaining ();
175172
176173 // Buffer is sized for worst-case chunk + CRC + blockSize, so a flush always frees enough room.
177- if (writeBufferPosition + dataLength > writeBuffer .capacity ())
174+ if (writeBuffer . position () + dataLength > writeBuffer .capacity ())
178175 flushCompleteBlocks ();
179176
180- writeBuffer .position (writeBufferPosition );
181177 writeBuffer .put (data );
182- writeBufferPosition = writeBuffer .position ();
183178 }
184179
185180 private void writeCrcToAlignedBuffer (int crcValue )
186181 {
187182 // After flush, leftover < blockSize, so there's always room for the CRC trailer.
188- if (writeBufferPosition + CRC_LENGTH > writeBuffer .capacity ())
183+ if (writeBuffer . position () + CRC_LENGTH > writeBuffer .capacity ())
189184 flushCompleteBlocks ();
190185
191- writeBuffer .position (writeBufferPosition );
192186 writeBuffer .putInt (crcValue );
193- writeBufferPosition = writeBuffer .position ();
187+ }
188+
189+ // FileChannel.write may write fewer bytes than requested, and a partial write would short the file
190+ // and desync the callers' leftover/truncate bookkeeping.
191+ private void writeAlignedBlocks (int flushLimit ) throws IOException
192+ {
193+ writeBuffer .position (0 );
194+ writeBuffer .limit (flushLimit );
195+ while (writeBuffer .hasRemaining ())
196+ fchannel .write (writeBuffer );
194197 }
195198
196199 private void flushCompleteBlocks ()
197200 {
201+ // writeAlignedBlocks rewinds position() to 0 to drain, so snapshot the cursor first.
202+ int logicalPos = writeBuffer .position ();
203+
198204 // Align down: O_DIRECT cannot write partial blocks
199- int flushLimit = writeBufferPosition & -blockSize ;
205+ int flushLimit = logicalPos & -blockSize ;
200206
201207 if (flushLimit == 0 )
202208 return ;
203209
204210 try
205211 {
206- writeBuffer .position (0 );
207- writeBuffer .limit (flushLimit );
208- fchannel .write (writeBuffer );
212+ writeAlignedBlocks (flushLimit );
209213
210- int leftover = writeBufferPosition - flushLimit ;
214+ int leftover = logicalPos - flushLimit ;
211215 if (leftover > 0 )
212216 {
213- writeBuffer .limit (writeBufferPosition );
217+ writeBuffer .limit (logicalPos );
214218 writeBuffer .position (flushLimit );
215219 writeBuffer .compact ();
216220 }
217221 else
218222 {
219223 writeBuffer .clear ();
220224 }
221- writeBufferPosition = leftover ;
222225 }
223226 catch (IOException e )
224227 {
@@ -228,19 +231,18 @@ private void flushCompleteBlocks()
228231
229232 private void flushFinalWithPadding ()
230233 {
231- if (writeBufferPosition == 0 )
234+ int logicalPos = writeBuffer .position ();
235+
236+ if (logicalPos == 0 )
232237 return ;
233238
234239 try
235240 {
236- int flushLimit = BitUtil .align (writeBufferPosition , blockSize );
241+ int flushLimit = BitUtil .align (logicalPos , blockSize );
237242
238- writeBuffer .position (writeBufferPosition );
239- ByteBufferUtil .writeZeroes (writeBuffer , flushLimit - writeBufferPosition );
243+ ByteBufferUtil .writeZeroes (writeBuffer , flushLimit - logicalPos );
240244
241- writeBuffer .position (0 );
242- writeBuffer .limit (flushLimit );
243- fchannel .write (writeBuffer );
245+ writeAlignedBlocks (flushLimit );
244246
245247 // O_DIRECT required padding; truncate back to actual data size.
246248 fchannel .truncate (actualDataSize );
@@ -251,34 +253,6 @@ private void flushFinalWithPadding()
251253 }
252254 }
253255
254- private void updateFullChecksum (ByteBuffer data , int crcValue )
255- {
256- fullFileChecksum .update (data );
257-
258- // Include CRC bytes in the full-file checksum to match ChecksumWriter.appendDirect(..., true).
259- crcBuffer .clear ();
260- crcBuffer .putInt (crcValue );
261- crcBuffer .flip ();
262- fullFileChecksum .update (crcBuffer );
263- }
264-
265- @ Override
266- protected void writeDigestFile ()
267- {
268- digestFile .ifPresent (file -> {
269- try (FileOutputStreamPlus fos = new FileOutputStreamPlus (file ))
270- {
271- fos .write (String .valueOf (fullFileChecksum .getValue ()).getBytes (StandardCharsets .UTF_8 ));
272- fos .flush ();
273- fos .getChannel ().force (true );
274- }
275- catch (IOException e )
276- {
277- throw new FSWriteError (e , file );
278- }
279- });
280- }
281-
282256 // Gated out for SCRUB in DataComponent.buildWriter; these throws are a canary if the gate is bypassed.
283257 @ Override
284258 public DataPosition mark ()
@@ -332,4 +306,31 @@ protected SequentialWriter.TransactionalProxy txnProxy()
332306 {
333307 return new DirectTransactionalProxy ();
334308 }
309+
310+ /**
311+ * Sends the per-chunk CRC int into the block-aligned writeBuffer rather than straight to the channel, so
312+ * the O_DIRECT writer reuses ChecksumWriter's checksum bookkeeping instead of duplicating it. Only the CRC
313+ * trailer flows through here; {@link #writeChunk} places the chunk data in the buffer.
314+ */
315+ private static final class DirectChecksumWriter extends ChecksumWriter
316+ {
317+ private final IntConsumer alignedSink ;
318+
319+ DirectChecksumWriter (IntConsumer alignedSink )
320+ {
321+ this .alignedSink = alignedSink ;
322+ }
323+
324+ @ Override
325+ protected void writeIncrementalInt (int value )
326+ {
327+ alignedSink .accept (value );
328+ }
329+
330+ @ Override
331+ public void writeChunkSize (int length )
332+ {
333+ throw new UnsupportedOperationException ("writeChunkSize is unused on the compressed O_DIRECT path" );
334+ }
335+ }
335336}
0 commit comments