Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.google.common.base.Preconditions;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -37,7 +38,6 @@
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.RoaringBitmap;


Expand All @@ -50,7 +50,7 @@ public DistinctCountHLLAggregationFunction(List<ExpressionContext> arguments) {
// This function expects 1 or 2 arguments.
Preconditions.checkArgument(numExpressions <= 2, "DistinctCountHLL expects 1 or 2 arguments, got: %s",
numExpressions);
if (arguments.size() == 2) {
if (numExpressions == 2) {
_log2m = arguments.get(1).getLiteral().getIntValue();
} else {
_log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
Expand Down Expand Up @@ -113,11 +113,16 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde

protected void aggregateSV(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet,
DataType storedType) {
// For dictionary-encoded expression, store dictionary ids into the bitmap
// For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
// BitSet gives O(1) insertion with no container-switching overhead (unlike RoaringBitmap), and uses
// dictSize/8 bytes of memory (e.g. 128 KB for a 1M-entry dictionary).
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
int[] dictIds = blockValSet.getDictionaryIdsSV();
getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0, length);
BitSet bitSet = getDictIdBitSet(aggregationResultHolder, dictionary);
for (int i = 0; i < length; i++) {
bitSet.set(dictIds[i]);
}
return;
}

Expand Down Expand Up @@ -161,13 +166,15 @@ protected void aggregateSV(int length, AggregationResultHolder aggregationResult

protected void aggregateMV(int length, AggregationResultHolder aggregationResultHolder, BlockValSet blockValSet,
DataType storedType) {
// For dictionary-encoded expression, store dictionary ids into the bitmap
// For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
RoaringBitmap dictIdBitmap = getDictIdBitmap(aggregationResultHolder, dictionary);
int[][] dictIds = blockValSet.getDictionaryIdsMV();
BitSet bitSet = getDictIdBitSet(aggregationResultHolder, dictionary);
for (int i = 0; i < length; i++) {
dictIdBitmap.add(dictIds[i]);
for (int dictId : dictIds[i]) {
bitSet.set(dictId);
}
}
return;
}
Expand Down Expand Up @@ -255,7 +262,10 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol

protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
BlockValSet blockValSet, DataType storedType) {
// For dictionary-encoded expression, store dictionary ids into the bitmap
// For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap for deduplication.
// RoaringBitmap is used (not BitSet) because it is sparse: memory scales with the number of distinct dict IDs
// seen per group, not with the full dictionary size. This avoids OOM when many groups each see few distinct values
// (contrast with the non-group-by path, which uses a single BitSet across the entire dictionary).
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
int[] dictIds = blockValSet.getDictionaryIdsSV();
Expand Down Expand Up @@ -304,7 +314,7 @@ protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResu

protected void aggregateMVGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
BlockValSet blockValSet, DataType storedType) {
// For dictionary-encoded expression, store dictionary ids into the bitmap
// For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap (see aggregateSVGroupBySV).
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
int[][] dictIds = blockValSet.getDictionaryIdsMV();
Expand Down Expand Up @@ -404,12 +414,15 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult

protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
BlockValSet blockValSet, DataType storedType) {
// For dictionary-encoded expression, store dictionary ids into the bitmap
// For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap (see aggregateSVGroupBySV).
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
int[] dictIds = blockValSet.getDictionaryIdsSV();
for (int i = 0; i < length; i++) {
setDictIdForGroupKeys(groupByResultHolder, groupKeysArray[i], dictionary, dictIds[i]);
int dictId = dictIds[i];
for (int groupKey : groupKeysArray[i]) {
getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
}
}
return;
}
Expand Down Expand Up @@ -453,13 +466,14 @@ protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByR

protected void aggregateMVGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
BlockValSet blockValSet, DataType storedType) {
// For dictionary-encoded expression, store dictionary ids into the bitmap
// For dictionary-encoded expression, collect dictionary ids into a RoaringBitmap (see aggregateSVGroupBySV).
Dictionary dictionary = blockValSet.getDictionary();
if (dictionary != null) {
int[][] dictIds = blockValSet.getDictionaryIdsMV();
for (int i = 0; i < length; i++) {
int[] rowDictIds = dictIds[i];
for (int groupKey : groupKeysArray[i]) {
getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictIds[i]);
getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(rowDictIds);
}
}
return;
Expand Down Expand Up @@ -554,12 +568,11 @@ public HyperLogLog extractGroupByResult(GroupByResultHolder groupByResultHolder,
if (result == null) {
return new HyperLogLog(_log2m);
}

if (result instanceof DictIdsWrapper) {
// For dictionary-encoded expression, convert dictionary ids to HyperLogLog
return convertToHyperLogLog((DictIdsWrapper) result);
if (result instanceof GroupByDictIdsWrapper) {
// For dictionary-encoded expression, convert the collected dict IDs to a HyperLogLog
return convertToHyperLogLog((GroupByDictIdsWrapper) result);
} else {
// For non-dictionary-encoded expression, directly return the HyperLogLog
// For non-dictionary-encoded expression, the result is already a HyperLogLog
return (HyperLogLog) result;
}
}
Expand Down Expand Up @@ -630,16 +643,30 @@ public boolean canUseStarTree(Map<String, Object> functionParameters) {
}

/**
* Returns the dictionary id bitmap from the result holder or creates a new one if it does not exist.
* Returns the {@link RoaringBitmap} for the given group key, creating a new {@link GroupByDictIdsWrapper} if absent.
* Uses a sparse bitmap so memory scales with distinct values per group, not dictionary size.
*/
protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregationResultHolder,
protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder groupByResultHolder, int groupKey,
Dictionary dictionary) {
GroupByDictIdsWrapper wrapper = groupByResultHolder.getResult(groupKey);
if (wrapper == null) {
wrapper = new GroupByDictIdsWrapper(dictionary);
groupByResultHolder.setValueForKey(groupKey, wrapper);
}
return wrapper._dictIdBitmap;
}

/**
 * Returns the {@link BitSet} of dictionary ids from the result holder. If the holder has no result yet, a new
 * {@link DictIdsWrapper} is created and stored in it first.
 *
 * @param aggregationResultHolder holder of the intermediate aggregation result
 * @param dictionary dictionary bound to a newly created wrapper
 * @return the bit set in which each set bit index is a dictionary id that has been seen
 */
protected static BitSet getDictIdBitSet(AggregationResultHolder aggregationResultHolder,
    Dictionary dictionary) {
  DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
  if (dictIdsWrapper == null) {
    // First block aggregated: create the wrapper and register it in the holder.
    dictIdsWrapper = new DictIdsWrapper(dictionary);
    aggregationResultHolder.setValue(dictIdsWrapper);
  }
  return dictIdsWrapper._bitSet;
}

/**
Expand All @@ -654,19 +681,6 @@ protected HyperLogLog getHyperLogLog(AggregationResultHolder aggregationResultHo
return hyperLogLog;
}

/**
* Returns the dictionary id bitmap for the given group key or creates a new one if it does not exist.
*/
protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder groupByResultHolder, int groupKey,
Dictionary dictionary) {
DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
if (dictIdsWrapper == null) {
dictIdsWrapper = new DictIdsWrapper(dictionary);
groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
}
return dictIdsWrapper._dictIdBitmap;
}

/**
* Returns the HyperLogLog for the given group key or creates a new one if it does not exist.
*/
Expand All @@ -680,43 +694,66 @@ protected HyperLogLog getHyperLogLog(GroupByResultHolder groupByResultHolder, in
}

/**
* Helper method to set dictionary id for the given group keys into the result holder.
* Helper method to set value for the given group keys into the result holder.
*/
private static void setDictIdForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys,
Dictionary dictionary, int dictId) {
private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, Object value) {
for (int groupKey : groupKeys) {
getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
getHyperLogLog(groupByResultHolder, groupKey).offer(value);
}
}

