|
28 | 28 | import org.apache.druid.data.input.impl.DoubleDimensionSchema; |
29 | 29 | import org.apache.druid.data.input.impl.LongDimensionSchema; |
30 | 30 | import org.apache.druid.data.input.impl.StringDimensionSchema; |
| 31 | +import org.apache.druid.data.input.impl.LocalInputSourceFactory; |
31 | 32 | import org.apache.druid.data.input.impl.TimestampSpec; |
| 33 | +import org.apache.druid.data.input.InputSourceReader; |
| 34 | +import org.apache.druid.data.input.parquet.ParquetInputFormat; |
32 | 35 | import org.apache.druid.iceberg.input.IcebergArrowInputSourceReader; |
| 36 | +import org.apache.druid.iceberg.input.IcebergInputSource; |
33 | 37 | import org.apache.druid.iceberg.input.LocalCatalog; |
34 | 38 | import org.apache.druid.java.util.common.parsers.CloseableIterator; |
| 39 | +import org.apache.hadoop.conf.Configuration; |
35 | 40 | import org.apache.iceberg.DataFile; |
36 | 41 | import org.apache.iceberg.PartitionSpec; |
37 | 42 | import org.apache.iceberg.Schema; |
@@ -199,6 +204,40 @@ public void arrowReaderLargeBatch(final Blackhole bh) throws IOException |
199 | 204 | } |
200 | 205 | } |
201 | 206 |
|
| 207 | + /**
| 208 | + * Benchmarks the existing path-based reader (current production behaviour when useArrowReader=false):
| 209 | + * IcebergCatalog extracts data-file paths from the snapshot, then a LocalInputSource +
| 210 | + * ParquetInputFormat re-opens and re-parses each Parquet file generically.
| 211 | + * No delete-file awareness, no scan-level column projection, no schema evolution.
| 212 | + */
| 213 | + @Benchmark
| 214 | + public void pathBasedReader(final Blackhole bh) throws IOException
| 215 | + {
| 216 | + final IcebergInputSource source = new IcebergInputSource(
| 217 | + TABLE,
| 218 | + NAMESPACE,
| 219 | + null,
| 220 | + catalog,
| 221 | + new LocalInputSourceFactory(),
| 222 | + null,
| 223 | + null,
| 224 | + false, // NOTE(review): presumably the useArrowReader flag named in the Javadoc - confirm against the IcebergInputSource constructor
| 225 | + 0
| 226 | + );
| 227 | + final ParquetInputFormat parquetFormat = new ParquetInputFormat(null, null, new Configuration()); // first two arguments left null; fresh Hadoop Configuration
| 228 | + final InputSourceReader reader = source.reader(inputRowSchema, parquetFormat, warehouseDir);
| 229 | + int count = 0; // rows actually produced by the reader
| 230 | + try (CloseableIterator<InputRow> it = reader.read(NoopStats.INSTANCE)) { // try-with-resources closes the iterator even if iteration throws
| 231 | + while (it.hasNext()) {
| 232 | + bh.consume(it.next()); // hand every row to the Blackhole so the read result is actually used
| 233 | + count++;
| 234 | + }
| 235 | + }
| 236 | + if (count != numRows) { // sanity check: fail the run when the reader yields a different row count than numRows
| 237 | + throw new RuntimeException("Expected " + numRows + " rows but got " + count);
| 238 | + }
| 239 | + }
| 240 | + |
202 | 241 | // --- helpers --- |
203 | 242 |
|
204 | 243 | private static Schema buildSchema(final int numColumns) |
|
0 commit comments