Skip to content

Commit 274dc51

Browse files
authored
PARQUET-2432: Use ByteBufferAllocator over hardcoded heap allocation (#1278)
* PARQUET-2432: Use ByteBufferAllocator over hardcoded heap allocation * Updated the BytesInput implementations to rely on a ByteBufferAllocator instance for allocating/releasing ByteBuffer objects. * Extended the usage of ByteBufferAllocator to replace hardcoded heap allocation (e.g. byte[], ByteBuffer.allocate, etc.). * parquet-cli related code, including ParquetRewriter and its tests, is not changed in this effort. * Reuse a temporary ByteBuffer instead of repeatedly allocating and releasing one.
1 parent d839608 commit 274dc51

34 files changed

Lines changed: 1549 additions & 159 deletions

parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* Container which can construct writers for multiple columns to be stored
2323
* together.
2424
*/
25-
public interface ColumnWriteStore {
25+
public interface ColumnWriteStore extends AutoCloseable {
2626
/**
2727
* @param path the column for which to create a writer
2828
* @return the column writer for the given column
@@ -63,6 +63,7 @@ public interface ColumnWriteStore {
6363
/**
6464
* Close the related output stream and release any resources
6565
*/
66+
@Override
6667
public abstract void close();
6768

6869
/**

parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
/**
2424
* writer for (repetition level, definition level, values) triplets
2525
*/
26-
public interface ColumnWriter {
26+
public interface ColumnWriter extends AutoCloseable {
2727

2828
/**
2929
* writes the current value
@@ -91,6 +91,7 @@ public interface ColumnWriter {
9191
* Close the underlying store. This should be called when there are no
9292
* more data to be written.
9393
*/
94+
@Override
9495
void close();
9596

9697
/**

parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
/**
2424
* Interface to read dictionary pages for all the columns of a row group
2525
*/
26-
public interface DictionaryPageReadStore {
26+
public interface DictionaryPageReadStore extends AutoCloseable {
2727

2828
/**
2929
* Returns a {@link DictionaryPage} for the given column descriptor.
@@ -33,4 +33,9 @@ public interface DictionaryPageReadStore {
3333
* @return the DictionaryPage for that column, or null if there isn't one
3434
*/
3535
DictionaryPage readDictionaryPage(ColumnDescriptor descriptor);
36+
37+
@Override
38+
default void close() {
39+
// No-op default implementation for compatibility
40+
}
3641
}

parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@
2323
/**
2424
* contains all the writers for the columns in the corresponding row group
2525
*/
26-
public interface PageWriteStore {
26+
public interface PageWriteStore extends AutoCloseable {
2727

2828
/**
2929
* @param path the descriptor for the column
3030
* @return the corresponding page writer
3131
*/
3232
PageWriter getPageWriter(ColumnDescriptor path);
33+
34+
@Override
35+
default void close() {
36+
// No-op default implementation for compatibility
37+
}
3338
}

parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
/**
2727
* a writer for all the pages of a given column chunk
2828
*/
29-
public interface PageWriter {
29+
public interface PageWriter extends AutoCloseable {
3030

3131
/**
3232
* writes a single page
@@ -120,4 +120,9 @@ void writePageV2(
120120
* @return a string presenting a summary of how memory is used
121121
*/
122122
String memUsageString(String prefix);
123+
124+
@Override
125+
default void close() {
126+
// No-op default implementation for compatibility
127+
}
123128
}

parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@
2424
/**
2525
* Contains all writers for all columns of a row group
2626
*/
27-
public interface BloomFilterWriteStore {
27+
public interface BloomFilterWriteStore extends AutoCloseable {
2828
/**
2929
* Get bloom filter writer of a column
3030
*
3131
* @param path the descriptor for the column
3232
* @return the corresponding Bloom filter writer
3333
*/
3434
BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
35+
36+
@Override
37+
default void close() {
38+
// No-op default implementation for compatibility
39+
}
3540
}

parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@
1919

2020
package org.apache.parquet.column.values.bloomfilter;
2121

22-
public interface BloomFilterWriter {
22+
public interface BloomFilterWriter extends AutoCloseable {
2323
/**
2424
* Write a Bloom filter
2525
*
2626
* @param bloomFilter the Bloom filter to write
2727
*/
2828
void writeBloomFilter(BloomFilter bloomFilter);
29+
30+
@Override
31+
default void close() {
32+
// No-op default implementation for compatibility
33+
}
2934
}

parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public long getRowCount() {
7070
return rowCount;
7171
}
7272

73+
@Override
74+
public void close() {
75+
// no-op
76+
}
77+
7378
public void addRowCount(long count) {
7479
rowCount += count;
7580
}

parquet-common/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@
6161
<version>${slf4j.version}</version>
6262
<scope>test</scope>
6363
</dependency>
64+
65+
<dependency>
66+
<groupId>org.mockito</groupId>
67+
<artifactId>mockito-all</artifactId>
68+
<version>${mockito.version}</version>
69+
<scope>test</scope>
70+
</dependency>
6471
</dependencies>
6572

6673
<build>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.bytes;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
25+
/**
26+
* Convenient class for releasing {@link java.nio.ByteBuffer} objects with the corresponding allocator;
27+
*/
28+
public class ByteBufferReleaser implements AutoCloseable {
29+
30+
final ByteBufferAllocator allocator;
31+
private final List<ByteBuffer> toRelease = new ArrayList<>();
32+
33+
/**
34+
* Constructs a new {@link ByteBufferReleaser} instance with the specified {@link ByteBufferAllocator} to be used for
35+
* releasing the buffers in {@link #close()}.
36+
*
37+
* @param allocator the allocator to be used for releasing the buffers
38+
* @see #releaseLater(ByteBuffer)
39+
* @see #close()
40+
*/
41+
public ByteBufferReleaser(ByteBufferAllocator allocator) {
42+
this.allocator = allocator;
43+
}
44+
45+
/**
46+
* Adds a {@link ByteBuffer} object to the list of buffers to be released at {@link #close()}. The specified buffer
47+
* shall be one that was allocated by the {@link ByteBufferAllocator} of this object.
48+
*
49+
* @param buffer the buffer to be released
50+
*/
51+
public void releaseLater(ByteBuffer buffer) {
52+
toRelease.add(buffer);
53+
}
54+
55+
@Override
56+
public void close() {
57+
for (ByteBuffer buf : toRelease) {
58+
allocator.release(buf);
59+
}
60+
toRelease.clear();
61+
}
62+
}

0 commit comments

Comments
 (0)