2121
2222import com .google .common .collect .ImmutableList ;
2323import io .pixelsdb .pixels .common .physical .*;
24+ import io .pixelsdb .pixels .common .utils .ConfigFactory ;
2425import io .pixelsdb .pixels .common .utils .Constants ;
2526import io .pixelsdb .pixels .core .PixelsProto ;
2627import io .pixelsdb .pixels .core .PixelsVersion ;
@@ -58,6 +59,14 @@ public class PixelsCompactor
5859 private final TimeZone timeZone ;
5960 private final long fileContentLength ;
6061 private final int fileRowNum ;
62+ /**
63+ * The number of bytes that each column chunk is aligned to.
64+ */
65+ private final int chunkAlignment ;
66+ /**
67+ * The byte buffer padded to each column chunk for alignment.
68+ */
69+ private final byte [] chunkPaddingBuffer ;
6170
6271 private final Storage inputStorage ;
6372 private final PhysicalWriter fsWriter ;
@@ -93,6 +102,9 @@ private PixelsCompactor(
93102 checkArgument (compressionBlockSize > 0 , "compression block size is not positive" );
94103 this .compressionBlockSize = compressionBlockSize ;
95104 this .timeZone = requireNonNull (timeZone );
105+ this .chunkAlignment = Integer .parseInt (ConfigFactory .Instance ().getProperty ("column.chunk.alignment" ));
106+ checkArgument (this .chunkAlignment >= 0 , "column.chunk.alignment must >= 0" );
107+ this .chunkPaddingBuffer = new byte [this .chunkAlignment ];
96108 checkArgument (fileContentLength > 0 , "file content length is not positive" );
97109 this .fileContentLength = fileContentLength ;
98110 checkArgument (fileRowNum > 0 , "file row number is not positive" );
@@ -103,7 +115,7 @@ private PixelsCompactor(
103115
104116 this .fileColStatRecorders = requireNonNull (fileColStatRecorders , "file column stat reader is null" );
105117
106- checkArgument (!requireNonNull (rowGroupFooterBuilderList ).isEmpty ());
118+ checkArgument (!requireNonNull (rowGroupInfoBuilderList ).isEmpty ());
107119 checkArgument (!requireNonNull (rowGroupStatBuilderList ).isEmpty ());
108120 checkArgument (!requireNonNull (rowGroupFooterBuilderList ).isEmpty ());
109121 checkArgument (!requireNonNull (rowGroupPaths ).isEmpty ());
@@ -369,10 +381,34 @@ private void writeColumnChunks()
369381 fsReader .seek (columnChunkOffset );
370382 byte [] chunkBuffer = new byte [(int ) columnChunkLength ];
371383 fsReader .readFully (chunkBuffer );
372- fsWriter .prepare ((int ) columnChunkLength );
373- long offset = this .fsWriter .append (chunkBuffer , 0 , (int ) columnChunkLength );
374- columnChunkIndexBuilder .setChunkOffset (offset );
375- // this.fsWriter.flush(); // Issue #192: no need to flush as writing has not finished.
384+
385+ // Issue #521: prepare for writing the column chunk, and make sure the start offset is aligned.
386+ long chunkStartOffset = fsWriter .prepare ((int ) columnChunkLength );
387+ int tryAlign = 0 ;
388+ while (chunkAlignment != 0 && chunkStartOffset % chunkAlignment != 0 && tryAlign ++ < 2 )
389+ {
390+ int alignBytes = (int ) (chunkAlignment - chunkStartOffset % chunkAlignment );
391+ this .fsWriter .append (chunkPaddingBuffer , 0 , alignBytes );
392+ chunkStartOffset = this .fsWriter .prepare ((int ) columnChunkLength );
393+ }
394+ if (tryAlign > 2 )
395+ {
396+ LOGGER .warn ("failed to align the start offset of the column chunk" );
397+ throw new IOException ("failed to align the start offset of the column chunk" );
398+ }
399+
400+ this .fsWriter .append (chunkBuffer , 0 , (int ) columnChunkLength );
401+ /*
402+ * Issue #521:
403+ * It is not necessary pad the column chunk here, as additional bytes are already padded before
404+ * writing this column chunk to ensure chunkStartOffset is aligned. For the last column chunk,
405+ * there is no need to ensure its length is aligned. We only need aligned start offsets.
406+ *
407+ * Also, there is no need to update the column chunk length, pixels reader needs the real length
408+ * of the column chunk.
409+ */
410+ columnChunkIndexBuilder .setChunkOffset (chunkStartOffset );
411+ // Issue #192: no need to flush fsWriter as writing has not finished.
376412 }
377413 catch (IOException e )
378414 {
@@ -417,8 +453,8 @@ private void writeRowGroupFooters()
417453
418454 private void writeFileTail ()
419455 {
420- PixelsProto .Footer footer = writeFooter ();
421- PixelsProto .PostScript postScript = writePostScript ();
456+ PixelsProto .Footer footer = buildFileFooter ();
457+ PixelsProto .PostScript postScript = buildPostScript ();
422458
423459 PixelsProto .FileTail fileTail =
424460 PixelsProto .FileTail .newBuilder ()
@@ -447,7 +483,7 @@ private void writeFileTail()
447483 }
448484 }
449485
450- private PixelsProto .Footer writeFooter ()
486+ private PixelsProto .Footer buildFileFooter ()
451487 {
452488 PixelsProto .Footer .Builder footerBuilder =
453489 PixelsProto .Footer .newBuilder ();
@@ -468,7 +504,7 @@ private PixelsProto.Footer writeFooter()
468504 return footerBuilder .build ();
469505 }
470506
471- private PixelsProto .PostScript writePostScript ()
507+ private PixelsProto .PostScript buildPostScript ()
472508 {
473509 return PixelsProto .PostScript .newBuilder ()
474510 .setVersion (Constants .VERSION )
@@ -478,6 +514,8 @@ private PixelsProto.PostScript writePostScript()
478514 .setCompressionBlockSize (compressionBlockSize )
479515 .setPixelStride (pixelStride )
480516 .setWriterTimezone (timeZone .getDisplayName ())
517+ .setPartitioned (false ) // Issue #521: we do not compact partitioned files.
518+ .setColumnChunkAlignment (chunkAlignment )
481519 .setMagic (Constants .MAGIC )
482520 .build ();
483521 }
0 commit comments