55
66package org .opensearch .sql .calcite .udf .udaf ;
77
8- import com .google .zetasketch .HyperLogLogPlusPlus ;
8+ import java .nio .ByteBuffer ;
9+ import java .nio .ByteOrder ;
10+ import java .nio .charset .StandardCharsets ;
11+ import org .opensearch .common .hash .MurmurHash3 ;
12+ import org .opensearch .common .util .BigArrays ;
13+ import org .opensearch .search .aggregations .metrics .HyperLogLogPlusPlus ;
914import org .opensearch .sql .calcite .udf .UserDefinedAggFunction ;
1015
1116/** The function use HyperLogLogPlusPlus to count distinct count approximate value */
@@ -26,26 +31,51 @@ public Object result(HLLAccumulator accumulator) {
2631 public HLLAccumulator add (HLLAccumulator acc , Object ... values ) {
2732 for (Object value : values ) {
2833 if (value != null ) {
29- acc .add (value . toString () );
34+ acc .add (value );
3035 }
3136 }
3237 return acc ;
3338 }
3439
3540 public static class HLLAccumulator implements UserDefinedAggFunction .Accumulator {
36- private final HyperLogLogPlusPlus < String > hll ;
41+ private final HyperLogLogPlusPlus hll ;
3742
3843 public HLLAccumulator () {
39- this .hll = new HyperLogLogPlusPlus .Builder ().buildForStrings ();
44+ this .hll =
45+ new HyperLogLogPlusPlus (
46+ HyperLogLogPlusPlus .DEFAULT_PRECISION , BigArrays .NON_RECYCLING_INSTANCE , 1 );
4047 }
4148
42- public void add (String value ) {
43- hll .add ( value );
49+ public void add (Object value ) {
50+ hll .collect ( 0 , hash ( value ) );
4451 }
4552
4653 @ Override
4754 public Object value (Object ... args ) {
48- return hll .result ( );
55+ return hll .cardinality ( 0 );
4956 }
5057 }
58+
59+ private static long hash (Object data ) {
60+ MurmurHash3 .Hash128 hash = new MurmurHash3 .Hash128 ();
61+ if (data == null ) {
62+ return 0L ;
63+ }
64+
65+ byte [] bytes ;
66+
67+ if (data instanceof byte []) {
68+ bytes = (byte []) data ;
69+ } else if (data instanceof String ) {
70+ bytes = ((String ) data ).getBytes (StandardCharsets .UTF_8 );
71+ } else if (data instanceof Number ) {
72+ long value = ((Number ) data ).longValue ();
73+ bytes = ByteBuffer .allocate (Long .BYTES ).order (ByteOrder .LITTLE_ENDIAN ).putLong (value ).array ();
74+ } else {
75+ bytes = data .toString ().getBytes (StandardCharsets .UTF_8 );
76+ }
77+
78+ MurmurHash3 .hash128 (bytes , 0 , bytes .length , 0 , hash );
79+ return hash .h1 ^ hash .h2 ;
80+ }
5181}
0 commit comments