Add RawSortedIndexBasedFilterOperator for binary search on raw sorted columns#18079
xiangfu0 wants to merge 2 commits into apache:master
Conversation
Codecov Report
@@ Coverage Diff @@
## master #18079 +/- ##
============================================
+ Coverage 63.48% 63.50% +0.01%
Complexity 1627 1627
============================================
Files 3244 3245 +1
Lines 197342 197570 +228
Branches 30529 30575 +46
============================================
+ Hits 125285 125460 +175
- Misses 62014 62057 +43
- Partials 10043 10053 +10
Force-pushed: 8817917 to fe87055, then fe87055 to f1e6f1d.
```java
    && dataSource.getDataSourceMetadata().isSingleValue()
    && queryContext.isIndexUseAllowed(dataSource, FieldConfig.IndexType.SORTED)) {
  return new RawSortedIndexBasedFilterOperator(queryContext, predicateEvaluator, dataSource, numDocs);
}
```
… columns

For raw (non-dictionary) sorted forward index columns, filter queries previously fell back to ScanBasedFilterOperator, which does a full linear scan O(N). This change adds a new RawSortedIndexBasedFilterOperator that uses binary search O(log N) on the forward index to find matching document ID ranges.

Key features:
- Two-level binary search for chunk-compressed readers: coarse search at chunk boundaries minimizes decompressions; fine search within the cached chunk is free
- Supports EQ, NEQ, IN, NOT_IN, RANGE predicates with all numeric types plus STRING
- Exposes getNumDocsPerChunk() on ForwardIndexReader for chunk-aware optimization
- Optimized count and bitmap production from docId ranges

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
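The two-level search described in the commit message can be illustrated with a minimal standalone sketch. A plain array stands in for the forward index, and all names here are hypothetical rather than the PR's API: a coarse binary search over each chunk's first value selects the target chunk, then a classic lower-bound search runs only inside that chunk.

```java
public class TwoLevelBinarySearch {
  // Returns the index of the first element >= target (lower bound), using
  // a coarse chunk-level search followed by a fine in-chunk search.
  public static int lowerBound(long[] sortedDocs, int numDocsPerChunk, long target) {
    int numChunks = (sortedDocs.length + numDocsPerChunk - 1) / numDocsPerChunk;
    // Coarse phase: find the last chunk whose first value <= target.
    int lo = 0;
    int hi = numChunks - 1;
    int chunk = 0;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      if (sortedDocs[mid * numDocsPerChunk] <= target) {
        chunk = mid;
        lo = mid + 1;
      } else {
        hi = mid - 1;
      }
    }
    // Fine phase: classic lower bound within the chosen chunk only. In a
    // chunk-compressed reader this is the phase that reads the chunk body,
    // so a probe decompresses a single chunk here.
    int start = chunk * numDocsPerChunk;
    int end = Math.min(start + numDocsPerChunk, sortedDocs.length);
    while (start < end) {
      int mid = (start + end) >>> 1;
      if (sortedDocs[mid] < target) {
        start = mid + 1;
      } else {
        end = mid;
      }
    }
    return start;
  }
}
```

If the target is larger than everything in the chosen chunk, the fine search returns the start of the next chunk, which is still the correct global lower bound because the next chunk's first value exceeds the target.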
When a column has both a range index and is raw-sorted, the range index should be preferred over raw sorted binary search. Moved the raw sorted checks after the specialized index checks to fix TextMatchTransformFunctionTest.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed: f1e6f1d to b7d1b32.
Pull request overview
This PR introduces a new filter operator path for sorted, raw (non-dictionary-encoded) single-value columns, enabling binary-search-based filtering on forward indexes (with chunk-aware optimization when chunk metadata is available). This extends Pinot’s existing “sorted index” optimization beyond dictionary-encoded columns.
Changes:
- Add RawSortedIndexBasedFilterOperator to compute matching docId ranges via binary search on raw sorted forward indexes (optionally chunk-aware).
- Extend ForwardIndexReader with getNumDocsPerChunk() and implement it in BaseChunkForwardIndexReader.
- Update FilterOperatorUtils to route eligible sorted raw SV columns to the new operator, and add a new unit test suite.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java | Adds getNumDocsPerChunk() default method for chunk-aware optimizations. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java | Exposes _numDocsPerChunk via getNumDocsPerChunk(). |
| pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperator.java | Implements raw sorted forward-index binary search (including chunk-aware search). |
| pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java | Routes sorted raw SV columns to the new operator and prioritizes it similarly to SortedIndexBasedFilterOperator. |
| pinot-core/src/test/java/org/apache/pinot/core/operator/filter/RawSortedIndexBasedFilterOperatorTest.java | Adds unit tests for the new operator (currently focused on INT/LONG/STRING). |
```java
// Sort values to get ranges in order, then merge adjacent ranges
String[] sortedValues = getSortedValues(dataType, values);
List<IntPair> allRanges = new ArrayList<>();
for (String valueStr : sortedValues) {
  List<IntPair> eqRanges = computeEqRanges(context, dataType, valueStr);
  allRanges.addAll(eqRanges);
}
return mergeAdjacentRanges(allRanges);
}
```
computeInRanges() relies on mergeAdjacentRanges() assuming ranges are already in ascending docId order, but getSortedValues() only applies type-correct ordering for INT/LONG/FLOAT/DOUBLE and falls back to lexicographic String sort for other types. For BIG_DECIMAL this can generate out-of-order docId ranges (e.g., values ["2","10"]), which breaks SortedDocIdSet/SortedRangeIntersection expectations and can lead to incorrect AND/OR results. Fix by sorting the collected docId ranges by range.getLeft() (and ideally de-duping/merging overlaps) before constructing SortedDocIdSet, or by implementing type-correct value sorting for BIG_DECIMAL (BigDecimal::compareTo) and BYTES (decoded bytes / ByteArray.compare) as well.
Suggested change:

```diff
-// Sort values to get ranges in order, then merge adjacent ranges
-String[] sortedValues = getSortedValues(dataType, values);
-List<IntPair> allRanges = new ArrayList<>();
-for (String valueStr : sortedValues) {
-  List<IntPair> eqRanges = computeEqRanges(context, dataType, valueStr);
-  allRanges.addAll(eqRanges);
-}
-return mergeAdjacentRanges(allRanges);
-}
+// Sort values to improve binary search locality, then normalize the resulting docId ranges because
+// value ordering does not always imply docId-range ordering for all data types.
+String[] sortedValues = getSortedValues(dataType, values);
+List<IntPair> allRanges = new ArrayList<>();
+for (String valueStr : sortedValues) {
+  List<IntPair> eqRanges = computeEqRanges(context, dataType, valueStr);
+  allRanges.addAll(eqRanges);
+}
+return sortAndMergeRanges(allRanges);
+}
+
+private List<IntPair> sortAndMergeRanges(List<IntPair> ranges) {
+  int numRanges = ranges.size();
+  if (numRanges < 2) {
+    return ranges;
+  }
+  ranges.sort((range1, range2) -> {
+    int leftCompare = Integer.compare(range1.getLeft(), range2.getLeft());
+    return leftCompare != 0 ? leftCompare : Integer.compare(range1.getRight(), range2.getRight());
+  });
+  List<IntPair> mergedRanges = new ArrayList<>(numRanges);
+  IntPair currentRange = ranges.get(0);
+  int currentStart = currentRange.getLeft();
+  int currentEnd = currentRange.getRight();
+  for (int i = 1; i < numRanges; i++) {
+    IntPair nextRange = ranges.get(i);
+    int nextStart = nextRange.getLeft();
+    int nextEnd = nextRange.getRight();
+    if (nextStart <= currentEnd + 1) {
+      currentEnd = Math.max(currentEnd, nextEnd);
+    } else {
+      mergedRanges.add(new IntPair(currentStart, currentEnd));
+      currentStart = nextStart;
+      currentEnd = nextEnd;
+    }
+  }
+  mergedRanges.add(new IntPair(currentStart, currentEnd));
+  return mergedRanges;
+}
```
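As a sanity check, the suggested sort-and-merge normalization can be mirrored in a standalone sketch, using `int[]` pairs in place of Pinot's IntPair (which is not available here); the logic is the same: sort by range start, then merge overlapping or adjacent ranges.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeMerge {
  // Sorts [start, end] docId ranges and merges adjacent/overlapping ones,
  // mirroring the suggested sortAndMergeRanges() above.
  public static List<int[]> sortAndMerge(List<int[]> ranges) {
    if (ranges.size() < 2) {
      return ranges;
    }
    ranges.sort((a, b) -> a[0] != b[0] ? Integer.compare(a[0], b[0]) : Integer.compare(a[1], b[1]));
    List<int[]> merged = new ArrayList<>(ranges.size());
    int start = ranges.get(0)[0];
    int end = ranges.get(0)[1];
    for (int i = 1; i < ranges.size(); i++) {
      int[] next = ranges.get(i);
      if (next[0] <= end + 1) {
        // Overlapping or adjacent: extend the current range.
        end = Math.max(end, next[1]);
      } else {
        merged.add(new int[]{start, end});
        start = next[0];
        end = next[1];
      }
    }
    merged.add(new int[]{start, end});
    return merged;
  }
}
```

For example, `[5,6], [0,2], [3,4], [10,12]` normalizes to `[0,6], [10,12]`: the first three ranges are adjacent after sorting and collapse into one.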
```java
/**
 * Sorts the IN/NOT_IN values according to the column data type ordering, so that binary search results produce
 * ranges in docId order.
 */
private String[] getSortedValues(DataType dataType, List<String> values) {
  String[] sorted = values.toArray(new String[0]);
  switch (dataType) {
    case INT:
      int[] intVals = new int[sorted.length];
      for (int i = 0; i < sorted.length; i++) {
        intVals[i] = Integer.parseInt(sorted[i]);
      }
      Arrays.sort(intVals);
      for (int i = 0; i < sorted.length; i++) {
        sorted[i] = Integer.toString(intVals[i]);
      }
      break;
    case LONG:
      long[] longVals = new long[sorted.length];
      for (int i = 0; i < sorted.length; i++) {
        longVals[i] = Long.parseLong(sorted[i]);
      }
      Arrays.sort(longVals);
      for (int i = 0; i < sorted.length; i++) {
        sorted[i] = Long.toString(longVals[i]);
      }
      break;
    case FLOAT:
      float[] floatVals = new float[sorted.length];
      for (int i = 0; i < sorted.length; i++) {
        floatVals[i] = Float.parseFloat(sorted[i]);
      }
      Arrays.sort(floatVals);
      for (int i = 0; i < sorted.length; i++) {
        sorted[i] = Float.toString(floatVals[i]);
      }
      break;
    case DOUBLE:
      double[] doubleVals = new double[sorted.length];
      for (int i = 0; i < sorted.length; i++) {
        doubleVals[i] = Double.parseDouble(sorted[i]);
      }
      Arrays.sort(doubleVals);
      for (int i = 0; i < sorted.length; i++) {
        sorted[i] = Double.toString(doubleVals[i]);
      }
      break;
    default:
      // String and others: natural string sort
      Arrays.sort(sorted);
      break;
  }
```
getSortedValues() does not implement type-aware ordering for BIG_DECIMAL or BYTES, but computeInRanges() depends on the values being sorted in the same order as the column to keep docId ranges ordered for SortedDocIdSet. Please add explicit sorting for BIG_DECIMAL (parse to BigDecimal and sort via compareTo) and BYTES (sort by decoded bytes using ByteArray.compare), or remove this method and instead sort the resulting IntPair ranges by docId before merging.
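A minimal type-aware sort for BIG_DECIMAL along the lines the reviewer suggests (a standalone sketch, not the PR's code; a BYTES variant would compare decoded byte arrays instead of parsed numbers):

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Comparator;

public class BigDecimalValueSort {
  // Sorts string-encoded BIG_DECIMAL values numerically so that equal-value
  // docId ranges on a sorted column come out in ascending docId order;
  // a lexicographic sort would put "10" before "2".
  public static String[] sortBigDecimalValues(String[] values) {
    String[] sorted = values.clone();
    Arrays.sort(sorted, Comparator.comparing(BigDecimal::new));
    return sorted;
  }
}
```

For example, `{"10", "2", "1.5"}` sorts to `{"1.5", "2", "10"}`, matching the column's BigDecimal ordering.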
```java
// --- Long data type test ---

@SuppressWarnings({"unchecked", "rawtypes"})
private static DataSource createLongDataSource(long[] data) {
  ForwardIndexReader reader = mock(ForwardIndexReader.class);
  when(reader.isDictionaryEncoded()).thenReturn(false);
  when(reader.isSingleValue()).thenReturn(true);
  when(reader.getStoredType()).thenReturn(DataType.LONG);
  when(reader.getNumDocsPerChunk()).thenReturn(0);
  when(reader.createContext()).thenReturn(null);
  for (int i = 0; i < data.length; i++) {
    when(reader.getLong(i, null)).thenReturn(data[i]);
  }

  DataSourceMetadata metadata = mock(DataSourceMetadata.class);
  when(metadata.isSorted()).thenReturn(true);
  when(metadata.isSingleValue()).thenReturn(true);
  when(metadata.getDataType()).thenReturn(DataType.LONG);

  DataSource dataSource = mock(DataSource.class);
  when(dataSource.getForwardIndex()).thenReturn(reader);
  when(dataSource.getDataSourceMetadata()).thenReturn(metadata);
  when(dataSource.getDictionary()).thenReturn(null);
  when(dataSource.getNullValueVector()).thenReturn(null);

  return dataSource;
}

@Test
public void testLongEqPredicate() {
  long[] data = {100L, 200L, 300L, 300L, 400L, 500L};
  DataSource dataSource = createLongDataSource(data);
  QueryContext queryContext = createQueryContext();
  EqPredicate predicate = new EqPredicate(COL_EXPR, "300");
  PredicateEvaluator evaluator =
      EqualsPredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.LONG);

  RawSortedIndexBasedFilterOperator operator =
      new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, data.length);

  int[] matchingDocIds = getMatchingDocIds(operator);
  assertEquals(matchingDocIds, new int[]{2, 3});
}

// --- String data type test ---

@SuppressWarnings({"unchecked", "rawtypes"})
private static DataSource createStringDataSource(String[] data) {
  ForwardIndexReader reader = mock(ForwardIndexReader.class);
  when(reader.isDictionaryEncoded()).thenReturn(false);
  when(reader.isSingleValue()).thenReturn(true);
  when(reader.getStoredType()).thenReturn(DataType.STRING);
  when(reader.getNumDocsPerChunk()).thenReturn(0);
  when(reader.createContext()).thenReturn(null);
  for (int i = 0; i < data.length; i++) {
    when(reader.getString(i, null)).thenReturn(data[i]);
  }

  DataSourceMetadata metadata = mock(DataSourceMetadata.class);
  when(metadata.isSorted()).thenReturn(true);
  when(metadata.isSingleValue()).thenReturn(true);
  when(metadata.getDataType()).thenReturn(DataType.STRING);

  DataSource dataSource = mock(DataSource.class);
  when(dataSource.getForwardIndex()).thenReturn(reader);
  when(dataSource.getDataSourceMetadata()).thenReturn(metadata);
  when(dataSource.getDictionary()).thenReturn(null);
  when(dataSource.getNullValueVector()).thenReturn(null);

  return dataSource;
}

@Test
public void testStringRangePredicate() {
  String[] data = {"apple", "banana", "cherry", "date", "elderberry", "fig"};
  DataSource dataSource = createStringDataSource(data);
  QueryContext queryContext = createQueryContext();
  RangePredicate predicate =
      new RangePredicate(COL_EXPR, true, "banana", true, "elderberry", DataType.STRING);
  PredicateEvaluator evaluator =
      RangePredicateEvaluatorFactory.newRawValueBasedEvaluator(predicate, DataType.STRING);

  RawSortedIndexBasedFilterOperator operator =
      new RawSortedIndexBasedFilterOperator(queryContext, evaluator, dataSource, data.length);

  int[] matchingDocIds = getMatchingDocIds(operator);
  // banana(1), cherry(2), date(3), elderberry(4)
  assertEquals(matchingDocIds, new int[]{1, 2, 3, 4});
}
}
```
The new operator claims support for multiple raw data types (FLOAT/DOUBLE/BYTES/BIG_DECIMAL), but this test suite only exercises INT/LONG/STRING. Adding focused tests for at least BIG_DECIMAL and BYTES (especially IN/NOT_IN, where range ordering/merging is sensitive) would prevent regressions and would have caught the ordering issue in getSortedValues().
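To illustrate why such a test matters, here is a minimal standalone check (plain assertions rather than the mock-based harness above; names are hypothetical) showing that lexicographic value ordering yields out-of-docId-order ranges on a sorted BIG_DECIMAL column:

```java
import java.math.BigDecimal;
import java.util.Arrays;

public class BigDecimalOrderingDemo {
  // A sorted raw BIG_DECIMAL column: docIds 0..4.
  static final BigDecimal[] COLUMN = {
      new BigDecimal("1"), new BigDecimal("2"), new BigDecimal("10"),
      new BigDecimal("10"), new BigDecimal("20")
  };

  // Start docId of the first row >= value (lower bound by binary search).
  static int firstDocId(BigDecimal value) {
    int lo = 0;
    int hi = COLUMN.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (COLUMN[mid].compareTo(value) < 0) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }

  // First docIds of IN values, probed in the given value order.
  static int[] firstDocIds(String[] values) {
    return Arrays.stream(values).map(BigDecimal::new)
        .mapToInt(BigDecimalOrderingDemo::firstDocId).toArray();
  }
}
```

Probing the values in lexicographic order ("10" before "2") produces range starts 2 then 1, i.e. descending docIds, which is exactly the ordering violation the reviewer describes; probing in numeric order produces 1 then 2.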
```java
 * not available (e.g., non-chunk-based readers or uncompressed readers). This is used to optimize binary search on
 * sorted raw columns by enabling two-level search: coarse search at chunk boundaries followed by fine search within
 * the target chunk, minimizing chunk decompressions.
```
Javadoc says getNumDocsPerChunk() returns 0 for “uncompressed readers”, but BaseChunkForwardIndexReader can be uncompressed (PASS_THROUGH) and still has meaningful chunk boundaries and returns _numDocsPerChunk. Consider updating the wording to reflect “non-chunk-based readers” (or “readers without chunk metadata”) rather than tying it to compression.
Suggested change:

```diff
- * not available (e.g., non-chunk-based readers or uncompressed readers). This is used to optimize binary search on
- * sorted raw columns by enabling two-level search: coarse search at chunk boundaries followed by fine search within
- * the target chunk, minimizing chunk decompressions.
+ * not available (e.g., non-chunk-based readers or readers without chunk metadata). This is used to optimize binary
+ * search on sorted raw columns by enabling two-level search: coarse search at chunk boundaries followed by fine
+ * search within the target chunk, minimizing chunk decompressions.
```
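The SPI pattern under discussion — a default method returning 0 for "no chunk metadata" so existing readers need no changes — can be sketched as follows (names mirror but are not the PR's exact code):

```java
// Hypothetical reader interface: the default keeps old implementations
// source- and binary-compatible, while chunk-based readers override it.
interface SketchForwardIndexReader {
  /**
   * Number of docs per chunk, or 0 when chunk metadata is not available
   * (e.g., non-chunk-based readers or readers without chunk metadata).
   */
  default int getNumDocsPerChunk() {
    return 0;
  }
}

// A non-chunk-based reader simply inherits the default...
class SketchPlainReader implements SketchForwardIndexReader {
}

// ...while a chunk-based reader exposes its real chunk size.
class SketchChunkReader implements SketchForwardIndexReader {
  private final int _numDocsPerChunk = 1000;

  @Override
  public int getNumDocsPerChunk() {
    return _numDocsPerChunk;
  }
}
```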
Summary

- RawSortedIndexBasedFilterOperator that uses binary search O(log N) on raw sorted forward index columns instead of full scan O(N)
- MultiChunkReaderContext with an LRU cache of decompressed chunks to avoid repeated decompression during binary search; the chunk cache is exposed via ForwardIndexReader.createCachedContext(int numSlots)
- New SPI methods getNumDocsPerChunk() and createCachedContext() on the ForwardIndexReader interface for chunk-aware optimization

Background

Currently SortedIndexBasedFilterOperator only works for dictionary-encoded sorted columns. Raw sorted columns fall through to ScanBasedFilterOperator, which linearly scans every document. This PR adds an efficient binary search path for raw sorted columns, matching the optimization that dictionary-encoded sorted columns already enjoy.

Changes

- ForwardIndexReader.java — Added getNumDocsPerChunk() and createCachedContext(int numSlots) default methods
- BaseChunkForwardIndexReader.java — Overrides both methods; adds decompressChunkInto for direct-into-slot decompression
- MultiChunkReaderContext.java — New LRU chunk cache context (up to N decompressed chunks); handles DELTA/DELTADELTA codecs correctly
- RawSortedIndexBasedFilterOperator.java — New filter operator with two-level binary search plus multi-chunk cache; caches the computeMatchingRanges() result for reuse by canOptimizeCount()/canProduceBitmaps()
- FilterOperatorUtils.java — Routes sorted raw SV columns to the new operator
- RawSortedIndexBasedFilterOperatorTest.java — 21 unit tests covering all predicate types and edge cases
- MultiChunkReaderContextTest.java — 8 unit tests covering cache hit/miss, LRU eviction, replaceSlot, close cleanup, and integration for all compression types
- BenchmarkRawSortedIndexFilter.java — JMH microbenchmark comparing binary search vs linear scan
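The PR's MultiChunkReaderContext is not shown in this page. As an illustration of the general idea only (an LRU cache of decompressed chunks, not the PR's implementation), a hedged standalone sketch can be built on LinkedHashMap's access-order mode, with long arrays standing in for decompressed chunk buffers:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Illustrative LRU chunk cache: keeps up to numSlots decompressed chunks,
// evicting the least-recently-used one when a new chunk is loaded.
public class ChunkLruCache {
  private final Map<Integer, long[]> _cache;

  public ChunkLruCache(int numSlots) {
    // accessOrder=true makes iteration order reflect recency of access;
    // removeEldestEntry bounds the cache at numSlots entries.
    _cache = new LinkedHashMap<>(numSlots, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Integer, long[]> eldest) {
        return size() > numSlots;
      }
    };
  }

  // Returns the cached chunk, invoking the loader ("decompression") on a miss.
  public long[] getChunk(int chunkId, IntFunction<long[]> loader) {
    return _cache.computeIfAbsent(chunkId, loader::apply);
  }

  public boolean isCached(int chunkId) {
    return _cache.containsKey(chunkId);
  }
}
```

With two slots, loading chunks 0, 1, then 2 evicts chunk 0; during a binary search, repeated probes into the same chunk then hit the cache instead of decompressing again.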
Benchmark

JMH microbenchmark on a sorted raw INT forward index (1K docs/chunk, ~10x value repetition), measured in AverageTime mode.

Key takeaways:

- … numDocs because ScanBasedFilterOperator uses batched iteration — only the first batch (~1K docs) is measured here. Real query latency at scale is proportionally worse for linear scan.

Test plan

- … MultiChunkReaderContext (cache hit/miss, LRU eviction, replaceSlot, close, all compression types)

🤖 Generated with Claude Code