Commit 9428487

GH-3484: Eliminate per-page heap allocation for CRC32 (#3485)
* GH-3484 Eliminate per-page heap allocation for CRC32 checksums when using direct `ByteBufferAllocator`

Why this is safe
- CRC32.update(ByteBuffer) exists since Java 9, processes bytes from position to limit, advancing position.
- toByteBuffer(releaser) returns either a slice() of the internal buffer (independent position) or a freshly allocated copy. Either way, the original BytesInput is unaffected for the subsequent buf.collect() call, because ByteBufferBytesInput.writeInto() uses buffer.duplicate().
- When the allocator is direct, toByteBuffer(releaser) returns the direct buffer directly -- zero heap copy. When the allocator is heap-based, behavior is functionally equivalent to the old toByteArray() path.
- The releaser field already exists on ColumnChunkPageWriter (line 124) and manages buffer lifecycle.

* GH-3484 Release CRC32 ByteBuffers per page to avoid leak across column chunk

The previous change reused the ColumnChunkPageWriter's long-lived ByteBufferReleaser for the CRC32 update buffers. That releaser only closes when the column chunk writer closes (end of row group), so every per-page allocation made by BytesInput.toByteBuffer(releaser) accumulated until then. With many pages in a single large column chunk this exhausted the heap (TestLargeColumnChunk OOM in CI).

Scope each CRC update to a try-with-resources ByteBufferReleaser so allocated buffers are returned to the allocator immediately after crc.update(). writePageV2 shares one per-page releaser across the three updates.
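The buffer-based checksum path described above can be illustrated with the JDK alone. The following is a minimal sketch, not code from this commit: the class name `CrcByteBufferDemo` and its helper methods are hypothetical, and it uses only `java.util.zip.CRC32` and `java.nio.ByteBuffer`. It shows that updating from a direct buffer gives the same checksum as updating from a `byte[]` copy, and that reading through `duplicate()` leaves the caller's position untouched.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class CrcByteBufferDemo {
  // Array path: the checksum is computed over a heap byte[] copy.
  static long crcOfArray(byte[] bytes) {
    CRC32 crc = new CRC32();
    crc.update(bytes);
    return crc.getValue();
  }

  // Buffer path: the checksum is read straight from the buffer (heap or direct).
  // duplicate() shares the content but has its own position and limit,
  // so the caller's buffer is not advanced by crc.update().
  static long crcOfBuffer(ByteBuffer buffer) {
    CRC32 crc = new CRC32();
    ByteBuffer view = buffer.duplicate();
    crc.update(view); // consumes view from its position to its limit
    return crc.getValue();
  }

  public static void main(String[] args) {
    byte[] page = "example page bytes".getBytes(StandardCharsets.UTF_8);

    // Hypothetical stand-in for a page held in a direct buffer.
    ByteBuffer direct = ByteBuffer.allocateDirect(page.length);
    direct.put(page);
    direct.flip();

    boolean sameChecksum = crcOfArray(page) == crcOfBuffer(direct);
    System.out.println("checksums match: " + sameChecksum);
    System.out.println("original position unchanged: " + (direct.position() == 0));
  }
}
```

The sketch says nothing about how `BytesInput.toByteBuffer` behaves internally; it only demonstrates the JDK behavior the commit message relies on.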
1 parent 3e05546 commit 9428487

1 file changed

Lines changed: 15 additions & 9 deletions

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java

@@ -94,6 +94,7 @@ private static final class ColumnChunkPageWriter implements PageWriter, BloomFil
     private Statistics totalStatistics;
     private final SizeStatistics totalSizeStatistics;
     private final GeospatialStatistics totalGeospatialStatistics;
+    private final ByteBufferAllocator allocator;
     private final ByteBufferReleaser releaser;
 
     private final CRC32 crc;
@@ -121,6 +122,7 @@ private ColumnChunkPageWriter(
         int columnOrdinal) {
       this.path = path;
       this.compressor = compressor;
+      this.allocator = allocator;
       this.releaser = new ByteBufferReleaser(allocator);
       this.buf = new ConcatenatingByteBufferCollector(allocator);
       this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
@@ -217,7 +219,9 @@ public void writePage(
       }
       if (pageWriteChecksumEnabled) {
         crc.reset();
-        crc.update(compressedBytes.toByteArray());
+        try (ByteBufferReleaser pageReleaser = new ByteBufferReleaser(allocator)) {
+          crc.update(compressedBytes.toByteBuffer(pageReleaser));
+        }
         parquetMetadataConverter.writeDataPageV1Header(
             (int) uncompressedSize,
             (int) compressedSize,
@@ -321,14 +325,16 @@ public void writePageV2(
       }
       if (pageWriteChecksumEnabled) {
         crc.reset();
-        if (repetitionLevels.size() > 0) {
-          crc.update(repetitionLevels.toByteArray());
-        }
-        if (definitionLevels.size() > 0) {
-          crc.update(definitionLevels.toByteArray());
-        }
-        if (compressedData.size() > 0) {
-          crc.update(compressedData.toByteArray());
+        try (ByteBufferReleaser pageReleaser = new ByteBufferReleaser(allocator)) {
+          if (repetitionLevels.size() > 0) {
+            crc.update(repetitionLevels.toByteBuffer(pageReleaser));
+          }
+          if (definitionLevels.size() > 0) {
+            crc.update(definitionLevels.toByteBuffer(pageReleaser));
+          }
+          if (compressedData.size() > 0) {
+            crc.update(compressedData.toByteBuffer(pageReleaser));
+          }
         }
         parquetMetadataConverter.writeDataPageV2Header(
             uncompressedSize,
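
The difference between the chunk-lived releaser and the per-page releaser in the hunks above can be sketched without Parquet on the classpath. Everything below is an illustrative assumption: `CountingAllocator` and `ScopedReleaser` are hypothetical stand-ins, not the real `ByteBufferAllocator` or `ByteBufferReleaser`, and the methods only model how many buffers stay outstanding under each scoping choice.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

final class ReleaserScopeSketch {
  /** Hypothetical allocator that only counts buffers not yet released. */
  static final class CountingAllocator {
    int outstanding = 0;
    int peakOutstanding = 0;

    ByteBuffer allocate(int size) {
      outstanding++;
      peakOutstanding = Math.max(peakOutstanding, outstanding);
      return ByteBuffer.allocate(size);
    }

    void release(ByteBuffer buffer) {
      outstanding--;
    }
  }

  /** Hypothetical releaser: hands back every buffer it tracked when closed. */
  static final class ScopedReleaser implements AutoCloseable {
    private final CountingAllocator allocator;
    private final List<ByteBuffer> tracked = new ArrayList<>();

    ScopedReleaser(CountingAllocator allocator) {
      this.allocator = allocator;
    }

    ByteBuffer allocate(int size) {
      ByteBuffer buffer = allocator.allocate(size);
      tracked.add(buffer);
      return buffer;
    }

    @Override
    public void close() {
      for (ByteBuffer buffer : tracked) {
        allocator.release(buffer);
      }
      tracked.clear();
    }
  }

  /** One releaser for the whole chunk: buffers pile up until the chunk ends. */
  static int peakWithChunkScopedReleaser(int pages, int pageSize) {
    CountingAllocator allocator = new CountingAllocator();
    CRC32 crc = new CRC32();
    try (ScopedReleaser chunkReleaser = new ScopedReleaser(allocator)) {
      for (int i = 0; i < pages; i++) {
        crc.reset();
        crc.update(chunkReleaser.allocate(pageSize));
      }
    }
    return allocator.peakOutstanding;
  }

  /** One releaser per page: each buffer is returned right after crc.update(). */
  static int peakWithPageScopedReleaser(int pages, int pageSize) {
    CountingAllocator allocator = new CountingAllocator();
    CRC32 crc = new CRC32();
    for (int i = 0; i < pages; i++) {
      crc.reset();
      try (ScopedReleaser pageReleaser = new ScopedReleaser(allocator)) {
        crc.update(pageReleaser.allocate(pageSize));
      }
    }
    return allocator.peakOutstanding;
  }

  public static void main(String[] args) {
    System.out.println("chunk-scoped peak: " + peakWithChunkScopedReleaser(1000, 64));
    System.out.println("page-scoped peak: " + peakWithPageScopedReleaser(1000, 64));
  }
}
```

In this model the chunk-scoped variant holds one buffer per page until the end, while the page-scoped variant never holds more than one at a time, which is the accumulation pattern the second commit message describes.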
