Skip to content

Commit ebd75c9

Browse files
github-actions[bot], nooneuse, yuanyuhao
authored
branch-4.1: Add datasketches HLL sketch aggregate functions #63143 (#63911)
Cherry-picked from #63143 Co-authored-by: nooneuse <nooneuse@users.noreply.github.com> Co-authored-by: yuanyuhao <yuanyuhao@bytedance.com>
1 parent 314f9c1 commit ebd75c9

13 files changed

Lines changed: 1732 additions & 1 deletion

File tree

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,6 @@
2929
path = contrib/openblas
3030
url = https://github.com/apache/doris-thirdparty.git
3131
branch = openblas
32+
[submodule "contrib/datasketches-cpp"]
33+
path = contrib/datasketches-cpp
34+
url = https://github.com/apache/datasketches-cpp.git
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "exprs/aggregate/aggregate_function_datasketches_hll_union_agg.h"
19+
20+
#include <string>
21+
22+
#include "core/data_type/data_type.h"
23+
#include "core/data_type/define_primitive_type.h"
24+
#include "exec/common/hash_table/hash.h" // IWYU pragma: keep
25+
#include "exprs/aggregate/aggregate_function_simple_factory.h"
26+
#include "exprs/aggregate/helpers.h"
27+
namespace doris {
28+
template <template <PrimitiveType> class Data>
29+
AggregateFunctionPtr create_aggregate_function_datasketches_hll_union_agg(
30+
const std::string& name, const DataTypes& argument_types, const DataTypePtr& result_type,
31+
const bool result_is_nullable, const AggregateFunctionAttr& attr) {
32+
return creator_with_type_list<TYPE_STRING, TYPE_VARCHAR, TYPE_VARBINARY>::create<
33+
AggregateFunctionDataSketchesHllUnionAgg, Data>(argument_types, result_is_nullable,
34+
attr);
35+
}
36+
/// Registers the datasketches HLL union aggregate with `factory`.
///
/// The function is registered as "datasketches_hll_union_agg", and
/// "ds_hll_estimate" / "datasketches_hll_estimate" are added as aliases of it.
void register_aggregate_function_datasketches_HLL_union_agg(
        AggregateFunctionSimpleFactory& factory) {
    const char* const function_name = "datasketches_hll_union_agg";

    AggregateFunctionCreator creator =
            create_aggregate_function_datasketches_hll_union_agg<AggregateFunctionHllSketchData>;
    factory.register_function_both(function_name, creator);

    // Aliases are registered in the same order as before.
    for (const char* alias : {"ds_hll_estimate", "datasketches_hll_estimate"}) {
        factory.register_alias(function_name, alias);
    }
}
44+
} // namespace doris
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
#include <stddef.h>
20+
21+
#include <DataSketches/hll.hpp>
22+
#include <algorithm>
23+
#include <boost/iterator/iterator_facade.hpp>
24+
#include <memory>
25+
#include <optional>
26+
#include <type_traits>
27+
#include <vector>
28+
29+
#include "common/compiler_util.h" // IWYU pragma: keep
30+
#include "core/assert_cast.h"
31+
#include "core/column/column.h"
32+
#include "core/column/column_varbinary.h"
33+
#include "core/column/column_vector.h"
34+
#include "core/custom_allocator.h"
35+
#include "core/data_type/data_type_number.h"
36+
#include "core/data_type/define_primitive_type.h"
37+
#include "core/field.h"
38+
#include "core/string_ref.h"
39+
#include "core/types.h"
40+
#include "core/uint128.h"
41+
#include "exec/common/hash_table/hash.h"
42+
#include "exec/common/hash_table/phmap_fwd_decl.h"
43+
#include "exprs/aggregate/aggregate_function.h"
44+
#include "util/var_int.h"
45+
template <typename T>
46+
struct HashCRC32;
47+
namespace doris {
48+
class Arena;
49+
class BufferReadable;
50+
class BufferWritable;
51+
template <PrimitiveType T>
52+
class ColumnDecimal;
53+
/// datasketches_hll_union_agg
54+
template <PrimitiveType T>
55+
struct AggregateFunctionHllSketchData {
56+
/** We set the default LgK to 12,
57+
* as this value is used as a performance baseline in the relevant documentation.
58+
* (https://datasketches.apache.org/docs/HLL/HllPerformance.html)
59+
*/
60+
static constexpr uint8_t DEFAULT_LOG_K = 12;
61+
using Alloc = CustomStdAllocator<uint8_t>;
62+
using Sketch = datasketches::hll_sketch_alloc<Alloc>;
63+
using Union = datasketches::hll_union_alloc<Alloc>;
64+
65+
std::optional<Union> hll_union_data;
66+
67+
static String get_name() { return "datasketches_hll_union_agg"; }
68+
69+
void merge(const Sketch& sketch_data) {
70+
if (!hll_union_data.has_value()) {
71+
/** We clamp max lg_k to [7, 21],
72+
* considering that the code comment requires 7 to 21.
73+
* See: datasketches-cpp/hll/include/hll.hpp:451
74+
*/
75+
constexpr uint8_t MIN_UNION_LOG_K = 7;
76+
const uint8_t union_lg_k =
77+
std::clamp<uint8_t>(sketch_data.get_lg_config_k(), MIN_UNION_LOG_K,
78+
datasketches::hll_constants::MAX_LOG_K);
79+
hll_union_data.emplace(union_lg_k, Alloc());
80+
}
81+
try {
82+
hll_union_data->update(sketch_data);
83+
} catch (const doris::Exception& e) {
84+
throw Exception(e.code(), "Internal error happened when update HLL sketch: {}",
85+
e.to_string());
86+
} catch (const std::exception& e) {
87+
throw Exception(ErrorCode::INTERNAL_ERROR,
88+
"Internal error happened when update HLL sketch: {}", e.what());
89+
} catch (...) {
90+
throw Exception(ErrorCode::INTERNAL_ERROR,
91+
"Internal error happened when update HLL sketch: unknown exception.");
92+
}
93+
}
94+
void reset() {
95+
if (hll_union_data.has_value()) {
96+
hll_union_data->reset();
97+
}
98+
hll_union_data.reset();
99+
}
100+
101+
void write_sketch(BufferWritable& buf, const Sketch& sk) const {
102+
auto serialized_bytes = sk.serialize_compact();
103+
StringRef d(serialized_bytes.data(), serialized_bytes.size());
104+
buf.write_binary(d);
105+
}
106+
void write(BufferWritable& buf) const {
107+
if (!hll_union_data.has_value()) {
108+
/** Using DEFAULT_LOG_K(12) here is surely sufficient,
109+
* because in this case the union that actually needs to be serialized should contain no data.
110+
*/
111+
Union u(DEFAULT_LOG_K, Alloc());
112+
write_sketch(buf, u.get_result());
113+
return;
114+
}
115+
try {
116+
auto cache = hll_union_data->get_result();
117+
write_sketch(buf, cache);
118+
} catch (const doris::Exception& e) {
119+
throw Exception(e.code(), "Internal error happened when serialize HLL sketch: {}",
120+
e.to_string());
121+
} catch (const std::exception& e) {
122+
throw Exception(ErrorCode::INTERNAL_ERROR,
123+
"Internal error happened when serialize HLL sketch: {}", e.what());
124+
} catch (...) {
125+
throw Exception(
126+
ErrorCode::INTERNAL_ERROR,
127+
"Internal error happened when serialize HLL sketch: unknown exception.");
128+
}
129+
}
130+
void read(BufferReadable& buf) {
131+
StringRef d;
132+
buf.read_binary(d);
133+
134+
auto cache = [&]() -> Sketch {
135+
try {
136+
return Sketch::deserialize(d.data, d.size, Alloc());
137+
} catch (const doris::Exception& e) {
138+
throw Exception(e.code(), "Failed to deserialize HLL sketch when read: {}",
139+
e.to_string());
140+
} catch (const std::exception& e) {
141+
throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when read: {}",
142+
e.what());
143+
} catch (...) {
144+
throw Exception(ErrorCode::CORRUPTION,
145+
"HLL sketch data corrupted when read: unknown exception.");
146+
}
147+
}();
148+
149+
merge(cache);
150+
}
151+
double get_result() const {
152+
if (hll_union_data.has_value()) {
153+
try {
154+
return hll_union_data->get_estimate();
155+
} catch (const doris::Exception& e) {
156+
throw Exception(e.code(),
157+
"Internal error happened when get HLL sketch estimate: {}",
158+
e.to_string());
159+
} catch (const std::exception& e) {
160+
throw Exception(ErrorCode::INTERNAL_ERROR,
161+
"Internal error happened when get HLL sketch estimate: {}",
162+
e.what());
163+
} catch (...) {
164+
throw Exception(
165+
ErrorCode::INTERNAL_ERROR,
166+
"Internal error happened when get HLL sketch estimate: unknown exception.");
167+
}
168+
}
169+
return 0.0;
170+
}
171+
};
172+
173+
/// Calculates the number of different values approximately using hll sketch.
174+
template <PrimitiveType T, typename Data>
175+
class AggregateFunctionDataSketchesHllUnionAgg final
176+
: public IAggregateFunctionDataHelper<Data,
177+
AggregateFunctionDataSketchesHllUnionAgg<T, Data>>,
178+
UnaryExpression,
179+
NotNullableAggregateFunction {
180+
public:
181+
AggregateFunctionDataSketchesHllUnionAgg(const DataTypes& argument_types_)
182+
: IAggregateFunctionDataHelper<Data, AggregateFunctionDataSketchesHllUnionAgg<T, Data>>(
183+
argument_types_) {}
184+
String get_name() const override { return Data::get_name(); }
185+
DataTypePtr get_return_type() const override { return std::make_shared<DataTypeFloat64>(); }
186+
void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
187+
void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
188+
Arena&) const override {
189+
add_one(this->data(place), *columns[0], row_num);
190+
}
191+
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
192+
Arena&) const override {
193+
const auto& rhs_data = this->data(rhs);
194+
if (!rhs_data.hll_union_data.has_value()) {
195+
return;
196+
}
197+
this->data(place).merge(rhs_data.hll_union_data->get_result(datasketches::HLL_8));
198+
}
199+
void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
200+
this->data(place).write(buf);
201+
}
202+
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
203+
Arena&) const override {
204+
this->data(place).read(buf);
205+
}
206+
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
207+
assert_cast<ColumnFloat64&>(to).get_data().push_back(this->data(place).get_result());
208+
}
209+
210+
private:
211+
static void ALWAYS_INLINE add_one(Data& data, const IColumn& column, ssize_t row_num) {
212+
if constexpr (is_string_type(T) || is_varbinary(T)) {
213+
const auto& src_column = assert_cast<const typename PrimitiveTypeTraits<T>::ColumnType&,
214+
TypeCheckOnRelease::DISABLE>(column);
215+
StringRef value = src_column.get_data_at(static_cast<size_t>(row_num));
216+
if (value.empty()) {
217+
throw Exception(ErrorCode::CORRUPTION,
218+
"HLL sketch data corrupted when add: empty input.");
219+
}
220+
221+
using Sketch = typename Data::Sketch;
222+
using Alloc = typename Data::Alloc;
223+
224+
auto sketch_data = [&]() -> Sketch {
225+
try {
226+
return Sketch::deserialize(value.begin(), value.size, Alloc());
227+
} catch (const doris::Exception& e) {
228+
throw Exception(e.code(), "Failed to deserialize HLL sketch when add: {}",
229+
e.to_string());
230+
} catch (const std::exception& e) {
231+
throw Exception(ErrorCode::CORRUPTION, "HLL sketch data corrupted when add: {}",
232+
e.what());
233+
} catch (...) {
234+
throw Exception(ErrorCode::CORRUPTION,
235+
"HLL sketch data corrupted when add: unknown exception.");
236+
}
237+
}();
238+
239+
data.merge(sketch_data);
240+
}
241+
}
242+
};
243+
} // namespace doris

