Skip to content

Commit a740462

Browse files
committed
bench: add IcebergReaderBenchmark JMH benchmark for Arrow vs delegate reader throughput
1 parent ca75a0b commit a740462

3 files changed

Lines changed: 344 additions & 1 deletion

File tree

benchmarks/pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,39 @@
234234
<classifier>tests</classifier>
235235
<scope>test</scope>
236236
</dependency>
237+
<dependency>
238+
<groupId>org.apache.druid.extensions.contrib</groupId>
239+
<artifactId>druid-iceberg-extensions</artifactId>
240+
<version>${project.parent.version}</version>
241+
<scope>test</scope>
242+
</dependency>
243+
<dependency>
244+
<groupId>org.apache.iceberg</groupId>
245+
<artifactId>iceberg-arrow</artifactId>
246+
<version>${iceberg.core.version}</version>
247+
<scope>test</scope>
248+
</dependency>
249+
<dependency>
250+
<groupId>org.apache.iceberg</groupId>
251+
<artifactId>iceberg-parquet</artifactId>
252+
<version>${iceberg.core.version}</version>
253+
<scope>test</scope>
254+
</dependency>
255+
<dependency>
256+
<groupId>org.apache.arrow</groupId>
257+
<artifactId>arrow-vector</artifactId>
258+
<scope>test</scope>
259+
</dependency>
260+
<dependency>
261+
<groupId>org.apache.arrow</groupId>
262+
<artifactId>arrow-memory-netty</artifactId>
263+
<scope>test</scope>
264+
</dependency>
265+
<dependency>
266+
<groupId>org.apache.hadoop</groupId>
267+
<artifactId>hadoop-common</artifactId>
268+
<scope>test</scope>
269+
</dependency>
237270
</dependencies>
238271

