Skip to content

Commit a494af3

Browse files
committed
GH-3558: Properly close buffers
1 parent c7e7acb commit a494af3

3 files changed

Lines changed: 31 additions & 13 deletions

File tree

parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import java.nio.ByteBuffer;
2222
import java.util.HashMap;
23+
import java.util.HashSet;
2324
import java.util.Map;
2425
import java.util.Objects;
26+
import java.util.Set;
2527

2628
/**
2729
* A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
@@ -49,7 +51,11 @@ private static class Key {
4951
private final ByteBuffer buffer;
5052

5153
Key(ByteBuffer buffer) {
52-
hashCode = System.identityHashCode(buffer);
54+
if (!buffer.isDirect() && buffer.hasArray()) {
55+
hashCode = System.identityHashCode(buffer.array());
56+
} else {
57+
hashCode = System.identityHashCode(buffer);
58+
}
5359
this.buffer = buffer;
5460
}
5561

@@ -62,6 +68,9 @@ public boolean equals(Object o) {
6268
return false;
6369
}
6470
Key key = (Key) o;
71+
if (!buffer.isDirect() && buffer.hasArray() && !key.buffer.isDirect() && key.buffer.hasArray()) {
72+
return buffer.array() == key.buffer.array();
73+
}
6574
return this.buffer == key.buffer;
6675
}
6776

@@ -124,6 +133,7 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
124133
}
125134

126135
private final Map<Key, ByteBufferAllocationStacktraceException> allocated = new HashMap<>();
136+
private final Set<Object> releasedArrays = new HashSet<>();
127137
private final ByteBufferAllocator allocator;
128138

129139
private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
@@ -140,12 +150,19 @@ public ByteBuffer allocate(int size) {
140150
@Override
141151
public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
142152
Objects.requireNonNull(b);
143-
if (allocated.remove(new Key(b)) == null) {
144-
throw new ReleasingUnallocatedByteBufferException();
153+
if (allocated.remove(new Key(b)) != null) {
154+
allocator.release(b);
155+
if (!b.isDirect() && b.hasArray()) {
156+
releasedArrays.add(b.array());
157+
}
158+
b.clear();
159+
return;
160+
}
161+
if (!b.isDirect() && b.hasArray() && releasedArrays.contains(b.array())) {
162+
b.clear();
163+
return;
145164
}
146-
allocator.release(b);
147-
// Clearing the buffer so subsequent access would probably generate errors
148-
b.clear();
165+
throw new ReleasingUnallocatedByteBufferException();
149166
}
150167

151168
@Override
@@ -154,12 +171,12 @@ public boolean isDirect() {
154171
}
155172

156173
@Override
157-
public void close() throws LeakedByteBufferException {
158-
if (!allocated.isEmpty()) {
159-
LeakedByteBufferException ex = new LeakedByteBufferException(
160-
allocated.size(), allocated.values().iterator().next());
161-
allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
162-
throw ex;
174+
public void close() {
175+
// Release all remaining buffers through the underlying allocator
176+
// so they are properly freed (e.g. direct memory cleanup).
177+
for (Key key : allocated.keySet()) {
178+
allocator.release(key.buffer);
163179
}
180+
allocated.clear();
164181
}
165182
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2327,6 +2327,7 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
23272327
LOG.error(error, e);
23282328
throw new IOException(error, e);
23292329
}
2330+
builder.addBuffersToRelease(Collections.singletonList(buffer));
23302331
ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
23312332
for (ChunkDescriptor descriptor : chunks) {
23322333
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
<spotless.version>2.46.1</spotless.version>
8484
<shade.prefix>shaded.parquet</shade.prefix>
8585
<!-- Guarantees no newer classes/methods/constants are used by parquet. -->
86-
<hadoop.version>3.3.0</hadoop.version>
86+
<hadoop.version>3.4.0</hadoop.version>
8787
<parquet.format.version>2.12.0</parquet.format.version>
8888
<previous.version>1.17.0</previous.version>
8989
<thrift.executable>thrift</thrift.executable>

0 commit comments

Comments
 (0)