Skip to content

Commit 6ab2048

Browse files
author
Rafał Hibner
committed
Merge branch 'tdigest_map' into combined3
2 parents: a750f7d + 73f04d9; commit: 6ab2048

2 files changed

Lines changed: 88 additions & 42 deletions

File tree

cpp/src/arrow/compute/kernels/aggregate_tdigest.cc

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -36,10 +36,13 @@ struct TDigestBaseImpl : public ScalarAggregator {
3636
explicit TDigestBaseImpl(std::unique_ptr<TDigest::Scaler> scaler, uint32_t buffer_size)
3737
: tdigest{std::move(scaler), buffer_size}, count{0}, all_valid{true} {
3838
auto output_size = tdigest.delta();
39-
out_type = struct_({field("mean", fixed_size_list(float64(), output_size), false),
40-
field("weight", fixed_size_list(float64(), output_size), false),
41-
field("count", uint64(), false), field("min", float64(), true),
42-
field("max", float64(), true)});
39+
out_type = struct_({
40+
field("mean", fixed_size_list(float64(), output_size), false),
41+
field("weight", fixed_size_list(float64(), output_size), false),
42+
field("min", float64(), true),
43+
field("max", float64(), true),
44+
field("count", uint64(), false),
45+
});
4346
}
4447

4548
Status MergeFrom(KernelContext*, KernelState&& src) override {
@@ -160,7 +163,7 @@ struct TDigestCentroidFinalizer : public TDigestBaseImpl {
160163
min = max = MakeNullScalar(float64());
161164
}
162165
*out = std::make_shared<StructScalar>(
163-
std::vector<std::shared_ptr<Scalar>>{mean, weight, count, min, max}, out_type);
166+
std::vector<std::shared_ptr<Scalar>>{mean, weight, min, max, count}, out_type);
164167
}
165168

166169
return Status::OK();
@@ -241,9 +244,9 @@ struct TDigestCentroidConsumerImpl : public TDigestFinalizer_T {
241244
auto weight_array =
242245
checked_cast<const FixedSizeListScalar*>(input_struct_scalar->value[1].get())
243246
->value;
244-
auto count = checked_cast<const UInt64Scalar*>(input_struct_scalar->value[2].get());
245-
auto min = checked_cast<const DoubleScalar*>(input_struct_scalar->value[3].get());
246-
auto max = checked_cast<const DoubleScalar*>(input_struct_scalar->value[4].get());
247+
auto min = checked_cast<const DoubleScalar*>(input_struct_scalar->value[2].get());
248+
auto max = checked_cast<const DoubleScalar*>(input_struct_scalar->value[3].get());
249+
auto count = checked_cast<const UInt64Scalar*>(input_struct_scalar->value[4].get());
247250
auto mean_double_array = checked_cast<const DoubleArray*>(mean_array.get());
248251
auto weight_double_array = checked_cast<const DoubleArray*>(weight_array.get());
249252
DCHECK_EQ(mean_double_array->length(), this->tdigest.delta());
@@ -411,7 +414,7 @@ struct TDigestCentroidTypeMatcher : public TypeMatcher {
411414

412415
/// Human-readable description of the centroid struct layout this matcher stands for.
///
/// Field order follows the struct built for the t-digest centroid output:
/// mean, weight, min, max, count. `count` is reported as uint64 because the
/// struct field is declared with `uint64()` and read back as a `UInt64Scalar`;
/// the previous text said int64, which did not match the actual type.
///
/// \return the description string (N stands for the fixed list size).
static std::string ToStringStatic() {
  return "struct{mean:fixed_size_list<item: double>[N], weight:fixed_size_list<item: "
         "double>[N], min:float64, max:float64, count:uint64}";
}
416419
std::string ToString() const override { return ToStringStatic(); }
417420

cpp/src/arrow/compute/kernels/aggregate_test.cc

Lines changed: 76 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -4106,6 +4106,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
41064106
VerifyTDigest(chunked, quantiles);
41074107
VerifyTDigestMapQuantile(chunked, quantiles);
41084108
VerifyTDigestMapReduceQuantile(chunked, quantiles);
4109+
VerifyTDigestMapReduceIncrementalQuantile(chunked, quantiles);
41094110
}
41104111

41114112
void CheckTDigestsSliced(const std::vector<int>& chunk_sizes, int64_t num_quantiles) {
@@ -4124,6 +4125,7 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
41244125
VerifyTDigest(chunked->Slice(os[0], os[1]), quantiles);
41254126
VerifyTDigestMapQuantile(chunked->Slice(os[0], os[1]), quantiles);
41264127
VerifyTDigestMapReduceQuantile(chunked->Slice(os[0], os[1]), quantiles);
4128+
VerifyTDigestMapReduceIncrementalQuantile(chunked->Slice(os[0], os[1]), quantiles);
41274129
}
41284130
}
41294131

