diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java index ee2ed5e82915..35a3b416b6ed 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java @@ -88,12 +88,10 @@ public BaseFilterOperator getLeafFilterOperator(QueryContext queryContext, Predi return new MatchAllFilterOperator(numDocs); } - // Currently sorted index based filtering is supported only for - // dictionary encoded columns. The on-disk segment metadata - // will indicate if the column is sorted or not regardless of - // whether it is raw or dictionary encoded. Here when creating - // the filter operator, we need to make sure that sort filter - // operator is used only if the column is sorted and has dictionary. + // Sorted index based filtering is supported for both dictionary encoded and raw columns. + // For dictionary encoded sorted columns, SortedIndexBasedFilterOperator uses the sorted + // index reader. For raw sorted columns, RawSortedIndexBasedFilterOperator uses binary + // search on the forward index. 
Predicate.Type predicateType = predicateEvaluator.getPredicateType(); if (predicateType == Predicate.Type.RANGE) { if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null @@ -104,6 +102,11 @@ public BaseFilterOperator getLeafFilterOperator(QueryContext queryContext, Predi && queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.RANGE)) { return new RangeIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs); } + if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() == null + && dataSource.getDataSourceMetadata().isSingleValue() + && queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.SORTED)) { + return new RawSortedIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs); + } return new ScanBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs); } else if (predicateType == Predicate.Type.REGEXP_LIKE) { if (predicateEvaluator instanceof BaseDictIdBasedRegexpLikePredicateEvaluator) { @@ -134,6 +137,11 @@ public BaseFilterOperator getLeafFilterOperator(QueryContext queryContext, Predi && queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.RANGE)) { return new RangeIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs); } + if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() == null + && dataSource.getDataSourceMetadata().isSingleValue() + && queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.SORTED)) { + return new RawSortedIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs); + } return new ScanBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs); } } @@ -221,7 +229,8 @@ int getPriority(BaseFilterOperator filterOperator) { return priority.getAsInt(); } } - if (filterOperator instanceof SortedIndexBasedFilterOperator) { + if (filterOperator instanceof SortedIndexBasedFilterOperator + || 
filterOperator instanceof RawSortedIndexBasedFilterOperator) { return PrioritizedFilterOperator.HIGH_PRIORITY; } if (filterOperator instanceof BitmapBasedFilterOperator diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperator.java new file mode 100644 index 000000000000..2267d1536653 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperator.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.CaseFormat;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotInPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.common.BlockDocIdSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.ExplainAttributeBuilder;
+import org.apache.pinot.core.operator.docidsets.EmptyDocIdSet;
+import org.apache.pinot.core.operator.docidsets.SortedDocIdSet;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.Pairs.IntPair;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Filter operator for sorted raw (non-dictionary-encoded) single-value columns. Uses binary search on the forward
+ * index to find matching document id ranges, avoiding full column scans. For chunk-compressed forward indexes,
+ * implements a two-level binary search: first at chunk boundaries to minimize chunk decompressions, then within the
+ * target chunk. Supports EQ, NOT_EQ, IN, NOT_IN and RANGE predicates; other predicate types are rejected.
+ */
+public class RawSortedIndexBasedFilterOperator extends BaseColumnFilterOperator {
+  private static final String EXPLAIN_NAME = "FILTER_RAW_SORTED_INDEX";
+
+  private final PredicateEvaluator _predicateEvaluator;
+  @SuppressWarnings("rawtypes")
+  private final ForwardIndexReader _forwardIndexReader;
+  private final int _numDocsPerChunk;
+
+  @SuppressWarnings("rawtypes")
+  RawSortedIndexBasedFilterOperator(QueryContext queryContext, PredicateEvaluator predicateEvaluator,
+      DataSource dataSource, int numDocs) {
+    super(queryContext, dataSource, numDocs);
+    _predicateEvaluator = predicateEvaluator;
+    _forwardIndexReader = dataSource.getForwardIndex();
+    _numDocsPerChunk = _forwardIndexReader.getNumDocsPerChunk();
+  }
+
+  /**
+   * Functional interface for comparing a value at a docId against a target value.
+   * Returns negative if value at docId < target, 0 if equal, positive if value > target.
+   */
+  @FunctionalInterface
+  private interface ValueComparator {
+    int compare(int docId);
+  }
+
+  @Override
+  protected BlockDocIdSet getNextBlockWithoutNullHandling() {
+    List<IntPair> docIdRanges = computeMatchingRanges();
+    if (docIdRanges.isEmpty()) {
+      return EmptyDocIdSet.getInstance();
+    }
+    return new SortedDocIdSet(docIdRanges);
+  }
+
+  /**
+   * Evaluates the predicate via binary search and returns the matching docIds as sorted, disjoint inclusive ranges.
+   */
+  @SuppressWarnings("unchecked")
+  private List<IntPair> computeMatchingRanges() {
+    Predicate predicate = _predicateEvaluator.getPredicate();
+    Predicate.Type predicateType = predicate.getType();
+    DataType dataType = _forwardIndexReader.getStoredType();
+
+    try (ForwardIndexReaderContext context = _forwardIndexReader.createContext()) {
+      switch (predicateType) {
+        case EQ:
+          return computeEqRanges(context, dataType, ((EqPredicate) predicate).getValue());
+        case NOT_EQ:
+          return invertRanges(computeEqRanges(context, dataType, ((NotEqPredicate) predicate).getValue()));
+        case IN:
+          return computeInRanges(context, dataType, ((InPredicate) predicate).getValues());
+        case NOT_IN:
+          return invertRanges(computeInRanges(context, dataType, ((NotInPredicate) predicate).getValues()));
+        case RANGE:
+          return computeRangeRanges(context, dataType, (RangePredicate) predicate);
+        default:
+          // The ranges are used as the final result without re-checking rows, so matching all docs would be wrong
+          throw new IllegalStateException("Unsupported predicate type for raw sorted index: " + predicateType);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to compute matching ranges for raw sorted column", e);
+    }
+  }
+
+  /**
+   * Returns the single docId range holding {@code valueStr}, or an empty list when the value is absent.
+   */
+  private List<IntPair> computeEqRanges(ForwardIndexReaderContext context, DataType dataType, String valueStr) {
+    ValueComparator cmp = buildComparator(context, dataType, valueStr);
+    int firstDocId = lowerBound(cmp, 0, _numDocs - 1);
+    // Absent when every value is smaller than the target, or the first value >= target is not the target itself
+    if (firstDocId >= _numDocs || cmp.compare(firstDocId) != 0) {
+      return Collections.emptyList();
+    }
+    int lastDocId = upperBound(cmp, firstDocId, _numDocs - 1);
+    return Collections.singletonList(new IntPair(firstDocId, lastDocId));
+  }
+
+  /**
+   * Returns the docId ranges matching any of {@code values}, in ascending docId order with adjacent ranges merged.
+   */
+  private List<IntPair> computeInRanges(ForwardIndexReaderContext context, DataType dataType, List<String> values) {
+    // Look the values up in column order so that the ranges come out in docId order
+    String[] sortedValues = getSortedValues(dataType, values);
+    List<IntPair> allRanges = new ArrayList<>(sortedValues.length);
+    for (String valueStr : sortedValues) {
+      allRanges.addAll(computeEqRanges(context, dataType, valueStr));
+    }
+    return mergeAdjacentRanges(allRanges);
+  }
+
+  /**
+   * Returns the single docId range whose values lie within the bounds of {@code rangePredicate}, or an empty list.
+   */
+  private List<IntPair> computeRangeRanges(ForwardIndexReaderContext context, DataType dataType,
+      RangePredicate rangePredicate) {
+    String lowerValue = rangePredicate.getLowerBound();
+    String upperValue = rangePredicate.getUpperBound();
+
+    // First docId satisfying the lower bound (docId 0 when unbounded)
+    int startDocId = 0;
+    if (!lowerValue.equals(RangePredicate.UNBOUNDED)) {
+      ValueComparator lowerCmp = buildComparator(context, dataType, lowerValue);
+      startDocId = rangePredicate.isLowerInclusive() ? lowerBound(lowerCmp, 0, _numDocs - 1)
+          : strictLowerBound(lowerCmp, 0, _numDocs - 1);
+    }
+    if (startDocId >= _numDocs) {
+      return Collections.emptyList();
+    }
+
+    // Last docId satisfying the upper bound (the last document when unbounded)
+    int endDocId = _numDocs - 1;
+    if (!upperValue.equals(RangePredicate.UNBOUNDED)) {
+      ValueComparator upperCmp = buildComparator(context, dataType, upperValue);
+      endDocId = rangePredicate.isUpperInclusive() ? upperBound(upperCmp, 0, _numDocs - 1)
+          : strictUpperBound(upperCmp, 0, _numDocs - 1);
+    }
+    if (endDocId < startDocId) {
+      return Collections.emptyList();
+    }
+    return Collections.singletonList(new IntPair(startDocId, endDocId));
+  }
+
+  /**
+   * Inverts a list of docId ranges within [0, numDocs-1]. The input must be sorted and non-overlapping.
+   */
+  private List<IntPair> invertRanges(List<IntPair> ranges) {
+    if (ranges.isEmpty()) {
+      // An empty segment has no complement; avoid emitting the invalid range [0, -1]
+      return _numDocs > 0 ? Collections.singletonList(new IntPair(0, _numDocs - 1)) : Collections.emptyList();
+    }
+    List<IntPair> inverted = new ArrayList<>(ranges.size() + 1);
+    int prevEnd = -1;
+    for (IntPair range : ranges) {
+      if (range.getLeft() > prevEnd + 1) {
+        inverted.add(new IntPair(prevEnd + 1, range.getLeft() - 1));
+      }
+      prevEnd = range.getRight();
+    }
+    if (prevEnd < _numDocs - 1) {
+      inverted.add(new IntPair(prevEnd + 1, _numDocs - 1));
+    }
+    return inverted;
+  }
+
+  /**
+   * Merges adjacent or overlapping ranges into a single range. The input must be sorted by start docId.
+   */
+  private static List<IntPair> mergeAdjacentRanges(List<IntPair> ranges) {
+    if (ranges.size() <= 1) {
+      return ranges;
+    }
+    List<IntPair> merged = new ArrayList<>();
+    IntPair current = ranges.get(0);
+    for (int i = 1; i < ranges.size(); i++) {
+      IntPair next = ranges.get(i);
+      if (next.getLeft() <= current.getRight() + 1) {
+        current = new IntPair(current.getLeft(), Math.max(current.getRight(), next.getRight()));
+      } else {
+        merged.add(current);
+        current = next;
+      }
+    }
+    merged.add(current);
+    return merged;
+  }
+
+  // --- Binary search methods ---
+
+  /**
+   * Finds the first docId in [low, high] where value >= target (comparator returns >= 0).
+   * If no such docId exists, returns high + 1.
+   * Uses two-level search when chunk info is available.
+   */
+  private int lowerBound(ValueComparator cmp, int low, int high) {
+    if (_numDocsPerChunk > 0 && high - low > _numDocsPerChunk) {
+      return chunkAwareLowerBound(cmp, low, high);
+    }
+    return simpleLowerBound(cmp, low, high);
+  }
+
+  /**
+   * Finds the last docId in [low, high] where value <= target (comparator returns <= 0).
+   * If no such docId exists, returns low - 1.
+   */
+  private int upperBound(ValueComparator cmp, int low, int high) {
+    if (_numDocsPerChunk > 0 && high - low > _numDocsPerChunk) {
+      return chunkAwareUpperBound(cmp, low, high);
+    }
+    return simpleUpperBound(cmp, low, high);
+  }
+
+  /**
+   * Finds the first docId where value > target (strictly greater than).
+   */
+  private int strictLowerBound(ValueComparator cmp, int low, int high) {
+    // Find the last docId where value <= target, then return the next one
+    return upperBound(cmp, low, high) + 1;
+  }
+
+  /**
+   * Finds the last docId where value < target (strictly less than).
+   */
+  private int strictUpperBound(ValueComparator cmp, int low, int high) {
+    // Find the first docId where value >= target, then return the one before
+    return lowerBound(cmp, low, high) - 1;
+  }
+
+  private int simpleLowerBound(ValueComparator cmp, int low, int high) {
+    int result = high + 1;
+    while (low <= high) {
+      int mid = low + (high - low) / 2;
+      if (cmp.compare(mid) >= 0) {
+        result = mid;
+        high = mid - 1;
+      } else {
+        low = mid + 1;
+      }
+    }
+    return result;
+  }
+
+  private int simpleUpperBound(ValueComparator cmp, int low, int high) {
+    int result = low - 1;
+    while (low <= high) {
+      int mid = low + (high - low) / 2;
+      if (cmp.compare(mid) <= 0) {
+        result = mid;
+        low = mid + 1;
+      } else {
+        high = mid - 1;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Two-level binary search: coarse search at chunk boundaries, then fine search within the chunk.
+   * Finds first docId where value >= target.
+   */
+  private int chunkAwareLowerBound(ValueComparator cmp, int low, int high) {
+    // Coarse search: find the first chunk whose first value is >= target
+    int lowChunk = low / _numDocsPerChunk;
+    int highChunk = Math.min(high / _numDocsPerChunk, (_numDocs - 1) / _numDocsPerChunk);
+
+    int targetChunk = highChunk + 1;
+    while (lowChunk <= highChunk) {
+      int midChunk = lowChunk + (highChunk - lowChunk) / 2;
+      int chunkFirstDocId = midChunk * _numDocsPerChunk;
+      if (cmp.compare(chunkFirstDocId) >= 0) {
+        targetChunk = midChunk;
+        highChunk = midChunk - 1;
+      } else {
+        lowChunk = midChunk + 1;
+      }
+    }
+
+    // Fine search: the boundary lies in the chunk before the target chunk, or at the target chunk's first docId
+    int fineStart = Math.max(Math.max(targetChunk - 1, 0) * _numDocsPerChunk, low);
+    int fineEnd = Math.min((targetChunk + 1) * _numDocsPerChunk - 1, high);
+    return simpleLowerBound(cmp, fineStart, fineEnd);
+  }
+
+  /**
+   * Two-level binary search: coarse search at chunk boundaries, then fine search within the chunk.
+   * Finds last docId where value <= target.
+   */
+  private int chunkAwareUpperBound(ValueComparator cmp, int low, int high) {
+    // Coarse search: find the last chunk where first value <= target
+    int lowChunk = low / _numDocsPerChunk;
+    int highChunk = Math.min(high / _numDocsPerChunk, (_numDocs - 1) / _numDocsPerChunk);
+
+    int targetChunk = lowChunk - 1;
+    while (lowChunk <= highChunk) {
+      int midChunk = lowChunk + (highChunk - lowChunk) / 2;
+      int chunkFirstDocId = midChunk * _numDocsPerChunk;
+      if (cmp.compare(chunkFirstDocId) <= 0) {
+        targetChunk = midChunk;
+        lowChunk = midChunk + 1;
+      } else {
+        highChunk = midChunk - 1;
+      }
+    }
+
+    // Fine search within the target chunk and possibly the next one
+    int fineStart = Math.max(targetChunk * _numDocsPerChunk, low);
+    int fineEnd;
+    if (targetChunk < (_numDocs - 1) / _numDocsPerChunk) {
+      fineEnd = (targetChunk + 2) * _numDocsPerChunk - 1;
+    } else {
+      fineEnd = _numDocs - 1;
+    }
+    fineEnd = Math.min(fineEnd, high);
+    return simpleUpperBound(cmp, fineStart, fineEnd);
+  }
+
+  // --- Value comparator builders ---
+
+  /**
+   * Builds a comparator that reads the value at a docId from the forward index and compares it against
+   * {@code valueStr} parsed as {@code dataType}. Throws IllegalStateException for unsupported data types.
+   */
+  @SuppressWarnings("unchecked")
+  private ValueComparator buildComparator(ForwardIndexReaderContext context, DataType dataType, String valueStr) {
+    switch (dataType) {
+      case INT: {
+        int target = Integer.parseInt(valueStr);
+        return docId -> Integer.compare(_forwardIndexReader.getInt(docId, context), target);
+      }
+      case LONG: {
+        long target = Long.parseLong(valueStr);
+        return docId -> Long.compare(_forwardIndexReader.getLong(docId, context), target);
+      }
+      case FLOAT: {
+        float target = Float.parseFloat(valueStr);
+        return docId -> Float.compare(_forwardIndexReader.getFloat(docId, context), target);
+      }
+      case DOUBLE: {
+        double target = Double.parseDouble(valueStr);
+        return docId -> Double.compare(_forwardIndexReader.getDouble(docId, context), target);
+      }
+      case BIG_DECIMAL: {
+        BigDecimal target = new BigDecimal(valueStr);
+        return docId -> _forwardIndexReader.getBigDecimal(docId, context).compareTo(target);
+      }
+      case STRING: {
+        return docId -> _forwardIndexReader.getString(docId, context).compareTo(valueStr);
+      }
+      case BYTES: {
+        byte[] target = BytesUtils.toBytes(valueStr);
+        return docId -> ByteArray.compare(_forwardIndexReader.getBytes(docId, context), target);
+      }
+      default:
+        throw new IllegalStateException("Unsupported data type for raw sorted index: " + dataType);
+    }
+  }
+
+  /**
+   * Sorts the IN/NOT_IN values according to the column data type ordering, so that binary search results produce
+   * ranges in docId order. The ordering matches the comparator built by {@code buildComparator} for each type.
+   */
+  private String[] getSortedValues(DataType dataType, List<String> values) {
+    String[] sorted = values.toArray(new String[0]);
+    switch (dataType) {
+      case INT:
+        int[] intVals = new int[sorted.length];
+        for (int i = 0; i < sorted.length; i++) {
+          intVals[i] = Integer.parseInt(sorted[i]);
+        }
+        Arrays.sort(intVals);
+        for (int i = 0; i < sorted.length; i++) {
+          sorted[i] = Integer.toString(intVals[i]);
+        }
+        break;
+      case LONG:
+        long[] longVals = new long[sorted.length];
+        for (int i = 0; i < sorted.length; i++) {
+          longVals[i] = Long.parseLong(sorted[i]);
+        }
+        Arrays.sort(longVals);
+        for (int i = 0; i < sorted.length; i++) {
+          sorted[i] = Long.toString(longVals[i]);
+        }
+        break;
+      case FLOAT:
+        float[] floatVals = new float[sorted.length];
+        for (int i = 0; i < sorted.length; i++) {
+          floatVals[i] = Float.parseFloat(sorted[i]);
+        }
+        Arrays.sort(floatVals);
+        for (int i = 0; i < sorted.length; i++) {
+          sorted[i] = Float.toString(floatVals[i]);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleVals = new double[sorted.length];
+        for (int i = 0; i < sorted.length; i++) {
+          doubleVals[i] = Double.parseDouble(sorted[i]);
+        }
+        Arrays.sort(doubleVals);
+        for (int i = 0; i < sorted.length; i++) {
+          sorted[i] = Double.toString(doubleVals[i]);
+        }
+        break;
+      case BIG_DECIMAL:
+        // Lexicographic order differs from numeric order (e.g. "10" sorts before "9"), so sort the parsed values
+        BigDecimal[] bigDecimalVals = new BigDecimal[sorted.length];
+        for (int i = 0; i < sorted.length; i++) {
+          bigDecimalVals[i] = new BigDecimal(sorted[i]);
+        }
+        Arrays.sort(bigDecimalVals);
+        for (int i = 0; i < sorted.length; i++) {
+          sorted[i] = bigDecimalVals[i].toString();
+        }
+        break;
+      case BYTES:
+        // Order by the decoded bytes, the same ordering the binary search comparator uses
+        Arrays.sort(sorted, (left, right) -> ByteArray.compare(BytesUtils.toBytes(left), BytesUtils.toBytes(right)));
+        break;
+      default:
+        // STRING (and any other type): natural string order, as given by String.compareTo
+        Arrays.sort(sorted);
+        break;
+    }
+    return sorted;
+  }
+
+  // --- Count optimization and bitmap support ---
+
+  @Override
+  public boolean canOptimizeCount() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int getNumMatchingDocs() {
+    List<IntPair> ranges = computeMatchingRanges();
+    int count = 0;
+    for (IntPair range : ranges) {
+      count += range.getRight() - range.getLeft() + 1;
+    }
+    return count;
+  }
+
+  @Override
+  public boolean canProduceBitmaps() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public BitmapCollection getBitmaps() {
+    List<IntPair> ranges = computeMatchingRanges();
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (IntPair range : ranges) {
+      bitmap.add(range.getLeft(), range.getRight() + 1L);
+    }
+    return new BitmapCollection(_numDocs, false, bitmap);
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public String toExplainString() {
+    StringBuilder stringBuilder = new StringBuilder(EXPLAIN_NAME).append("(indexLookUp:raw_sorted_index");
+    stringBuilder.append(",operator:").append(_predicateEvaluator.getPredicateType());
+    stringBuilder.append(",predicate:").append(_predicateEvaluator.getPredicate().toString());
+    return stringBuilder.append(')').toString();
+  }
+
+  @Override
+  protected String getExplainName() {
+    return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME);
+  }
+
+  @Override
+  protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
+    super.explainAttributes(attributeBuilder);
+    attributeBuilder.putString("indexLookUp", "raw_sorted_index");
+    attributeBuilder.putString("operator", _predicateEvaluator.getPredicateType().name());
+    attributeBuilder.putString("predicate", _predicateEvaluator.getPredicate().toString());
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperatorTest.java
new file
mode 100644 index 000000000000..190ee7987751 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperatorTest.java @@ -0,0 +1,503 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.core.operator.filter;
+
+import java.util.Arrays;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.InPredicate;
+import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
+import org.apache.pinot.common.request.context.predicate.NotInPredicate;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.predicate.EqualsPredicateEvaluatorFactory;
+import org.apache.pinot.core.operator.filter.predicate.InPredicateEvaluatorFactory;
+import org.apache.pinot.core.operator.filter.predicate.NotEqualsPredicateEvaluatorFactory;
+import org.apache.pinot.core.operator.filter.predicate.NotInPredicateEvaluatorFactory;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class RawSortedIndexBasedFilterOperatorTest {
+
+  // Sorted int data (doc id = array index): [0, 0, 1, 1, 2, 3, 4, 5, 5, 5, 6, 7, 8, 9, 10, 10, 10, 15, 20, 100]
+  private static final int[] SORTED_INT_DATA =
+      {0, 0, 1, 1, 2, 3, 4, 5, 5, 5, 6, 7, 8, 9, 10, 10, 10, 15, 20, 100};
+  private static final int NUM_DOCS = SORTED_INT_DATA.length;
+  private static final ExpressionContext COL_EXPR = ExpressionContext.forIdentifier("testCol");
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static DataSource createIntDataSource(int[] data, int numDocsPerChunk) {
+    ForwardIndexReader reader = mock(ForwardIndexReader.class);
+    when(reader.isDictionaryEncoded()).thenReturn(false);
+    when(reader.isSingleValue()).thenReturn(true);
+    when(reader.getStoredType()).thenReturn(DataType.INT);
+    when(reader.getNumDocsPerChunk()).thenReturn(numDocsPerChunk); // tests pass 0 except the chunk-aware ones
+    when(reader.createContext()).thenReturn(null); // the value stubs below are keyed on this null context
+    for (int i = 0; i < data.length; i++) {
+      when(reader.getInt(i, null)).thenReturn(data[i]);
+    }
+
+    DataSourceMetadata metadata = mock(DataSourceMetadata.class);
+    when(metadata.isSorted()).thenReturn(true);
+    when(metadata.isSingleValue()).thenReturn(true);
+    when(metadata.getDataType()).thenReturn(DataType.INT);
+
+    DataSource dataSource = mock(DataSource.class);
+    when(dataSource.getForwardIndex()).thenReturn(reader);
+    when(dataSource.getDataSourceMetadata()).thenReturn(metadata);
+    when(dataSource.getDictionary()).thenReturn(null); // raw column: no dictionary
+    when(dataSource.getNullValueVector()).thenReturn(null);
+
+    return dataSource;
+  }
+
+  private static QueryContext createQueryContext() {
+    QueryContext queryContext = mock(QueryContext.class);
+    when(queryContext.isNullHandlingEnabled()).thenReturn(false);
+    return queryContext;
+  }
+
+  private static int[] getMatchingDocIds(RawSortedIndexBasedFilterOperator operator) {
+    FilterBlock filterBlock = operator.nextBlock();
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    var iterator = filterBlock.getBlockDocIdSet().iterator();
+    int docId;
+    while ((docId = iterator.next()) != Constants.EOF) { // drain the iterator until it signals EOF
+      bitmap.add(docId);
+    }
+    int[] result = new int[bitmap.getCardinality()]; // copy the collected doc ids into a plain array
+    IntIterator it = bitmap.getIntIterator();
+    int idx = 0;
+    while (it.hasNext()) {
+      result[idx++] = it.next();
+    }
+    return result;
+  }
+
+  @Test
+  public void testEqPredicateValueExists() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "5");
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // Value 5 is at indices 7, 8, 9
+    assertEquals(matchingDocIds, new int[]{7, 8, 9});
+  }
+
+  @Test
+  public void testEqPredicateValueNotExists() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "50"); // 50 falls between stored values 20 and 100
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds.length, 0);
+  }
+
+  @Test
+  public void testEqPredicateFirstValue() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "0"); // smallest stored value
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds, new int[]{0, 1});
+  }
+
+  @Test
+  public void testEqPredicateLastValue() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "100"); // largest stored value
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds, new int[]{19});
+  }
+
+  @Test
+  public void testNotEqPredicate() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    NotEqPredicate predicate = new NotEqPredicate(COL_EXPR, "5");
+    PredicateEvaluator evaluator =
+        NotEqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // All except indices 7, 8, 9
+    assertEquals(matchingDocIds, new int[]{0, 1, 2, 3, 4, 5, 6, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
+  }
+
+  @Test
+  public void testRangePredicateInclusive() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    // Range [3, 7] inclusive
+    RangePredicate predicate = new RangePredicate(COL_EXPR, true, "3", true, "7", DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // Values 3,4,5,5,5,6,7 at indices 5,6,7,8,9,10,11
+    assertEquals(matchingDocIds, new int[]{5, 6, 7, 8, 9, 10, 11});
+  }
+
+  @Test
+  public void testRangePredicateExclusive() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    // Range (3, 7) exclusive
+    RangePredicate predicate = new RangePredicate(COL_EXPR, false, "3", false, "7", DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // Values 4,5,5,5,6 at indices 6,7,8,9,10
+    assertEquals(matchingDocIds, new int[]{6, 7, 8, 9, 10});
+  }
+
+  @Test
+  public void testRangePredicateUnboundedLower() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    RangePredicate predicate =
+        new RangePredicate(COL_EXPR, true, RangePredicate.UNBOUNDED, true, "2", DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // Values 0,0,1,1,2 at indices 0,1,2,3,4
+    assertEquals(matchingDocIds, new int[]{0, 1, 2, 3, 4});
+  }
+
+  @Test
+  public void testRangePredicateUnboundedUpper() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    RangePredicate predicate =
+        new RangePredicate(COL_EXPR, true, "15", true, RangePredicate.UNBOUNDED, DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // Values 15,20,100 at indices 17,18,19
+    assertEquals(matchingDocIds, new int[]{17, 18, 19});
+  }
+
+  @Test
+  public void testRangePredicateNoMatch() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    RangePredicate predicate = new RangePredicate(COL_EXPR, true, "50", true, "99", DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds.length, 0);
+  }
+
+  @Test
+  public void testInPredicate() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    InPredicate predicate = new InPredicate(COL_EXPR, Arrays.asList("1", "5", "10"));
+    PredicateEvaluator evaluator =
+        InPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // Value 1 at 2,3; value 5 at 7,8,9; value 10 at 14,15,16
+    assertEquals(matchingDocIds, new int[]{2, 3, 7, 8, 9, 14, 15, 16});
+  }
+
+  @Test
+  public void testNotInPredicate() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    NotInPredicate predicate = new NotInPredicate(COL_EXPR, Arrays.asList("0", "100"));
+    PredicateEvaluator evaluator =
+        NotInPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // All except 0,1 (value 0) and 19 (value 100)
+    assertEquals(matchingDocIds, new int[]{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18});
+  }
+
+  @Test
+  public void testCanOptimizeCount() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "5");
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    assertTrue(operator.canOptimizeCount());
+    assertEquals(operator.getNumMatchingDocs(), 3); // value 5 occupies doc ids 7, 8, 9
+  }
+
+  @Test
+  public void testCanProduceBitmaps() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "10"); // value 10 occupies doc ids 14, 15, 16
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    assertTrue(operator.canProduceBitmaps());
+    BitmapCollection bitmaps = operator.getBitmaps();
+    ImmutableRoaringBitmap bitmap = bitmaps.reduce();
+    assertEquals(bitmap.getCardinality(), 3);
+    assertTrue(bitmap.contains(14));
+    assertTrue(bitmap.contains(15));
+    assertTrue(bitmap.contains(16));
+  }
+
+  @Test
+  public void testChunkAwareBinarySearch() {
+    // Test with numDocsPerChunk = 4 to verify two-level search
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 4);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "5");
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds, new int[]{7, 8, 9}); // same result as the numDocsPerChunk = 0 case
+  }
+
+  @Test
+  public void testChunkAwareRangePredicate() {
+    // Test with numDocsPerChunk = 5 to verify two-level range search
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 5);
+    QueryContext queryContext = createQueryContext();
+    RangePredicate predicate = new RangePredicate(COL_EXPR, true, "3", true, "7", DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds, new int[]{5, 6, 7, 8, 9, 10, 11}); // same result as the numDocsPerChunk = 0 case
+  }
+
+  @Test
+  public void testExplainString() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "5");
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    String explainString = operator.toExplainString();
+    assertTrue(explainString.contains("FILTER_RAW_SORTED_INDEX"));
+    assertTrue(explainString.contains("raw_sorted_index"));
+  }
+
+  @Test
+  public void testAllDocsMatch() {
+    DataSource dataSource = createIntDataSource(SORTED_INT_DATA, 0);
+    QueryContext queryContext = createQueryContext();
+    RangePredicate predicate = new RangePredicate(COL_EXPR, true, "0", true, "100", DataType.INT);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, NUM_DOCS);
+
+    assertEquals(operator.getNumMatchingDocs(), NUM_DOCS); // [0, 100] covers the min and max stored values
+  }
+
+  @Test
+  public void testSingleElementData() {
+    int[] data = {42};
+    DataSource dataSource = createIntDataSource(data, 0);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "42");
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.INT);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, 1);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds, new int[]{0});
+  }
+
+  // --- Long data type test (same mock wiring as the int data source, stubbing getLong) ---
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static DataSource createLongDataSource(long[] data) {
+    ForwardIndexReader reader = mock(ForwardIndexReader.class);
+    when(reader.isDictionaryEncoded()).thenReturn(false);
+    when(reader.isSingleValue()).thenReturn(true);
+    when(reader.getStoredType()).thenReturn(DataType.LONG);
+    when(reader.getNumDocsPerChunk()).thenReturn(0);
+    when(reader.createContext()).thenReturn(null);
+    for (int i = 0; i < data.length; i++) {
+      when(reader.getLong(i, null)).thenReturn(data[i]);
+    }
+
+    DataSourceMetadata metadata = mock(DataSourceMetadata.class);
+    when(metadata.isSorted()).thenReturn(true);
+    when(metadata.isSingleValue()).thenReturn(true);
+    when(metadata.getDataType()).thenReturn(DataType.LONG);
+
+    DataSource dataSource = mock(DataSource.class);
+    when(dataSource.getForwardIndex()).thenReturn(reader);
+    when(dataSource.getDataSourceMetadata()).thenReturn(metadata);
+    when(dataSource.getDictionary()).thenReturn(null);
+    when(dataSource.getNullValueVector()).thenReturn(null);
+
+    return dataSource;
+  }
+
+  @Test
+  public void testLongEqPredicate() {
+    long[] data = {100L, 200L, 300L, 300L, 400L, 500L};
+    DataSource dataSource = createLongDataSource(data);
+    QueryContext queryContext = createQueryContext();
+    EqPredicate predicate = new EqPredicate(COL_EXPR, "300");
+    PredicateEvaluator evaluator =
+        EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.LONG);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, data.length);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    assertEquals(matchingDocIds, new int[]{2, 3}); // 300L is stored at indices 2 and 3
+  }
+
+  // --- String data type test (same mock wiring as the int data source, stubbing getString) ---
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static DataSource createStringDataSource(String[] data) {
+    ForwardIndexReader reader = mock(ForwardIndexReader.class);
+    when(reader.isDictionaryEncoded()).thenReturn(false);
+    when(reader.isSingleValue()).thenReturn(true);
+    when(reader.getStoredType()).thenReturn(DataType.STRING);
+    when(reader.getNumDocsPerChunk()).thenReturn(0);
+    when(reader.createContext()).thenReturn(null);
+    for (int i = 0; i < data.length; i++) {
+      when(reader.getString(i, null)).thenReturn(data[i]);
+    }
+
+    DataSourceMetadata metadata = mock(DataSourceMetadata.class);
+    when(metadata.isSorted()).thenReturn(true);
+    when(metadata.isSingleValue()).thenReturn(true);
+    when(metadata.getDataType()).thenReturn(DataType.STRING);
+
+    DataSource dataSource = mock(DataSource.class);
+    when(dataSource.getForwardIndex()).thenReturn(reader);
+    when(dataSource.getDataSourceMetadata()).thenReturn(metadata);
+    when(dataSource.getDictionary()).thenReturn(null);
+    when(dataSource.getNullValueVector()).thenReturn(null);
+
+    return dataSource;
+  }
+
+  @Test
+  public void testStringRangePredicate() {
+    String[] data = {"apple", "banana", "cherry", "date", "elderberry", "fig"};
+    DataSource dataSource = createStringDataSource(data);
+    QueryContext queryContext = createQueryContext();
+    RangePredicate predicate =
+        new RangePredicate(COL_EXPR, true, "banana", true, "elderberry", DataType.STRING);
+    PredicateEvaluator evaluator =
+        RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.STRING);
+
+    RawSortedIndexBasedFilterOperator operator =
+        new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, data.length);
+
+    int[] matchingDocIds = getMatchingDocIds(operator);
+    // banana(1), cherry(2), date(3), elderberry(4)
+    assertEquals(matchingDocIds, new int[]{1, 2, 3, 4});
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawSortedFilterOperator.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawSortedFilterOperator.java
new file mode 100644
index 000000000000..a672e8c05ee6
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawSortedFilterOperator.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.predicate.EqPredicate;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
+import org.apache.pinot.core.operator.filter.ScanBasedFilterOperator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/**
+ * JMH benchmark comparing the raw sorted filter path against the scan fallback on a real immutable segment.
+ *
+ * <p>The benchmark builds a raw, single-value, sorted integer column and measures the end-to-end leaf filter cost for
+ * two cases:
+ * {@link org.apache.pinot.core.operator.filter.RawSortedIndexBasedFilterOperator} via the default planner path, and
+ * {@link ScanBasedFilterOperator} by forcing the query context to skip the sorted index.
+ */
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class BenchmarkRawSortedFilterOperator {
+  private static final File INDEX_DIR_ROOT =
+      new File(FileUtils.getTempDirectory(), "BenchmarkRawSortedFilterOperator");
+  private static final String TABLE_NAME = "rawSortedFilterTable";
+  private static final String COLUMN_NAME = "rawSortedInt";
+  private static final String RAW_SORTED_OPERATOR_NAME = "RawSortedIndexBasedFilterOperator";
+  private static final ExpressionContext COLUMN_EXPRESSION = ExpressionContext.forIdentifier(COLUMN_NAME);
+  private static final int RUN_LENGTH = 32; // docs per distinct value: row i stores i / RUN_LENGTH
+
+  public static void main(String[] args)
+      throws Exception {
+    new Runner(new OptionsBuilder().include(BenchmarkRawSortedFilterOperator.class.getSimpleName()).build()).run();
+  }
+
+  @State(Scope.Benchmark)
+  public static class FilterState {
+    @Param({"1000000"})
+    int _numRows;
+
+    @Param({"PASS_THROUGH", "SNAPPY"})
+    String _compressionCodec; // parsed with FieldConfig.CompressionCodec.valueOf
+
+    @Param({"EQ", "RANGE_1_PERCENT", "RANGE_10_PERCENT"})
+    String _predicateShape; // selects the predicate in buildPredicate()
+
+    private File _indexDir;
+    private Schema _schema;
+    private IndexSegment _segment;
+    private DataSource _dataSource;
+    private QueryContext _rawSortedQueryContext;
+    private QueryContext _scanQueryContext;
+    private PredicateEvaluator _predicateEvaluator;
+    private long _expectedChecksum;
+
+    @Setup(Level.Trial)
+    public void setup()
+        throws Exception {
+      _indexDir = new File(INDEX_DIR_ROOT, _compressionCodec + "-" + _predicateShape + "-" + _numRows);
+      FileUtils.deleteQuietly(_indexDir); // start each trial from an empty directory
+      FileUtils.forceMkdir(_indexDir);
+
+      _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+          .addSingleValueDimension(COLUMN_NAME, FieldSpec.DataType.INT)
+          .build();
+
+      buildSegment();
+      loadSegment();
+
+      Preconditions.checkState(_dataSource.getDataSourceMetadata().isSorted(),
+          "Benchmark requires a sorted column but metadata was unsorted");
+      Preconditions.checkState(_dataSource.getDictionary() == null,
+          "Benchmark requires a raw column but dictionary was present");
+      _rawSortedQueryContext = new QueryContext.Builder().setTableName(TABLE_NAME).build();
+      _rawSortedQueryContext.setSchema(_schema);
+
+      _scanQueryContext = new QueryContext.Builder().setTableName(TABLE_NAME).build();
+      _scanQueryContext.setSchema(_schema);
+      _scanQueryContext.setSkipIndexes(Collections.singletonMap(COLUMN_NAME, EnumSet.of(FieldConfig.IndexType.SORTED)));
+
+      Predicate predicate = buildPredicate();
+      _predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, _dataSource,
+          _rawSortedQueryContext);
+
+      BaseFilterOperator rawSortedOperator = createRawSortedOperator();
+      Preconditions.checkState(RAW_SORTED_OPERATOR_NAME.equals(rawSortedOperator.getClass().getSimpleName()),
+          "Expected %s but planned %s", RAW_SORTED_OPERATOR_NAME, rawSortedOperator.getClass().getName());
+      BaseFilterOperator scanOperator = createScanOperator();
+      Preconditions.checkState(scanOperator instanceof ScanBasedFilterOperator,
+          "Expected ScanBasedFilterOperator but planned %s", scanOperator.getClass().getName());
+
+      _expectedChecksum = consume(rawSortedOperator); // sanity check: both paths must agree before measuring
+      long scanChecksum = consume(scanOperator);
+      Preconditions.checkState(_expectedChecksum == scanChecksum,
+          "Raw sorted and scan paths must return identical results. rawSorted=%s scan=%s shape=%s",
+          _expectedChecksum, scanChecksum, _predicateShape);
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() {
+      if (_segment != null) {
+        _segment.destroy();
+      }
+      FileUtils.deleteQuietly(_indexDir);
+    }
+
+    BaseFilterOperator createRawSortedOperator() {
+      return FilterOperatorUtils.getLeafFilterOperator(_rawSortedQueryContext, _predicateEvaluator, _dataSource,
+          _numRows);
+    }
+
+    BaseFilterOperator createScanOperator() {
+      return FilterOperatorUtils.getLeafFilterOperator(_scanQueryContext, _predicateEvaluator, _dataSource, _numRows);
+    }
+
+    private void buildSegment()
+        throws Exception {
+      FieldConfig fieldConfig = new FieldConfig.Builder(COLUMN_NAME)
+          .withEncodingType(FieldConfig.EncodingType.RAW)
+          .withCompressionCodec(FieldConfig.CompressionCodec.valueOf(_compressionCodec))
+          .build();
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(
+          new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+              .setNoDictionaryColumns(List.of(COLUMN_NAME))
+              .setSortedColumn(COLUMN_NAME)
+              .setFieldConfigList(List.of(fieldConfig))
+              .build(),
+          _schema);
+      config.setOutDir(_indexDir.getAbsolutePath());
+      config.setTableName(TABLE_NAME);
+      config.setSegmentName("segment"); // loadSegment() opens the directory with this same name
+
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      try (RecordReader recordReader = new GeneratedDataRecordReader(createDataGenerator())) {
+        driver.init(config, recordReader);
+        driver.build();
+      }
+    }
+
+    private void loadSegment()
+        throws Exception {
+      _segment = ImmutableSegmentLoader.load(new File(_indexDir, "segment"),
+          new IndexLoadingConfig(
+              new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+                  .setNoDictionaryColumns(List.of(COLUMN_NAME))
+                  .setSortedColumn(COLUMN_NAME)
+                  .setFieldConfigList(List.of(new FieldConfig.Builder(COLUMN_NAME)
+                      .withEncodingType(FieldConfig.EncodingType.RAW)
+                      .withCompressionCodec(FieldConfig.CompressionCodec.valueOf(_compressionCodec))
+                      .build()))
+                  .build(),
+              _schema));
+      _dataSource = _segment.getDataSource(COLUMN_NAME);
+    }
+
+    private Predicate buildPredicate() {
+      int numDistinctValues = (_numRows + RUN_LENGTH - 1) / RUN_LENGTH; // ceil(_numRows / RUN_LENGTH)
+      int midValue = numDistinctValues / 2;
+      switch (_predicateShape) {
+        case "EQ":
+          return new EqPredicate(COLUMN_EXPRESSION, Integer.toString(midValue));
+        case "RANGE_1_PERCENT":
+          return createCenteredRangePredicate(numDistinctValues, 0.01d);
+        case "RANGE_10_PERCENT":
+          return createCenteredRangePredicate(numDistinctValues, 0.10d);
+        default:
+          throw new IllegalStateException("Unsupported predicate shape: " + _predicateShape);
+      }
+    }
+
+    private Predicate createCenteredRangePredicate(int numDistinctValues, double matchingFraction) {
+      int matchingDocs = Math.max(1, (int) Math.round(_numRows * matchingFraction));
+      int matchingDistinctValues = Math.max(1, (matchingDocs + RUN_LENGTH - 1) / RUN_LENGTH);
+      int lowerValue = Math.max(0, (numDistinctValues - matchingDistinctValues) / 2); // centre the window
+      int upperValue = Math.min(numDistinctValues - 1, lowerValue + matchingDistinctValues - 1);
+      return new RangePredicate(COLUMN_EXPRESSION, true, Integer.toString(lowerValue), true,
+          Integer.toString(upperValue), FieldSpec.DataType.INT);
+    }
+
+    private LazyDataGenerator createDataGenerator() {
+      return new LazyDataGenerator() {
+        @Override
+        public int size() {
+          return _numRows;
+        }
+
+        @Override
+        public GenericRow next(GenericRow row, int index) {
+          row.putValue(COLUMN_NAME, index / RUN_LENGTH); // non-decreasing in index, so the column is sorted
+          return row;
+        }
+
+        @Override
+        public void rewind() { // nothing to reset: each row is derived from its index alone
+        }
+      };
+    }
+  }
+
+  @Benchmark
+  public long rawSortedPath(FilterState state) {
+    return consume(state.createRawSortedOperator());
+  }
+
+  @Benchmark
+  public long scanPath(FilterState state) {
+    return consume(state.createScanOperator());
+  }
+
+  private static long consume(BaseFilterOperator operator) {
+    FilterBlock filterBlock = operator.nextBlock();
+    long checksum = 1L;
+    int numMatches = 0;
+    int docId;
+    var iterator = filterBlock.getBlockDocIdSet().iterator();
+    while ((docId = iterator.next()) != Constants.EOF) {
+      checksum = checksum * 31 + docId; // order-sensitive rolling hash over the matching doc ids
+      numMatches++;
+    }
+    return checksum ^ ((long) numMatches << 32); // mix the match count into the upper 32 bits
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index 0ef13d23e63b..e625b5e89827 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -282,6 +282,11 @@ public int getLengthOfLongestEntry() {
     return _lengthOfLongestEntry;
   }
 
+  @Override
+  public int getNumDocsPerChunk() {
+    return _numDocsPerChunk;
+  }
+
   @Override
   public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) {
     if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index d05078494cc7..0932352b9c21 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -101,6 +101,16 @@ default T createContext(Map queryOptions) {
     return createContext();
   }
 
+  /**
+   * Returns the number of documents per chunk for chunk-based forward index readers. Returns 0 if chunk information is
+   * not available (e.g., non-chunk-based readers or uncompressed readers). This is used to optimize binary search on
+   * sorted raw columns by enabling two-level search: coarse search at chunk boundaries followed by fine search within
+   * the target chunk, minimizing chunk decompressions.
+   */
+  default int getNumDocsPerChunk() {
+    return 0;
+  }
+
   /**
    * DICTIONARY-ENCODED INDEX APIs
    */