Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/types/tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ refer to https://github.com/apache/arrow/blob/27bbd593625122a4a25d9471c8aaf5df54
#include <fmt/format.h>

#include <algorithm>
#include <initializer_list>
#include <iterator>
#include <queue>

Expand Down Expand Up @@ -355,6 +356,8 @@ class TDigest {
void Merge(const std::vector<TDigest>& others);
void Add(const std::vector<double>& items);
void Reset(const CentroidsWithDelta& centroid_list);
// Callers must explicitly specify the initial state to reset to; implicit initialization like Reset({}) is forbidden.
void Reset(std::initializer_list<CentroidsWithDelta>) = delete;
Comment thread
Tangruilin marked this conversation as resolved.
void Reset();
CentroidsWithDelta DumpCentroids() const;
Comment thread
Tangruilin marked this conversation as resolved.

Expand Down
9 changes: 5 additions & 4 deletions src/types/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fmt/format.h>

#include <limits>
#include <map>
#include <numeric>
#include <variant>
Expand All @@ -47,10 +48,10 @@ struct Centroid {

// Bundles a list of centroids with the delta, min, max and total_weight values that travel with it.
struct CentroidsWithDelta {
Comment thread
Tangruilin marked this conversation as resolved.
std::vector<Centroid> centroids;
uint64_t delta;
double min;
double max;
double total_weight;
// Declarations carrying default member initializers: delta and total_weight start at 0.
uint64_t delta = 0;
// min starts at the largest finite double and max at the lowest finite double.
// NOTE(review): presumably chosen so that the first observed sample replaces both bounds — confirm
// against the merge/add implementation.
double min = std::numeric_limits<double>::max();
double max = std::numeric_limits<double>::lowest();
double total_weight = 0;
Comment thread
Tangruilin marked this conversation as resolved.
};

StatusOr<CentroidsWithDelta> TDigestMerge(const std::vector<CentroidsWithDelta>& centroids_list, uint64_t delta);
Expand Down
56 changes: 56 additions & 0 deletions tests/gocase/unit/type/tdigest/tdigest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,62 @@ func tdigestTests(t *testing.T, configs util.KvrocksServerConfigs) {
require.EqualValues(t, 5, info.Observations) // src data replaced dest data due to OVERRIDE
})

// Regression test for empty merge: min/max should be correctly initialized
// Before fix: merging empty sources initialized min/max to 0, corrupting subsequent add
t.Run("tdigest.merge empty sources then add samples", func(t *testing.T) {
keyPrefix := "tdigest_merge_empty_"

srcKey1 := keyPrefix + "src1"
srcKey2 := keyPrefix + "src2"
destKey := keyPrefix + "dest"

// Create empty sources (no observations added)
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", srcKey1, "compression", "100").Err())
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", srcKey2, "compression", "100").Err())

// Merge empty sources into new destination
require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2, srcKey1, srcKey2).Err())

// Verify dest is empty
rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
require.NoError(t, rsp.Err())
info := toTdigestInfo(t, rsp.Val())
require.EqualValues(t, 0, info.Observations)

// Add positive samples and verify min/max
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey, "10", "20", "30").Err())

rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
require.NoError(t, rsp.Err())
minVal, err := rsp.Float64()
require.NoError(t, err)
require.InEpsilon(t, 10.0, minVal, 0.1, "min should be 10, not 0")

rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
require.NoError(t, rsp.Err())
maxVal, err := rsp.Float64()
require.NoError(t, err)
require.InEpsilon(t, 30.0, maxVal, 0.1, "max should be 30")

// Test with negative samples
destKey2 := keyPrefix + "dest2"
require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey2, 2, srcKey1, srcKey2).Err())

require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey2, "-10", "-20", "-30").Err())

rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey2)
require.NoError(t, rsp.Err())
minVal, err = rsp.Float64()
require.NoError(t, err)
require.InEpsilon(t, -30.0, minVal, 0.1, "min should be -30")

rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey2)
require.NoError(t, rsp.Err())
maxVal, err = rsp.Float64()
require.NoError(t, err)
require.InEpsilon(t, -10.0, maxVal, 0.1, "max should be -10, not 0")
})

t.Run("tdigest.revrank with different arguments", func(t *testing.T) {
keyPrefix := "tdigest_revrank_"

Expand Down
Loading