@@ -4193,8 +4195,51 @@ class TestRandomQuantileKernel : public TestPrimitiveQuantileKernel<ArrowType> {
41934195
map_chunks.push_back(std::move(map_chunk));
41944196
}
41954197
auto map_chunked = std::make_shared<ChunkedArray>(std::move(map_chunks));
4198+
ASSERT_OK_AND_ASSIGN(Datum reduced, TDigestReduce(map_chunked));
41964199
TDigestQuantileOptions options(quantiles);
4197-
ASSERT_OK_AND_ASSIGN(Datum out, TDigestQuantile(map_chunked, options));
4200+
4201+
ASSERT_OK_AND_ASSIGN(Datum out_alternative, TDigestQuantile(map_chunked, options));
4202+
ASSERT_OK_AND_ASSIGN(Datum out, TDigestQuantile(reduced, options));
4203+
ASSERT_EQ(out, out_alternative);
4204+
const auto& out_array = out.make_array();
4205+
ValidateOutput(*out_array);
4206+
ASSERT_EQ(out_array->length(), quantiles.size());
4207+
ASSERT_EQ(out_array->null_count(), 0);
4208+
AssertTypeEqual(out_array->type(), float64());
4209+
4210+
// linear interpolated exact quantile as reference
4211+
std::vector<std::vector<Datum>> exact =
4212+
NaiveQuantile(*chunked, quantiles, {QuantileOptions::LINEAR});
4213+
const double* approx = out_array->data()->GetValues<double>(1);
4214+
for (size_t i = 0; i < quantiles.size(); ++i) {
4215+
const auto& exact_scalar = checked_pointer_cast<DoubleScalar>(exact[i][0].scalar());
4216+
const double tolerance = std::fabs(exact_scalar->value) * 0.05;
4217+
EXPECT_NEAR(approx[i], exact_scalar->value, tolerance) << quantiles[i];
4218+
}
4219+
}
4220+
4221+
void VerifyTDigestMapReduceIncrementalQuantile(
4222+
const std::shared_ptr<ChunkedArray>& chunked, std::vector<double>& quantiles) {
4223+
Datum out;
4224+
TDigestQuantileOptions options(quantiles);
4225+
std::shared_ptr<Scalar> incremental_centroids;
4226+
for (const auto& chunk : chunked->chunks()) {
4227+
ASSERT_OK_AND_ASSIGN(Datum centroids, TDigestMap(chunk));
4228+
if (incremental_centroids) {
4229+
// Is there a nicer way to make array from scalars?
4230+
ASSERT_OK_AND_ASSIGN(auto chunk1, MakeArrayFromScalar(*centroids.scalar(), 1));
4231+
ASSERT_OK_AND_ASSIGN(auto chunk2, MakeArrayFromScalar(*incremental_centroids, 1));
4232+
auto map_chunked = std::make_shared<ChunkedArray>(ArrayVector{chunk1, chunk2});
4233+
ASSERT_OK_AND_ASSIGN(Datum reduced, TDigestReduce(map_chunked));
4234+
incremental_centroids = reduced.scalar();
4235+
} else {
4236+
incremental_centroids = centroids.scalar();
4237+
}
4238+
4239+
ASSERT_OK_AND_ASSIGN(
4240+
out, TDigestQuantile(incremental_centroids, options)); // incremental quantile
4241+
}
4242+
41984243
const auto& out_array = out.make_array();
41994244
ValidateOutput(*out_array);
42004245
ASSERT_EQ(out_array->length(), quantiles.size());
@@ -4514,8 +4559,8 @@ TEST(TestTDigestMapKernel, Options) {
45144559
auto output_type =
45154560
struct_({field("mean", fixed_size_list(float64(), 5), false),
45164561
field("weight", fixed_size_list(float64(), 5), false),
4517-
field("count", uint64(), false), field("min", float64(), true),
4518-
field("max", float64(), true)});
4562+
field("min", float64(), true), field("max", float64(), true),
4563+
field("count", uint64(), false)});
45194564
TDigestMapOptions keep_nulls(/*delta=*/5, /*buffer_size=*/500,
45204565
/*skip_nulls=*/false,
45214566
/*scaler=*/TDigestMapOptions::Scaler::K0);
@@ -4527,103 +4572,102 @@ TEST(TestTDigestMapKernel, Options) {
45274572
TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]"), keep_nulls),
45284573
ResultWith(ScalarFromJSON(output_type,
45294574
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, 2, "
4530-
"2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0}")));
4575+
"2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6}")));
45314576
EXPECT_THAT(
45324577
TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, 3.0, 4.0, 5.0]"), keep_nulls),
45334578
ResultWith(ScalarFromJSON(output_type,
45344579
"{\"mean\":[1.5, 3.5, 5.0, null, null],\"weight\":[2, 2, "
4535-
"1, null, null],\"count\":5,\"min\":1.0,\"max\":5.0}")));
4580+
"1, null, null],\"min\":1.0,\"max\":5.0,\"count\":5}")));
45364581
EXPECT_THAT(
45374582
TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, 3.0, 4.0]"), keep_nulls),
45384583
ResultWith(ScalarFromJSON(
45394584
output_type,
45404585
"{\"mean\":[1.0, 2.0, 3.0, 4.0, "
4541-
"null],\"weight\":[1,1,1,1,null],\"count\":4,\"min\":1.0,\"max\":4.0}")));
4586+
"null],\"weight\":[1,1,1,1,null],\"min\":1.0,\"max\":4.0,\"count\":4}")));
45424587

