|
38 | 38 | import org.apache.pinot.segment.spi.index.reader.Dictionary; |
39 | 39 | import org.apache.pinot.spi.data.FieldSpec.DataType; |
40 | 40 | import org.apache.pinot.spi.utils.CommonConstants; |
| 41 | +import org.roaringbitmap.RoaringBitmap; |
41 | 42 |
|
42 | 43 |
|
43 | 44 | public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregationFunction<HyperLogLog, Long> { |
@@ -256,16 +257,15 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol |
256 | 257 |
|
257 | 258 | protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, |
258 | 259 | BlockValSet blockValSet, DataType storedType) { |
259 | | - // For dictionary-encoded expression, offer dictionary values directly to HyperLogLog. |
260 | | - // Unlike the non-group-by aggregation path (where a single BitSet over the full dict is cheap), the group-by |
261 | | - // path creates one result per group. Pre-allocating a BitSet of dictSize/8 bytes per group would multiply memory |
262 | | - // usage by numGroups (e.g. 100K groups × 375KB = 37.5GB for a 3M-entry dict). Since HLL is an approximation and |
263 | | - // max-register semantics make duplicate offers a no-op, deduplication is unnecessary here. |
| 260 | + // For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap for deduplication. |
| 261 | + // RoaringBitmap is used (not BitSet) because it is sparse: memory scales with the number of distinct dict IDs |
| 262 | + // seen per group, not with the full dictionary size. This avoids OOM when many groups each see few distinct values |
| 263 | + // (contrast with the non-group-by path, which uses a single BitSet across the entire dictionary). |
264 | 264 | Dictionary dictionary = blockValSet.getDictionary(); |
265 | 265 | if (dictionary != null) { |
266 | 266 | int[] dictIds = blockValSet.getDictionaryIdsSV(); |
267 | 267 | for (int i = 0; i < length; i++) { |
268 | | - getHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(dictionary.get(dictIds[i])); |
| 268 | + getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); |
269 | 269 | } |
270 | 270 | return; |
271 | 271 | } |
@@ -309,15 +309,12 @@ protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResu |
309 | 309 |
|
310 | 310 | protected void aggregateMVGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, |
311 | 311 | BlockValSet blockValSet, DataType storedType) { |
312 | | - // For dictionary-encoded expression, offer dictionary values directly to HyperLogLog (see aggregateSVGroupBySV). |
| 312 | + // For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap (see aggregateSVGroupBySV). |
313 | 313 | Dictionary dictionary = blockValSet.getDictionary(); |
314 | 314 | if (dictionary != null) { |
315 | 315 | int[][] dictIds = blockValSet.getDictionaryIdsMV(); |
316 | 316 | for (int i = 0; i < length; i++) { |
317 | | - HyperLogLog hyperLogLog = getHyperLogLog(groupByResultHolder, groupKeyArray[i]); |
318 | | - for (int dictId : dictIds[i]) { |
319 | | - hyperLogLog.offer(dictionary.get(dictId)); |
320 | | - } |
| 317 | + getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); |
321 | 318 | } |
322 | 319 | return; |
323 | 320 | } |
@@ -412,14 +409,14 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult |
412 | 409 |
|
413 | 410 | protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, |
414 | 411 | BlockValSet blockValSet, DataType storedType) { |
415 | | - // For dictionary-encoded expression, offer dictionary values directly to HyperLogLog (see aggregateSVGroupBySV). |
| 412 | + // For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap (see aggregateSVGroupBySV). |
416 | 413 | Dictionary dictionary = blockValSet.getDictionary(); |
417 | 414 | if (dictionary != null) { |
418 | 415 | int[] dictIds = blockValSet.getDictionaryIdsSV(); |
419 | 416 | for (int i = 0; i < length; i++) { |
420 | | - Object value = dictionary.get(dictIds[i]); |
| 417 | + int dictId = dictIds[i]; |
421 | 418 | for (int groupKey : groupKeysArray[i]) { |
422 | | - getHyperLogLog(groupByResultHolder, groupKey).offer(value); |
| 419 | + getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId); |
423 | 420 | } |
424 | 421 | } |
425 | 422 | return; |
@@ -464,17 +461,14 @@ protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByR |
464 | 461 |
|
465 | 462 | protected void aggregateMVGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, |
466 | 463 | BlockValSet blockValSet, DataType storedType) { |
467 | | - // For dictionary-encoded expression, offer dictionary values directly to HyperLogLog (see aggregateSVGroupBySV). |
| 464 | + // For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap (see aggregateSVGroupBySV). |
468 | 465 | Dictionary dictionary = blockValSet.getDictionary(); |
469 | 466 | if (dictionary != null) { |
470 | 467 | int[][] dictIds = blockValSet.getDictionaryIdsMV(); |
471 | 468 | for (int i = 0; i < length; i++) { |
472 | 469 | int[] rowDictIds = dictIds[i]; |
473 | 470 | for (int groupKey : groupKeysArray[i]) { |
474 | | - HyperLogLog hyperLogLog = getHyperLogLog(groupByResultHolder, groupKey); |
475 | | - for (int dictId : rowDictIds) { |
476 | | - hyperLogLog.offer(dictionary.get(dictId)); |
477 | | - } |
| 471 | + getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(rowDictIds); |
478 | 472 | } |
479 | 473 | } |
480 | 474 | return; |
@@ -565,8 +559,17 @@ public HyperLogLog extractAggregationResult(AggregationResultHolder aggregationR |
565 | 559 |
|
566 | 560 | @Override |
567 | 561 | public HyperLogLog extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { |
568 | | - HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey); |
569 | | - return hyperLogLog != null ? hyperLogLog : new HyperLogLog(_log2m); |
| 562 | + Object result = groupByResultHolder.getResult(groupKey); |
| 563 | + if (result == null) { |
| 564 | + return new HyperLogLog(_log2m); |
| 565 | + } |
| 566 | + if (result instanceof GroupByDictIdsWrapper) { |
| 567 | + // For dictionary-encoded expression, convert the collected dict IDs to a HyperLogLog |
| 568 | + return convertToHyperLogLog((GroupByDictIdsWrapper) result); |
| 569 | + } else { |
| 570 | + // For non-dictionary-encoded expression, the result is already a HyperLogLog |
| 571 | + return (HyperLogLog) result; |
| 572 | + } |
570 | 573 | } |
571 | 574 |
|
572 | 575 | @Override |
@@ -634,6 +637,20 @@ public boolean canUseStarTree(Map<String, Object> functionParameters) { |
634 | 637 | } |
635 | 638 | } |
636 | 639 |
|
| 640 | + /** |
| 641 | + * Returns the {@link GroupByDictIdsWrapper} for the given group key, creating a new one if absent. |
| 642 | + * Uses a {@link RoaringBitmap} (sparse) so memory scales with distinct values per group, not dictionary size. |
| 643 | + */ |
| 644 | + protected static GroupByDictIdsWrapper getDictIdBitmap(GroupByResultHolder groupByResultHolder, int groupKey, |
| 645 | + Dictionary dictionary) { |
| 646 | + GroupByDictIdsWrapper wrapper = groupByResultHolder.getResult(groupKey); |
| 647 | + if (wrapper == null) { |
| 648 | + wrapper = new GroupByDictIdsWrapper(dictionary); |
| 649 | + groupByResultHolder.setValueForKey(groupKey, wrapper); |
| 650 | + } |
| 651 | + return wrapper; |
| 652 | + } |
| 653 | + |
637 | 654 | /** |
638 | 655 | * Returns the {@link DictIdsWrapper} from the result holder, creating a new one if absent. |
639 | 656 | */ |
@@ -680,6 +697,18 @@ private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] |
680 | 697 | } |
681 | 698 | } |
682 | 699 |
|
| 700 | + /** |
| 701 | + * Converts a {@link GroupByDictIdsWrapper} to a HyperLogLog by offering each distinct dictionary value exactly once. |
| 702 | + */ |
| 703 | + private HyperLogLog convertToHyperLogLog(GroupByDictIdsWrapper wrapper) { |
| 704 | + HyperLogLog hyperLogLog = new HyperLogLog(_log2m); |
| 705 | + Dictionary dictionary = wrapper._dictionary; |
| 706 | + for (int dictId : wrapper._dictIdBitmap) { |
| 707 | + hyperLogLog.offer(dictionary.get(dictId)); |
| 708 | + } |
| 709 | + return hyperLogLog; |
| 710 | + } |
| 711 | + |
683 | 712 | /** |
684 | 713 | * Converts a {@link DictIdsWrapper} to a HyperLogLog by offering each distinct dictionary value exactly once. |
685 | 714 | */ |
@@ -723,4 +752,29 @@ void addDictIds(int[] dictIds, int length) { |
723 | 752 | } |
724 | 753 | } |
725 | 754 | } |
| 755 | + |
| 756 | + /** |
| 757 | + * Wraps a {@link Dictionary} with a {@link RoaringBitmap} to collect and deduplicate dictionary IDs in the group-by |
| 758 | + * aggregation path. Unlike {@link DictIdsWrapper} (which uses a pre-allocated {@link BitSet} of dictSize/8 bytes), |
| 759 | + * this uses a sparse RoaringBitmap whose memory grows only with the number of distinct dict IDs seen per group. |
| 760 | + * This is critical for group-by: one wrapper per group means memory = numGroups × (distinct values/group × ~2 bytes), |
| 761 | + * which stays bounded even when there are many groups or a large dictionary. |
| 762 | + */ |
| 763 | + protected static final class GroupByDictIdsWrapper { |
| 764 | + final Dictionary _dictionary; |
| 765 | + final RoaringBitmap _dictIdBitmap; |
| 766 | + |
| 767 | + GroupByDictIdsWrapper(Dictionary dictionary) { |
| 768 | + _dictionary = dictionary; |
| 769 | + _dictIdBitmap = new RoaringBitmap(); |
| 770 | + } |
| 771 | + |
| 772 | + void add(int dictId) { |
| 773 | + _dictIdBitmap.add(dictId); |
| 774 | + } |
| 775 | + |
| 776 | + void add(int[] dictIds) { |
| 777 | + _dictIdBitmap.add(dictIds); |
| 778 | + } |
| 779 | + } |
726 | 780 | } |
0 commit comments