Skip to content

Commit 6bdaec5

Browse files
committed
GH-3522: Reduce peak memory during row group flush by eagerly releasing column buffers
During flushToFileWriter(), each column's compressed page buffers are now released immediately after being written to disk, rather than held in memory until the entire row group flush completes. For a schema with N columns, this reduces peak flush memory from ~N columns' worth of compressed pages to ~1 column's worth.

Changes:
- Add writeAllToAndRelease() to ConcatenatingByteBufferCollector for progressive slab-by-slab memory release during write
- Make close() idempotent (safe to call after eager release or multiple times)
- Call pageWriter.close() after each column in flushToFileWriter()
- Add tests for eager release, double-close safety, and output equivalence
1 parent 53d7842 commit 6bdaec5

3 files changed

Lines changed: 114 additions & 0 deletions

File tree

parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.nio.channels.Channels;
2727
import java.nio.channels.WritableByteChannel;
2828
import java.util.ArrayList;
29+
import java.util.Iterator;
2930
import java.util.List;
3031

3132
/**
@@ -64,10 +65,14 @@ public void collect(BytesInput bytesInput) {
6465

6566
@Override
6667
public void close() {
68+
if (slabs.isEmpty()) {
69+
return;
70+
}
6771
for (ByteBuffer slab : slabs) {
6872
allocator.release(slab);
6973
}
7074
slabs.clear();
75+
size = 0;
7176
}
7277

7378
@Override
@@ -78,6 +83,30 @@ public void writeAllTo(OutputStream out) throws IOException {
7883
}
7984
}
8085

86+
/**
87+
* Writes all collected slabs to the given output stream, releasing each slab's
88+
* {@link ByteBuffer} back to the allocator immediately after it has been written.
89+
* This progressively frees memory during the write rather than holding all slabs
90+
* until {@link #close()} is called.
91+
*
92+
* <p>After this method returns, the collector is empty and {@link #size()} returns 0.
93+
* Calling {@link #close()} afterwards is safe but has no additional effect.
94+
*
95+
* @param out the output stream to write to
96+
* @throws IOException if an I/O error occurs
97+
*/
98+
public void writeAllToAndRelease(OutputStream out) throws IOException {
99+
WritableByteChannel channel = Channels.newChannel(out);
100+
Iterator<ByteBuffer> it = slabs.iterator();
101+
while (it.hasNext()) {
102+
ByteBuffer slab = it.next();
103+
channel.write(slab.duplicate());
104+
allocator.release(slab);
105+
it.remove();
106+
}
107+
size = 0;
108+
}
109+
81110
@Override
82111
public void writeInto(ByteBuffer buffer) {
83112
for (ByteBuffer slab : slabs) {

parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,84 @@ private static CapacityByteArrayOutputStream capacityByteArrayOutputStream(Strin
109109
}
110110
return cbaos;
111111
}
112+
113+
@Test
114+
public void testWriteAllToAndRelease() throws IOException {
115+
byte[] result;
116+
ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
117+
collector.collect(BytesInput.from(bytes("Hello")));
118+
collector.collect(BytesInput.from(bytes(" ")));
119+
collector.collect(BytesInput.from(bytes("World")));
120+
121+
Assert.assertEquals(11, collector.size());
122+
123+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
124+
collector.writeAllToAndRelease(baos);
125+
result = baos.toByteArray();
126+
127+
// After writeAllToAndRelease, the collector should be empty
128+
Assert.assertEquals(0, collector.size());
129+
130+
// Verify the data was written correctly
131+
Assert.assertEquals("Hello World", new String(result, StandardCharsets.UTF_8));
132+
133+
// close() after writeAllToAndRelease() should be a safe no-op
134+
collector.close();
135+
}
136+
137+
@Test
138+
public void testDoubleCloseIsSafe() throws IOException {
139+
ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
140+
collector.collect(BytesInput.from(bytes("test data")));
141+
142+
Assert.assertEquals(9, collector.size());
143+
144+
// First close releases the buffers
145+
collector.close();
146+
Assert.assertEquals(0, collector.size());
147+
148+
// Second close should be a no-op and not throw
149+
collector.close();
150+
}
151+
152+
@Test
153+
public void testCloseOnEmpty() {
154+
// Close on an empty collector should not throw
155+
ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
156+
collector.close();
157+
collector.close(); // double close on empty
158+
}
159+
160+
@Test
161+
public void testWriteAllToAndReleaseProducesIdenticalOutput() throws IOException {
162+
// Verify that writeAllToAndRelease produces identical output to writeAllTo
163+
byte[] regularResult;
164+
byte[] progressiveResult;
165+
166+
// Use writeAllTo (non-destructive)
167+
try (ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator)) {
168+
collector.collect(BytesInput.fromInt(42));
169+
collector.collect(BytesInput.from(bytes("parquet")));
170+
collector.collect(BytesInput.fromInt(99));
171+
172+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
173+
collector.writeAllTo(baos);
174+
regularResult = baos.toByteArray();
175+
}
176+
177+
// Use writeAllToAndRelease (progressive)
178+
ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator);
179+
collector.collect(BytesInput.fromInt(42));
180+
collector.collect(BytesInput.from(bytes("parquet")));
181+
collector.collect(BytesInput.fromInt(99));
182+
183+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
184+
collector.writeAllToAndRelease(baos);
185+
progressiveResult = baos.toByteArray();
186+
187+
Assert.assertArrayEquals(regularResult, progressiveResult);
188+
189+
// Already released by writeAllToAndRelease, close is a no-op
190+
collector.close();
191+
}
112192
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,11 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
692692
for (ColumnDescriptor path : schema.getColumns()) {
693693
ColumnChunkPageWriter pageWriter = writers.get(path);
694694
pageWriter.writeToFileWriter(writer);
695+
// Eagerly release this column's page buffers now that they've been
696+
// written to the file writer. This reduces peak memory during flush
697+
// from the entire compressed row group down to roughly one column's
698+
// worth of compressed pages at a time.
699+
pageWriter.close();
695700
}
696701
}
697702
}

0 commit comments

Comments
 (0)