Skip to content

Commit 2686e85

Browse files
GH-3411: Expose row group index (#3412)
* add getCurrentRowGroupIndex method to Parquet readers
* Format with `mvn spotless:apply`
1 parent 01b96de commit 2686e85

5 files changed

Lines changed: 68 additions & 0 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,14 @@ private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
294294
return Collections.unmodifiableMap(setMultiMap);
295295
}
296296

297+
/**
298+
* Returns the 0-based index of the row group currently being read. Returns -1 if no row group
299+
* has been read yet.
* <p>
* This is a plain accessor: it returns the {@code currentBlock} field as-is and does not
* advance or otherwise modify the reader.
*
* @return the current value of {@code currentBlock}; per the contract above, the 0-based row
* group index, or -1 if no row group has been read yet
300+
*/
301+
public int getCurrentRowGroupIndex() {
302+
return currentBlock;
303+
}
304+
297305
/**
298306
* Returns the row index of the current row. If no row has been processed or if the
299307
* row index information is unavailable from the underlying {@link PageReadStore}, returns -1.

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,14 @@ public List<BlockMetaData> getRowGroups() {
10971097
return blocks;
10981098
}
10991099

1100+
/**
1101+
* Returns the 0-based index of the row group that was last read via {@link #readNextRowGroup()}
1102+
* or {@link #readNextFilteredRowGroup()}. Returns -1 if no row group has been read yet.
* <p>
* The result is computed as {@code currentBlock - 1}. NOTE(review): this presumably reflects
* that {@code currentBlock} in this class points at the <em>next</em> row group to be read;
* confirm against the read methods linked above. This differs from
* {@code InternalParquetRecordReader#getCurrentRowGroupIndex()}, which returns its own
* {@code currentBlock} field without adjustment.
*
* @return {@code currentBlock - 1}; per the contract above, the 0-based index of the last
* read row group, or -1 if none has been read yet
1103+
*/
1104+
public int getCurrentRowGroupIndex() {
1105+
return currentBlock - 1;
1106+
}
1107+
11001108
public void setRequestedSchema(List<ColumnDescriptor> columns) {
11011109
paths.clear();
11021110
for (ColumnDescriptor col : columns) {

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,17 @@ public T read() throws IOException {
144144
}
145145
}
146146

147+
/**
148+
* Reports the row group position of the underlying record reader.
* <p>
* When the {@code reader} field is {@code null} this method returns -1 without touching any
* other state; otherwise it delegates to {@code reader.getCurrentRowGroupIndex()}. The
* accompanying test ({@code TestParquetReader#testCurrentRowGroupIndex}) expects -1 both
* before the first {@link #read()} and after all records have been consumed.
*
* @return the 0-based index of the row group currently being read. If no row group has been
149+
* read yet, returns -1.
150+
*/
151+
public int getCurrentRowGroupIndex() {
152+
// No underlying reader is available, so there is no row group to report.
if (reader == null) {
153+
return -1;
154+
}
155+
return reader.getCurrentRowGroupIndex();
156+
}
157+
147158
/**
148159
* @return the row index of the last read row. If no row has been processed, returns -1.
149160
*/

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
207207
return internalReader.nextKeyValue();
208208
}
209209

210+
/**
211+
* Reports the row group position by delegating directly to
* {@code internalReader.getCurrentRowGroupIndex()}; no additional state is read or modified
* here.
*
* @return the 0-based index of the row group currently being read. If no row group has been
212+
* read yet, returns -1.
213+
*/
214+
public int getCurrentRowGroupIndex() {
215+
return internalReader.getCurrentRowGroupIndex();
216+
}
217+
210218
/**
211219
* @return the row index of the current row. If no row has been processed, returns -1.
212220
*/

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
2323
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
2424
import static org.junit.Assert.assertEquals;
25+
import static org.junit.Assert.assertTrue;
2526

2627
import java.io.IOException;
2728
import java.net.URISyntaxException;
@@ -201,6 +202,38 @@ public void testCurrentRowIndex() throws Exception {
201202
assertEquals(reader.getCurrentRowIndex(), -1);
202203
}
203204

205+
/**
 * Verifies {@code ParquetReader#getCurrentRowGroupIndex()} across a full sequential read of the
 * shared test {@code file}: it must be -1 before the first read, 0 after the first record,
 * advance by at most one between consecutive records, end on the last row group, and return
 * -1 again once the reader is exhausted.
 */
@Test
206+
public void testCurrentRowGroupIndex() throws Exception {
207+
int expectedRowGroups;
208+
// Read the row group count from the file metadata so the final index can be checked below.
try (ParquetFileReader fileReader =
209+
ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))) {
210+
expectedRowGroups = fileReader.getRowGroups().size();
211+
}
212+
// With a single row group the index would never advance and the loop below would prove little.
assertTrue("expected multiple row groups for this test", expectedRowGroups > 1);
213+
214+
try (ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP, allocator)) {
215+
// before reading anything, returns -1
216+
assertEquals(-1, reader.getCurrentRowGroupIndex());
217+
218+
// Consume the first record; the reader is now expected to be on row group 0.
reader.read();
219+
assertEquals(0, reader.getCurrentRowGroupIndex());
220+
// idempotent
221+
assertEquals(0, reader.getCurrentRowGroupIndex());
222+
223+
int prevIdx = 0;
224+
// Drain the remaining records, checking the index after every successful read.
while (reader.read() != null) {
225+
int idx = reader.getCurrentRowGroupIndex();
226+
// The index must never go backwards ...
assertTrue(idx >= prevIdx);
227+
// ... and must never skip a row group between two consecutive records.
assertTrue(idx <= prevIdx + 1);
228+
prevIdx = idx;
229+
}
230+
// last row group seen should be the final one
231+
assertEquals(expectedRowGroups - 1, prevIdx);
232+
// after exhaustion, returns -1
233+
assertEquals(-1, reader.getCurrentRowGroupIndex());
234+
}
235+
}
236+
204237
@Test
205238
public void testRangeFiltering() throws Exception {
206239
// The readUsers also validates the rowIndex for each returned row.

0 commit comments

Comments
 (0)