45434588
EXPECT_THAT(
45444589
TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, 3.0]"), keep_nulls),
45454590
ResultWith(ScalarFromJSON(output_type,
45464591
"{\"mean\":[1.0,2.0,3.0,null,null],\"weight\":[1,"
4547-
"1,1,null,null],\"count\":3,\"min\":1.0,\"max\":3.0}")));
4592+
"1,1,null,null],\"min\":1.0,\"max\":3.0,\"count\":3}")));
45484593
EXPECT_THAT(TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, 3.0, null]"), keep_nulls),
45494594
ResultWith(ScalarFromJSON(output_type, "null")));
45504595
EXPECT_THAT(TDigestMap(ScalarFromJSON(input_type, "1.0"), keep_nulls),
45514596
ResultWith(ScalarFromJSON(
45524597
output_type,
45534598
"{\"mean\":[1.0,null,null,null,null],\"weight\":["
4554-
"1,null,null,null,null],\"count\":1,\"min\":1.0,\"max\":1.0}")));
4599+
"1,null,null,null,null],\"min\":1.0,\"max\":1.0,\"count\":1}")));
45554600
EXPECT_THAT(TDigestMap(ScalarFromJSON(input_type, "null"), keep_nulls),
45564601
ResultWith(ScalarFromJSON(output_type, "null")));
45574602

45584603
EXPECT_THAT(
45594604
TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, 3.0, null]"), skip_nulls),
45604605
ResultWith(ScalarFromJSON(output_type,
45614606
"{\"mean\":[1.0,2.0,3.0,null,null],\"weight\":[1,"
4562-
"1,1,null,null],\"count\":3,\"min\":1.0,\"max\":3.0}")));
4607+
"1,1,null,null],\"min\":1.0,\"max\":3.0,\"count\":3}")));
45634608
EXPECT_THAT(TDigestMap(ArrayFromJSON(input_type, "[1.0, 2.0, null]"), skip_nulls),
45644609
ResultWith(ScalarFromJSON(
45654610
output_type,
45664611
"{\"mean\":[1.0,2.0,null,null,null],\"weight\":["
4567-
"1,1,null,null,null],\"count\":2,\"min\":1.0,\"max\":2.0}")));
4612+
"1,1,null,null,null],\"min\":1.0,\"max\":2.0,\"count\":2}")));
45684613
EXPECT_THAT(TDigestMap(ScalarFromJSON(input_type, "1.0"), skip_nulls),
45694614
ResultWith(ScalarFromJSON(
45704615
output_type,
45714616
"{\"mean\":[1.0,null,null,null,null],\"weight\":["
4572-
"1,null,null,null,null],\"count\":1,\"min\":1.0,\"max\":1.0}")));
4617+
"1,null,null,null,null],\"min\":1.0,\"max\":1.0,\"count\":1}")));
45734618
EXPECT_THAT(TDigestMap(ScalarFromJSON(input_type, "null"), skip_nulls),
45744619
ResultWith(ScalarFromJSON(
45754620
output_type,
45764621
"{\"mean\":[null,null,null,null,null],\"weight\":"
4577-
"[null,null,null,null,null],\"count\":0,\"min\":null,\"max\":null}")));
4622+
"[null,null,null,null,null],\"min\":null,\"max\":null,\"count\":0}")));
45784623
}
45794624