be/src/exprs/aggregate/aggregate_function_simple_factory.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ void register_aggregate_function_avg(AggregateFunctionSimpleFactory& factory);
3838
void register_aggregate_function_count(AggregateFunctionSimpleFactory& factory);
3939
void register_aggregate_function_count_by_enum(AggregateFunctionSimpleFactory& factory);
4040
void register_aggregate_function_HLL_union_agg(AggregateFunctionSimpleFactory& factory);
41+
void register_aggregate_function_datasketches_HLL_union_agg(
42+
AggregateFunctionSimpleFactory& factory);
4143
void register_aggregate_function_uniq(AggregateFunctionSimpleFactory& factory);
4244
void register_aggregate_function_uniq_distribute_key(AggregateFunctionSimpleFactory& factory);
4345
void register_aggregate_function_bit(AggregateFunctionSimpleFactory& factory);
@@ -128,6 +130,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
128130
register_aggregate_function_replace_reader_load(instance);
129131
register_aggregate_function_window_lead_lag_first_last(instance);
130132
register_aggregate_function_HLL_union_agg(instance);
133+
register_aggregate_function_datasketches_HLL_union_agg(instance);
131134
register_aggregate_functions_corr(instance);
132135
register_aggregate_functions_corr_welford(instance);
133136
register_aggregate_function_covar_pop(instance);

0 commit comments

Comments
 (0)