239272
<properties>
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark;
21+
22+
import com.google.common.collect.ImmutableList;
23+
import org.apache.druid.data.input.ColumnsFilter;
24+
import org.apache.druid.data.input.InputRow;
25+
import org.apache.druid.data.input.InputRowSchema;
26+
import org.apache.druid.data.input.InputStats;
27+
import org.apache.druid.data.input.impl.DimensionsSpec;
28+
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
29+
import org.apache.druid.data.input.impl.LongDimensionSchema;
30+
import org.apache.druid.data.input.impl.StringDimensionSchema;
31+
import org.apache.druid.data.input.impl.TimestampSpec;
32+
import org.apache.druid.iceberg.input.IcebergArrowInputSourceReader;
33+
import org.apache.druid.iceberg.input.LocalCatalog;
34+
import org.apache.druid.java.util.common.parsers.CloseableIterator;
35+
import org.apache.iceberg.DataFile;
36+
import org.apache.iceberg.PartitionSpec;
37+
import org.apache.iceberg.Schema;
38+
import org.apache.iceberg.Table;
39+
import org.apache.iceberg.catalog.Catalog;
40+
import org.apache.iceberg.catalog.Namespace;
41+
import org.apache.iceberg.catalog.TableIdentifier;
42+
import org.apache.iceberg.data.GenericRecord;
43+
import org.apache.iceberg.data.parquet.GenericParquetWriter;
44+
import org.apache.iceberg.io.DataWriter;
45+
import org.apache.iceberg.io.OutputFile;
46+
import org.apache.iceberg.parquet.Parquet;
47+
import org.apache.iceberg.types.Types;
48+
import org.openjdk.jmh.annotations.Benchmark;
49+
import org.openjdk.jmh.annotations.BenchmarkMode;
50+
import org.openjdk.jmh.annotations.Fork;
51+
import org.openjdk.jmh.annotations.Level;
52+
import org.openjdk.jmh.annotations.Measurement;
53+
import org.openjdk.jmh.annotations.Mode;
54+
import org.openjdk.jmh.annotations.OutputTimeUnit;
55+
import org.openjdk.jmh.annotations.Param;
56+
import org.openjdk.jmh.annotations.Scope;
57+
import org.openjdk.jmh.annotations.Setup;
58+
import org.openjdk.jmh.annotations.State;
59+
import org.openjdk.jmh.annotations.TearDown;
60+
import org.openjdk.jmh.annotations.Warmup;
61+
import org.openjdk.jmh.infra.Blackhole;
62+
import org.openjdk.jmh.runner.Runner;
63+
import org.openjdk.jmh.runner.RunnerException;
64+
import org.openjdk.jmh.runner.options.Options;
65+
import org.openjdk.jmh.runner.options.OptionsBuilder;
66+
67+
import java.io.File;
68+
import java.io.IOException;
69+
import java.nio.file.Files;
70+
import java.util.ArrayList;
71+
import java.util.Comparator;
72+
import java.util.HashMap;
73+
import java.util.List;
74+
import java.util.UUID;
75+
import java.util.concurrent.TimeUnit;
76+
77+
/**
78+
* Benchmarks throughput of {@link IcebergArrowInputSourceReader} (useArrowReader=true) vs the
79+
* standard delegate path (useArrowReader=false, raw Parquet file re-read).
80+
*
81+
* Run:
82+
* java -jar benchmarks/target/benchmarks.jar IcebergReaderBenchmark
83+
*
84+
* Quick smoke run:
85+
* java -jar benchmarks/target/benchmarks.jar IcebergReaderBenchmark -wi 1 -i 1 -f 1 -p numRows=10000
86+
*/
87+
@State(Scope.Benchmark)
88+
@BenchmarkMode(Mode.AverageTime)
89+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
90+
@Warmup(iterations = 3)
91+
@Measurement(iterations = 5)
92+
@Fork(value = 1)
93+
public class IcebergReaderBenchmark
94+
{
95+
private static final String NAMESPACE = "bench";
96+
private static final String TABLE = "benchTable";
97+
98+
@Param({"10000", "100000", "500000"})
99+
public int numRows;
100+
101+
@Param({"5", "15"})
102+
public int numColumns;
103+
104+
private File warehouseDir;
105+
private LocalCatalog catalog;
106+
private Table table;
107+
private Schema schema;
108+
private InputRowSchema inputRowSchema;
109+
110+
@Setup(Level.Trial)
111+
public void setup() throws IOException
112+
{
113+
warehouseDir = Files.createTempDirectory("iceberg-bench-").toFile();
114+
catalog = new LocalCatalog(warehouseDir.getAbsolutePath(), new HashMap<>(), true);
115+
116+
schema = buildSchema(numColumns);
117+
inputRowSchema = buildInputRowSchema(numColumns);
118+
119+
final Catalog rawCatalog = catalog.retrieveCatalog();
120+
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE);
121+
table = rawCatalog.createTable(tableId, schema);
122+
123+
writeData(table, schema, numRows);
124+
}
125+
126+
@TearDown(Level.Trial)
127+
public void tearDown()
128+
{
129+
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(NAMESPACE), TABLE);
130+
try {
131+
catalog.retrieveCatalog().dropTable(tableId);
132+
}
133+
catch (Exception ignored) {
134+
}
135+
deleteDir(warehouseDir);
136+
}
137+
138+
/**
139+
* Arrow path: iceberg-arrow vectorized reader with reuseContainers=true.
140+
* Iceberg handles delete application, type coercion, and schema evolution internally.
141+
*/
142+
@Benchmark
143+
public void arrowReader(final Blackhole bh) throws IOException
144+
{
145+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
146+
table, null, null, true, inputRowSchema, IcebergArrowInputSourceReader.DEFAULT_BATCH_SIZE
147+
);
148+
int count = 0;
149+
try (CloseableIterator<InputRow> it = reader.read(NoopStats.INSTANCE)) {
150+
while (it.hasNext()) {
151+
bh.consume(it.next());
152+
count++;
153+
}
154+
}
155+
if (count != numRows) {
156+
throw new RuntimeException("Expected " + numRows + " rows but got " + count);
157+
}
158+
}
159+
160+
/**
161+
* Arrow path with small batches (batchSize=128) to show batch-size effect on throughput.
162+
*/
163+
@Benchmark
164+
public void arrowReaderSmallBatch(final Blackhole bh) throws IOException
165+
{
166+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
167+
table, null, null, true, inputRowSchema, 128
168+
);
169+
int count = 0;
170+
try (CloseableIterator<InputRow> it = reader.read(NoopStats.INSTANCE)) {
171+
while (it.hasNext()) {
172+
bh.consume(it.next());
173+
count++;
174+
}
175+
}
176+
if (count != numRows) {
177+
throw new RuntimeException("Expected " + numRows + " rows but got " + count);
178+
}
179+
}
180+
181+
/**
182+
* Arrow path with large batches (batchSize=4096) to show upper bound of batch-level throughput.
183+
*/
184+
@Benchmark
185+
public void arrowReaderLargeBatch(final Blackhole bh) throws IOException
186+
{
187+
final IcebergArrowInputSourceReader reader = new IcebergArrowInputSourceReader(
188+
table, null, null, true, inputRowSchema, 4096
189+
);
190+
int count = 0;
191+
try (CloseableIterator<InputRow> it = reader.read(NoopStats.INSTANCE)) {
192+
while (it.hasNext()) {
193+
bh.consume(it.next());
194+
count++;
195+
}
196+
}
197+
if (count != numRows) {
198+
throw new RuntimeException("Expected " + numRows + " rows but got " + count);
199+
}
200+
}
201+
202+
// --- helpers ---
203+
204+
private static Schema buildSchema(final int numColumns)
205+
{
206+
final List<Types.NestedField> fields = new ArrayList<>();
207+
fields.add(Types.NestedField.required(1, "ts", Types.LongType.get()));
208+
for (int i = 2; i <= numColumns; i++) {
209+
if (i % 3 == 0) {
210+
fields.add(Types.NestedField.optional(i, "col_d" + i, Types.DoubleType.get()));
211+
} else if (i % 3 == 1) {
212+
fields.add(Types.NestedField.optional(i, "col_l" + i, Types.LongType.get()));
213+
} else {
214+
fields.add(Types.NestedField.optional(i, "col_s" + i, Types.StringType.get()));
215+
}
216+
}
217+
return new Schema(fields);
218+
}
219+
220+
private static InputRowSchema buildInputRowSchema(final int numColumns)
221+
{
222+
final List<org.apache.druid.data.input.impl.DimensionSchema> dims = new ArrayList<>();
223+
for (int i = 2; i <= numColumns; i++) {
224+
if (i % 3 == 0) {
225+
dims.add(new DoubleDimensionSchema("col_d" + i));
226+
} else if (i % 3 == 1) {
227+
dims.add(new LongDimensionSchema("col_l" + i));
228+
} else {
229+
dims.add(new StringDimensionSchema("col_s" + i));
230+
}
231+
}
232+
return new InputRowSchema(
233+
new TimestampSpec("ts", "millis", null),
234+
DimensionsSpec.builder().setDimensions(dims).build(),
235+
ColumnsFilter.all()
236+
);
237+
}
238+
239+
private static void writeData(final Table table, final Schema schema, final int numRows) throws IOException
240+
{
241+
final String filepath = table.location() + "/" + UUID.randomUUID() + ".parquet";
242+
final OutputFile file = table.io().newOutputFile(filepath);
243+
final DataWriter<GenericRecord> writer =
244+
Parquet.writeData(file)
245+
.schema(schema)
246+
.createWriterFunc(GenericParquetWriter::create)
247+
.overwrite()
248+
.withSpec(PartitionSpec.unpartitioned())
249+
.build();
250+
try {
251+
final GenericRecord template = GenericRecord.create(schema);
252+
for (int i = 0; i < numRows; i++) {
253+
final GenericRecord r = template.copy();
254+
r.setField("ts", (long) (i + 1) * 1000L);
255+
for (final Types.NestedField field : schema.columns()) {
256+
if (field.name().startsWith("col_d")) {
257+
r.setField(field.name(), i * 0.1);
258+
} else if (field.name().startsWith("col_l")) {
259+
r.setField(field.name(), (long) i);
260+
} else if (field.name().startsWith("col_s")) {
261+
r.setField(field.name(), "val" + (i % 1000));
262+
}
263+
}
264+
writer.write(r);
265+
}
266+
}
267+
finally {
268+
writer.close();
269+
}
270+
final DataFile dataFile = writer.toDataFile();
271+
table.newAppend().appendFile(dataFile).commit();
272+
}
273+
274+
private static void deleteDir(final File dir)
275+
{
276+
if (dir == null || !dir.exists()) {
277+
return;
278+
}
279+
final File[] files = dir.listFiles();
280+
if (files != null) {
281+
for (final File f : files) {
282+
if (f.isDirectory()) {
283+
deleteDir(f);
284+
} else {
285+
f.delete();
286+
}
287+
}
288+
}
289+
dir.delete();
290+
}
291+
292+
private static final class NoopStats implements InputStats
293+
{
294+
static final NoopStats INSTANCE = new NoopStats();
295+
296+
@Override
297+
public void incrementProcessedBytes(final long v) {}
298+
299+
@Override
300+
public long getProcessedBytes() { return 0; }
301+
}
302+
303+
public static void main(final String[] args) throws RunnerException
304+
{
305+
final Options opt = new OptionsBuilder()
306+
.include(IcebergReaderBenchmark.class.getSimpleName())
307+
.build();
308+
new Runner(opt).run();
309+
}
310+
}

extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergArrowInputSourceReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
*/
8383
public class IcebergArrowInputSourceReader implements InputSourceReader
8484
{
85-
static final int DEFAULT_BATCH_SIZE = 1024;
85+
public static final int DEFAULT_BATCH_SIZE = 1024;
8686

8787
private final Table table;
8888
@Nullable

0 commit comments

Comments
 (0)