45804625
TEST(TestTDigestReduceKernel, Basic) {
45814626
auto type = struct_({field("mean", fixed_size_list(float64(), 5), false),
45824627
field("weight", fixed_size_list(float64(), 5), false),
4583-
field("count", uint64(), false), field("min", float64(), true),
4584-
field("max", float64(), true)});
4628+
field("min", float64(), true), field("max", float64(), true),
4629+
field("count", uint64(), false)});
45854630
TDigestReduceOptions options(/*scaler=*/TDigestMapOptions::Scaler::K0);
45864631
EXPECT_THAT(
45874632
TDigestReduce(
45884633
ArrayFromJSON(type,
45894634
"["
45904635
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4591-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0},"
4636+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6},"
45924637
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4593-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0}"
4638+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6}"
45944639
"]"),
45954640
options),
45964641
ResultWith(ScalarFromJSON(type,
45974642
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[4, 4, "
4598-
"4, null, null],\"count\":12,\"min\":1.0,\"max\":6.0}")));
4643+
"4, null, null],\"min\":1.0,\"max\":6.0,\"count\":12}")));
45994644

46004645
EXPECT_THAT(
46014646
TDigestReduce(
46024647
ScalarFromJSON(type,
46034648
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4604-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0}"),
4649+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6}"),
46054650
options),
46064651
ResultWith(ScalarFromJSON(type,
46074652
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, 2, "
4608-
"2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0}")));
4653+
"2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6}")));
46094654
}
46104655

46114656
TEST(TestTDigestQuantileKernel, Basic) {
4612-
auto input_type =
4613-
struct_({field("mean", fixed_size_list(float64(), 5), false),
4614-
field("weight", fixed_size_list(float64(), 5), false),
4615-
field("count", uint64(), false), field("min", float64(), true),
4616-
field("max", float64(), true)});
4657+
auto input_type = struct_({field("mean", fixed_size_list(float64(), 5), false),
4658+
field("weight", fixed_size_list(float64(), 5), false),
4659+
field("min", float64(), true), field("max", float64(), true),
4660+
field("count", uint64(), false)});
46174661

46184662
auto output_type = float64();
46194663

46204664
auto input_array =
46214665
ArrayFromJSON(input_type,
46224666
"["
46234667
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4624-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0},"
4668+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6},"
46254669
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4626-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0}"
4670+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6}"
46274671
"]");
46284672

46294673
TDigestQuantileOptions multiple(/*q=*/{0.1, 0.5, 0.9}, /*min_count=*/12);
@@ -4636,21 +4680,20 @@ TEST(TestTDigestQuantileKernel, Basic) {
46364680
}
46374681

46384682
TEST(TestTDigestMapReduceQuantileKernel, Basic) {
4639-
auto input_type =
4640-
struct_({field("mean", fixed_size_list(float64(), 5), false),
4641-
field("weight", fixed_size_list(float64(), 5), false),
4642-
field("count", uint64(), false), field("min", float64(), true),
4643-
field("max", float64(), true)});
4683+
auto input_type = struct_({field("mean", fixed_size_list(float64(), 5), false),
4684+
field("weight", fixed_size_list(float64(), 5), false),
4685+
field("min", float64(), true), field("max", float64(), true),
4686+
field("count", uint64(), false)});
46444687

46454688
auto output_type = float64();
46464689

46474690
auto input_array =
46484691
ArrayFromJSON(input_type,
46494692
"["
46504693
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4651-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0},"
4694+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6},"
46524695
"{\"mean\":[1.5, 3.5, 5.5, null, null],\"weight\":[2, "
4653-
"2, 2, null, null],\"count\":6,\"min\":1.0,\"max\":6.0}"
4696+
"2, 2, null, null],\"min\":1.0,\"max\":6.0,\"count\":6}"
46544697
"]");
46554698

46564699
TDigestQuantileOptions multiple(/*q=*/{0.1, 0.5, 0.9}, /*min_count=*/12);

0 commit comments

Comments
 (0)