/**
* Helper method to set value for the given group keys into the result holder.
* Converts a {@link GroupByDictIdsWrapper} to a HyperLogLog by offering each distinct dictionary value exactly once.
*/
private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, Object value) {
for (int groupKey : groupKeys) {
getHyperLogLog(groupByResultHolder, groupKey).offer(value);
private HyperLogLog convertToHyperLogLog(GroupByDictIdsWrapper wrapper) {
HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
Dictionary dictionary = wrapper._dictionary;
for (int dictId : wrapper._dictIdBitmap) {
hyperLogLog.offer(dictionary.get(dictId));
}
return hyperLogLog;
}

/**
* Helper method to read dictionary and convert dictionary ids to HyperLogLog for dictionary-encoded expression.
* Converts a {@link DictIdsWrapper} to a HyperLogLog by offering each distinct dictionary value exactly once.
*/
private HyperLogLog convertToHyperLogLog(DictIdsWrapper dictIdsWrapper) {
HyperLogLog hyperLogLog = new HyperLogLog(_log2m);
Dictionary dictionary = dictIdsWrapper._dictionary;
RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
while (iterator.hasNext()) {
hyperLogLog.offer(dictionary.get(iterator.next()));
BitSet bitSet = dictIdsWrapper._bitSet;
for (int dictId = bitSet.nextSetBit(0); dictId >= 0; dictId = bitSet.nextSetBit(dictId + 1)) {
hyperLogLog.offer(dictionary.get(dictId));
}
return hyperLogLog;
}

private static final class DictIdsWrapper {
/**
* Wraps a {@link Dictionary} with a {@link BitSet} to collect and deduplicate dictionary IDs before offering
* to HyperLogLog. BitSet gives O(1) insertion with no container-management overhead (unlike RoaringBitmap),
* and uses dictSize/8 bytes of memory (e.g. 128 KB for a 1M-entry dictionary).
*/
protected static final class DictIdsWrapper {
final Dictionary _dictionary;
final BitSet _bitSet;

DictIdsWrapper(Dictionary dictionary) {
_dictionary = dictionary;
_bitSet = new BitSet(dictionary.length());
}
}

/**
* Wraps a {@link Dictionary} with a {@link RoaringBitmap} to collect and deduplicate dictionary IDs in the group-by
* aggregation path. Unlike {@link DictIdsWrapper} (which uses a pre-allocated {@link BitSet} of dictSize/8 bytes),
* this uses a sparse RoaringBitmap whose memory grows only with the number of distinct dict IDs seen per group.
* This is critical for group-by: one wrapper per group means memory = numGroups × (distinct values/group × ~2 bytes),
* which stays bounded even when there are many groups or a large dictionary.
*/
protected static final class GroupByDictIdsWrapper {
final Dictionary _dictionary;
final RoaringBitmap _dictIdBitmap;

private DictIdsWrapper(Dictionary dictionary) {
GroupByDictIdsWrapper(Dictionary dictionary) {
_dictionary = dictionary;
_dictIdBitmap = new RoaringBitmap();
}
Expand Down
Loading
Loading