Skip to content

Commit 60ba10a

Browse files
committed
Add raw sorted filter benchmark
1 parent f672bcc commit 60ba10a

1 file changed

Lines changed: 283 additions & 0 deletions

File tree

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.perf;
20+
21+
import com.google.common.base.Preconditions;
22+
import java.io.File;
23+
import java.util.Collections;
24+
import java.util.EnumSet;
25+
import java.util.List;
26+
import java.util.concurrent.TimeUnit;
27+
import org.apache.commons.io.FileUtils;
28+
import org.apache.pinot.common.request.context.ExpressionContext;
29+
import org.apache.pinot.common.request.context.predicate.EqPredicate;
30+
import org.apache.pinot.common.request.context.predicate.Predicate;
31+
import org.apache.pinot.common.request.context.predicate.RangePredicate;
32+
import org.apache.pinot.core.operator.blocks.FilterBlock;
33+
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
34+
import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
35+
import org.apache.pinot.core.operator.filter.ScanBasedFilterOperator;
36+
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
37+
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
38+
import org.apache.pinot.core.query.request.context.QueryContext;
39+
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
40+
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
41+
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
42+
import org.apache.pinot.segment.spi.Constants;
43+
import org.apache.pinot.segment.spi.IndexSegment;
44+
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
45+
import org.apache.pinot.segment.spi.datasource.DataSource;
46+
import org.apache.pinot.spi.config.table.FieldConfig;
47+
import org.apache.pinot.spi.config.table.TableType;
48+
import org.apache.pinot.spi.data.FieldSpec;
49+
import org.apache.pinot.spi.data.Schema;
50+
import org.apache.pinot.spi.data.readers.GenericRow;
51+
import org.apache.pinot.spi.data.readers.RecordReader;
52+
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
53+
import org.openjdk.jmh.annotations.Benchmark;
54+
import org.openjdk.jmh.annotations.BenchmarkMode;
55+
import org.openjdk.jmh.annotations.Fork;
56+
import org.openjdk.jmh.annotations.Level;
57+
import org.openjdk.jmh.annotations.Measurement;
58+
import org.openjdk.jmh.annotations.Mode;
59+
import org.openjdk.jmh.annotations.OutputTimeUnit;
60+
import org.openjdk.jmh.annotations.Param;
61+
import org.openjdk.jmh.annotations.Scope;
62+
import org.openjdk.jmh.annotations.Setup;
63+
import org.openjdk.jmh.annotations.State;
64+
import org.openjdk.jmh.annotations.TearDown;
65+
import org.openjdk.jmh.annotations.Warmup;
66+
import org.openjdk.jmh.runner.Runner;
67+
import org.openjdk.jmh.runner.options.OptionsBuilder;
68+
69+
70+
/**
 * JMH benchmark comparing the raw sorted filter path against the scan fallback on a real immutable segment.
 *
 * <p>The benchmark builds a raw, single-value, sorted integer column and measures the end-to-end leaf filter cost for
 * two cases:
 * {@link org.apache.pinot.core.operator.filter.RawSortedIndexBasedFilterOperator} via the default planner path, and
 * {@link ScanBasedFilterOperator} by forcing the query context to skip the sorted index.
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 5, time = 1)
public class BenchmarkRawSortedFilterOperator {
  // Root temp directory; each @Param combination gets its own sub-directory (see FilterState#setup).
  private static final File INDEX_DIR_ROOT =
      new File(FileUtils.getTempDirectory(), "BenchmarkRawSortedFilterOperator");
  private static final String TABLE_NAME = "rawSortedFilterTable";
  private static final String COLUMN_NAME = "rawSortedInt";
  // Compared against getClass().getSimpleName() in setup to assert the planner chose the raw sorted operator.
  private static final String RAW_SORTED_OPERATOR_NAME = "RawSortedIndexBasedFilterOperator";
  private static final ExpressionContext COLUMN_EXPRESSION = ExpressionContext.forIdentifier(COLUMN_NAME);
  // Each distinct column value repeats RUN_LENGTH times (row value = rowIndex / RUN_LENGTH), keeping the data sorted.
  private static final int RUN_LENGTH = 32;

  /**
   * Convenience entry point to run this benchmark directly without the JMH uber-jar.
   */
  public static void main(String[] args)
      throws Exception {
    new Runner(new OptionsBuilder().include(BenchmarkRawSortedFilterOperator.class.getSimpleName()).build()).run();
  }

  /**
   * Per-trial state: builds and loads a real immutable segment with one raw, sorted INT column, then prepares two
   * query contexts — one that lets the planner use the sorted index and one that skips it to force a scan.
   * Setup also cross-checks that both operator paths produce identical doc-id checksums before anything is measured.
   */
  @State(Scope.Benchmark)
  public static class FilterState {
    // Number of rows generated into the segment.
    @Param({"1000000"})
    int _numRows;

    // Raw-forward-index compression codec under test; must be a FieldConfig.CompressionCodec enum name.
    @Param({"PASS_THROUGH", "SNAPPY"})
    String _compressionCodec;

    // Predicate selectivity shape; dispatched on by buildPredicate().
    @Param({"EQ", "RANGE_1_PERCENT", "RANGE_10_PERCENT"})
    String _predicateShape;

    private File _indexDir;
    private Schema _schema;
    private IndexSegment _segment;
    private DataSource _dataSource;
    // Context without index skips: the planner is free to pick the raw sorted operator.
    private QueryContext _rawSortedQueryContext;
    // Context with the SORTED index skipped for COLUMN_NAME: forces the scan fallback.
    private QueryContext _scanQueryContext;
    private PredicateEvaluator _predicateEvaluator;
    // Checksum from the raw sorted path, kept so setup can validate the scan path returns the same result.
    private long _expectedChecksum;

    /**
     * Builds the segment on disk, loads it, verifies the column metadata (sorted, no dictionary), prepares both
     * query contexts and the predicate evaluator, and asserts that the planner yields the expected operator type
     * for each context and that both paths agree on the result checksum.
     *
     * @throws Exception if segment creation or loading fails
     */
    @Setup(Level.Trial)
    public void setup()
        throws Exception {
      // One directory per parameter combination so trials never share on-disk state.
      _indexDir = new File(INDEX_DIR_ROOT, _compressionCodec + "-" + _predicateShape + "-" + _numRows);
      FileUtils.deleteQuietly(_indexDir);
      FileUtils.forceMkdir(_indexDir);

      _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
          .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
          .build();

      buildSegment();
      loadSegment();

      // Fail fast if the segment does not actually exercise the raw sorted code path.
      Preconditions.checkState(_dataSource.getDataSourceMetadata().isSorted(),
          "Benchmark requires a sorted column but metadata was unsorted");
      Preconditions.checkState(_dataSource.getDictionary() == null,
          "Benchmark requires a raw column but dictionary was present");
      _rawSortedQueryContext = new QueryContext.Builder().setTableName(TABLE_NAME).build();
      _rawSortedQueryContext.setSchema(_schema);

      _scanQueryContext = new QueryContext.Builder().setTableName(TABLE_NAME).build();
      _scanQueryContext.setSchema(_schema);
      // Skipping the SORTED index on this column is what pushes the planner onto the scan fallback.
      _scanQueryContext.setSkipIndexes(Collections.singletonMap(COLUMN_NAME, EnumSet.of(FieldConfig.IndexType.SORTED)));

      Predicate predicate = buildPredicate();
      _predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _dataSource,
          _rawSortedQueryContext);

      // Sanity-check that each context plans the operator this benchmark claims to measure.
      BaseFilterOperator rawSortedOperator = createRawSortedOperator();
      Preconditions.checkState(RAW_SORTED_OPERATOR_NAME.equals(rawSortedOperator.getClass().getSimpleName()),
          "Expected %s but planned %s", RAW_SORTED_OPERATOR_NAME, rawSortedOperator.getClass().getName());
      BaseFilterOperator scanOperator = createScanOperator();
      Preconditions.checkState(scanOperator instanceof ScanBasedFilterOperator,
          "Expected ScanBasedFilterOperator but planned %s", scanOperator.getClass().getName());

      // Both paths must match on doc ids before timings mean anything.
      _expectedChecksum = consume(rawSortedOperator);
      long scanChecksum = consume(scanOperator);
      Preconditions.checkState(_expectedChecksum == scanChecksum,
          "Raw sorted and scan paths must return identical results. rawSorted=%s scan=%s shape=%s",
          _expectedChecksum, scanChecksum, _predicateShape);
    }

    /**
     * Releases the loaded segment and removes this trial's index directory.
     */
    @TearDown(Level.Trial)
    public void tearDown() {
      if (_segment != null) {
        _segment.destroy();
      }
      FileUtils.deleteQuietly(_indexDir);
    }

    // Package-private so the benchmark methods can plan a fresh operator on every invocation,
    // keeping planning cost inside the measured region for both paths.
    BaseFilterOperator createRawSortedOperator() {
      return FilterOperatorUtils.getLeafFilterOperator(_rawSortedQueryContext, _predicateEvaluator, _dataSource,
          _numRows);
    }

    BaseFilterOperator createScanOperator() {
      return FilterOperatorUtils.getLeafFilterOperator(_scanQueryContext, _predicateEvaluator, _dataSource, _numRows);
    }

    /**
     * Creates the segment on disk: raw (no-dictionary) encoding with the parameterized compression codec, and the
     * column declared sorted so the creator records sorted metadata.
     */
    private void buildSegment()
        throws Exception {
      FieldConfig fieldConfig = new FieldConfig.Builder(COLUMN_NAME)
          .withEncodingType(FieldConfig.EncodingType.RAW)
          .withCompressionCodec(FieldConfig.CompressionCodec.valueOf(_compressionCodec))
          .build();
      SegmentGeneratorConfig config = new SegmentGeneratorConfig(
          new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
              .setNoDictionaryColumns(List.of(COLUMN_NAME))
              .setSortedColumn(COLUMN_NAME)
              .setFieldConfigList(List.of(fieldConfig))
              .build(),
          _schema);
      config.setOutDir(_indexDir.getAbsolutePath());
      config.setTableName(TABLE_NAME);
      config.setSegmentName("segment");

      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
      // GeneratedDataRecordReader/LazyDataGenerator are perf-package helpers that synthesize rows lazily —
      // presumably avoiding materializing _numRows rows in memory; confirm against the helper sources.
      try (RecordReader recordReader = new GeneratedDataRecordReader(createDataGenerator())) {
        driver.init(config, recordReader);
        driver.build();
      }
    }

    /**
     * Loads the segment built by {@link #buildSegment()} with a table config mirroring the creation config
     * (raw encoding, same codec, sorted column), then caches the column's data source.
     */
    private void loadSegment()
        throws Exception {
      _segment = ImmutableSegmentLoader.load(new File(_indexDir, "segment"),
          new IndexLoadingConfig(
              new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
                  .setNoDictionaryColumns(List.of(COLUMN_NAME))
                  .setSortedColumn(COLUMN_NAME)
                  .setFieldConfigList(List.of(new FieldConfig.Builder(COLUMN_NAME)
                      .withEncodingType(FieldConfig.EncodingType.RAW)
                      .withCompressionCodec(FieldConfig.CompressionCodec.valueOf(_compressionCodec))
                      .build()))
                  .build(),
              _schema));
      _dataSource = _segment.getDataSource(COLUMN_NAME);
    }

    /**
     * Builds the predicate for the parameterized shape. Predicate bounds are expressed in distinct-value space
     * (values run 0 .. numDistinctValues-1, each repeated RUN_LENGTH times).
     */
    private Predicate buildPredicate() {
      // Ceiling division: number of distinct values produced by the generator.
      int numDistinctValues = (_numRows + RUN_LENGTH - 1) / RUN_LENGTH;
      int midValue = numDistinctValues / 2;
      switch (_predicateShape) {
        case "EQ":
          // Matches one run of RUN_LENGTH docs in the middle of the segment.
          return new EqPredicate(COLUMN_EXPRESSION, Integer.toString(midValue));
        case "RANGE_1_PERCENT":
          return createCenteredRangePredicate(numDistinctValues, 0.01d);
        case "RANGE_10_PERCENT":
          return createCenteredRangePredicate(numDistinctValues, 0.10d);
        default:
          throw new IllegalStateException("Unsupported predicate shape: " + _predicateShape);
      }
    }

    /**
     * Builds an inclusive range predicate centered on the middle of the value domain that matches approximately
     * {@code matchingFraction} of the rows (rounded up to whole value runs, clamped to the valid value range).
     *
     * @param numDistinctValues total number of distinct values in the column
     * @param matchingFraction  target fraction of rows the range should select
     */
    private Predicate createCenteredRangePredicate(int numDistinctValues, double matchingFraction) {
      int matchingDocs = Math.max(1, (int) Math.round(_numRows * matchingFraction));
      // Ceiling division to whole runs; at least one distinct value so the range is never empty.
      int matchingDistinctValues = Math.max(1, (matchingDocs + RUN_LENGTH - 1) / RUN_LENGTH);
      int lowerValue = Math.max(0, (numDistinctValues - matchingDistinctValues) / 2);
      int upperValue = Math.min(numDistinctValues - 1, lowerValue + matchingDistinctValues - 1);
      return new RangePredicate(COLUMN_EXPRESSION, true, Integer.toString(lowerValue), true,
          Integer.toString(upperValue), FieldSpec.DataType.INT);
    }

    /**
     * Returns a generator producing {@code _numRows} rows where row {@code index} holds value
     * {@code index / RUN_LENGTH} — monotonically non-decreasing, hence sorted.
     */
    private LazyDataGenerator createDataGenerator() {
      return new LazyDataGenerator() {
        @Override
        public int size() {
          return _numRows;
        }

        @Override
        public GenericRow next(GenericRow row, int index) {
          row.putValue(COLUMN_NAME, index / RUN_LENGTH);
          return row;
        }

        @Override
        public void rewind() {
          // Generator is stateless (value derived from index), so nothing to reset.
        }
      };
    }
  }

  /**
   * Measures planning plus full consumption of the filter via the default planner path
   * (raw sorted index based operator, as asserted in setup).
   */
  @Benchmark
  public long rawSortedPath(FilterState state) {
    return consume(state.createRawSortedOperator());
  }

  /**
   * Measures planning plus full consumption of the filter with the sorted index skipped
   * (scan based operator, as asserted in setup).
   */
  @Benchmark
  public long scanPath(FilterState state) {
    return consume(state.createScanOperator());
  }

  /**
   * Drains the operator's doc-id set and folds it into a checksum that is returned to JMH, preventing
   * dead-code elimination. The checksum mixes every matching doc id (base-31 accumulation) and XORs the match
   * count into the high 32 bits, so paths differing in either doc ids or match count produce different values.
   */
  private static long consume(BaseFilterOperator operator) {
    FilterBlock filterBlock = operator.nextBlock();
    long checksum = 1L;
    int numMatches = 0;
    int docId;
    var iterator = filterBlock.getBlockDocIdSet().iterator();
    while ((docId = iterator.next()) != Constants.EOF) {
      checksum = checksum * 31 + docId;
      numMatches++;
    }
    return checksum ^ ((long) numMatches << 32);
  }
}

0 commit comments

Comments
 (0)