
Commit 6e53e20

GH-3237. Tracking buffer leaks on vector reads.
Delete the checksum file so that checksum validation is bypassed, which avoids all slicing issues. This makes the test failure "go away" but doesn't address the underlying issue with ChecksumFileSystem subclasses, especially LocalFileSystem.
1 parent da7826c commit 6e53e20

2 files changed

Lines changed: 24 additions & 11 deletions
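For context on the slicing the commit message refers to: with Hadoop's vectored-read API (3.3.5+), the filesystem fills buffers through an allocator callback supplied by the caller, and a checksum-validating filesystem may complete a range with a slice() of a larger, checksum-aligned buffer rather than the exact buffer the callback produced. The following is a minimal sketch of that read path, not code from this commit; the file path is hypothetical.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    // getLocal returns a LocalFileSystem, which is a ChecksumFileSystem subclass
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path file = new Path("target/tests/users.parquet"); // hypothetical test file
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024), FileRange.createFileRange(4096, 1024));
    try (FSDataInputStream in = fs.open(file)) {
      // The IntFunction is the caller's allocator hook. When checksums are
      // validated, the local filesystem may read checksum-aligned extents and
      // complete each range with a slice of a larger buffer, so the buffer
      // handed back is not the same object the allocator produced.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get(); // completes once the range is read
        // releasing 'data' against a tracking allocator then misses the original
      }
    }
  }
}

Since a slice is a distinct ByteBuffer object, a leak-tracking allocator that keys on the buffers it handed out never sees those originals released, which is the failure this commit works around.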


parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java

Lines changed: 1 addition & 2 deletions
@@ -173,8 +173,7 @@ public boolean isDirect() {
   @Override
   public void close() throws LeakedByteBufferException {
     if (!allocated.isEmpty()) {
-      allocated.keySet().forEach(key ->
-          LOG.warn("Unreleased ByteBuffer {}; {}", key.hashCode(), key));
+      allocated.keySet().forEach(key -> LOG.warn("Unreleased ByteBuffer {}; {}", key.hashCode(), key));
       LeakedByteBufferException ex = new LeakedByteBufferException(
           allocated.size(), allocated.values().iterator().next());
       allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
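The class touched here decorates a real allocator and records every buffer until it is released; closing it with anything still outstanding logs the warning above and throws. A minimal usage sketch, assuming the wrap() factory this test suite appears to use and the standard ByteBufferAllocator allocate/release contract:

import java.nio.ByteBuffer;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;

public class LeakTrackingSketch {
  public static void main(String[] args) throws Exception {
    // wrap() (assumed factory) decorates a real allocator; every allocate()
    // is recorded in the 'allocated' map seen in the diff above.
    try (TrackingByteBufferAllocator allocator =
        TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
      ByteBuffer released = allocator.allocate(128);
      allocator.release(released); // removed from the tracking map
      ByteBuffer leaked = allocator.allocate(256); // never released
    } // close() logs the unreleased buffer and throws LeakedByteBufferException
  }
}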

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java

Lines changed: 23 additions & 9 deletions
@@ -35,6 +35,8 @@
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
@@ -51,6 +53,8 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(Parameterized.class)
 public class TestParquetReader {
@@ -60,6 +64,7 @@ public class TestParquetReader {
   private static final Path STATIC_FILE_WITHOUT_COL_INDEXES =
       createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
   private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
+  private static final Logger LOG = LoggerFactory.getLogger(TestParquetReader.class);
 
   private final Path file;
   private final boolean vectoredRead;
@@ -102,6 +107,11 @@ public static void deleteFiles() throws IOException {
     deleteFile(FILE_V2);
   }
 
+  @Before
+  public void setup() throws IOException {
+    LOG.info("Test run with file {}, size {}; vectored={}", file, fileSize, vectoredRead);
+  }
+
   private static void deleteFile(Path file) throws IOException {
     file.getFileSystem(new Configuration()).delete(file, false);
   }
@@ -145,6 +155,10 @@ private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVersion
             .withPageSize(pageSize)
             .withWriterVersion(parquetVersion),
         DATA);
+    // remove the CRC file so that Hadoop local filesystem doesn't slice buffers on
+    // vector reads.
+    final LocalFileSystem local = FileSystem.getLocal(new Configuration());
+    local.delete(local.getChecksumFile(file), false);
   }
 
   private List<PhoneBookWriter.User> readUsers(
@@ -188,22 +202,22 @@ public void closeAllocator() {
   public void testCurrentRowIndex() throws Exception {
     ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator);
     // Fetch row index without processing any row.
-    assertEquals(reader.getCurrentRowIndex(), -1);
+    assertEquals(-1, reader.getCurrentRowIndex());
     reader.read();
-    assertEquals(reader.getCurrentRowIndex(), 0);
+    assertEquals(0, reader.getCurrentRowIndex());
     // calling the same API again and again should return same result.
-    assertEquals(reader.getCurrentRowIndex(), 0);
+    assertEquals(0, reader.getCurrentRowIndex());
 
     reader.read();
-    assertEquals(reader.getCurrentRowIndex(), 1);
-    assertEquals(reader.getCurrentRowIndex(), 1);
+    assertEquals(1, reader.getCurrentRowIndex());
+    assertEquals(1, reader.getCurrentRowIndex());
     long expectedCurrentRowIndex = 2L;
     while (reader.read() != null) {
       assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
       expectedCurrentRowIndex++;
     }
     // reader.read() returned null and so reader doesn't have any more rows.
-    assertEquals(reader.getCurrentRowIndex(), -1);
+    assertEquals(-1, reader.getCurrentRowIndex());
   }
 
   @Test
@@ -223,13 +237,13 @@ public void testSimpleFiltering() throws Exception {
     // The readUsers also validates the rowIndex for each returned row.
     List<PhoneBookWriter.User> filteredUsers1 =
         readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
-    assertEquals(filteredUsers1.size(), 2L);
+    assertEquals(2L, filteredUsers1.size());
     List<PhoneBookWriter.User> filteredUsers2 =
         readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
-    assertEquals(filteredUsers2.size(), 2L);
+    assertEquals(2L, filteredUsers2.size());
     List<PhoneBookWriter.User> filteredUsers3 =
         readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
-    assertEquals(filteredUsers3.size(), 1000L);
+    assertEquals(1000L, filteredUsers3.size());
   }
 
   @Test
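On the workaround itself: Hadoop's ChecksumFileSystem keeps the CRC for a file such as users.parquet in a hidden sibling named .users.parquet.crc, and getChecksumFile returns that path; once the CRC file is gone, checksum validation is skipped and reads take the raw, non-slicing path. A standalone sketch of the same cleanup call added to writePhoneBookToFile above (the data path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class CrcCleanupSketch {
  public static void main(String[] args) throws Exception {
    LocalFileSystem local = FileSystem.getLocal(new Configuration());
    Path data = new Path("target/tests/users.parquet"); // hypothetical test file
    Path crc = local.getChecksumFile(data); // -> target/tests/.users.parquet.crc
    // With the CRC file deleted, there is nothing to verify on subsequent reads.
    local.delete(crc, false);
  }
}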
