diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlus.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlus.java index befa3b72a780d..2c551d198bf1f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlus.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlus.java @@ -4946,8 +4946,8 @@ public long query(HllBuffer buffer) { int i = 0; int shift = 0; while (idx < m && i < REGISTERS_PER_WORD) { - long mIdx = (word >>> shift) & REGISTER_WORD_MASK; - zInverse += 1.0 / (1 << mIdx); + int mIdx = (int) ((word >>> shift) & REGISTER_WORD_MASK); + zInverse += 1.0 / (1L << mIdx); if (mIdx == 0) { v += 1.0d; } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java index e06ad04ec930a..af3a064068f40 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java @@ -159,6 +159,36 @@ private void testCardinalityEstimates( } } + @Test + void testQueryWithRegisterValuesAbove32() { + // Directly construct an HLL buffer where every register holds value 35 (>= 32). + // Before the fix, "1 << mIdx" used int shift which wraps for mIdx >= 32 + // (1 << 35 == 1 << 3 == 8), producing incorrect estimates. + // The fix uses "1L << mIdx" (long shift) so 1L << 35 == 34359738368. + int registerValue = 35; + HyperLogLogPlusPlus hll = new HyperLogLogPlusPlus(0.01); + HllBuffer buffer = createHllBuffer(hll); + + // Pack each word with 10 registers (6 bits each) all set to registerValue. + for (int w = 0; w < hll.getNumWords(); w++) { + long word = 0L; + for (int r = 0; r < 10; r++) { + word |= ((long) registerValue) << (r * 6); + } + buffer.array[w] = word; + } + + long estimate = hll.query(buffer); + + // With correct long shift, the estimate should be astronomically large + // (on the order of alpha * m * 2^35 ~ 4e14). + // With the buggy int shift, the estimate would be around 95K. + // Assert the estimate is at least 1e12 to catch the int-shift bug. + assertThat(estimate) + .as("Estimate should reflect long-shift math for register values >= 32") + .isGreaterThan(1_000_000_000_000L); + } + public HllBuffer createHllBuffer(HyperLogLogPlusPlus hll) { HllBuffer buffer = new HllBuffer(); buffer.array = new long[hll.getNumWords()];