Skip to content

Commit 505108c

Browse files
Tangruilin, jihuayu, LindaSummer
authored
fix(tdigest): merge into existing dest without OVERRIDE (#3412)
Co-authored-by: 纪华裕 <jihuayu123@gmail.com>
Co-authored-by: Edward Xu <xuxiangad@gmail.com>
1 parent 75df374 commit 505108c

5 files changed

Lines changed: 543 additions & 23 deletions

File tree

src/types/redis_tdigest.cc

Lines changed: 58 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -437,9 +437,8 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest,
437437
return status;
438438
} else if (status.ok()) {
439439
dest_digest_existed = true;
440-
if (!options.override_flag) {
441-
return rocksdb::Status::InvalidArgument(fmt::format("{}: {}", errKeyAlreadyExists, dest_digest.ToString()));
442-
}
440+
// When dest exists without OVERRIDE flag, we should merge dest's data with sources
441+
// (Redis behavior: merge into existing sketch instead of returning error)
443442
}
444443

445444
auto batch = storage_->GetWriteBatchBase();
@@ -473,21 +472,8 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest,
473472
return status;
474473
}
475474

476-
if (metadata.unmerged_nodes > 0) {
477-
if (auto status = mergeCurrentBuffer(ctx, source_ns_key, batch, &metadata, nullptr, &source_centroids);
478-
!status.ok()) {
479-
return status;
480-
}
481-
482-
std::string metadata_bytes;
483-
metadata.Encode(&metadata_bytes);
484-
if (auto status = batch->Put(metadata_cf_handle_, source_ns_key, metadata_bytes); !status.ok()) {
485-
return status;
486-
}
487-
} else if (metadata.merged_nodes > 0) {
488-
if (auto status = dumpCentroids(ctx, source_ns_key, metadata, &source_centroids); !status.ok()) {
489-
return status;
490-
}
475+
if (auto status = getCentroidsForMerge(ctx, source_ns_key, batch, &metadata, &source_centroids); !status.ok()) {
476+
return status;
491477
}
492478

493479
if (!source_centroids.empty()) {
@@ -506,6 +492,40 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const Slice& dest_digest,
506492
compression = std::max(compression, metadata.compression);
507493
}
508494

495+
// Merge dest's existing data if dest exists without OVERRIDE flag
496+
// (Redis behavior: dest is merged first, then source list is merged,
497+
// so if dest is in source list, it gets merged twice)
498+
bool should_merge_dest = dest_digest_existed && !options.override_flag;
499+
500+
if (should_merge_dest) {
501+
std::vector<Centroid> dest_centroids;
502+
503+
if (auto status = getCentroidsForMerge(ctx, dest_ns_key, batch, &dest_metadata, &dest_centroids); !status.ok()) {
504+
return status;
505+
}
506+
507+
if (!dest_centroids.empty()) {
508+
source_centroids_data.emplace_back(CentroidsWithDelta{
509+
.centroids = std::move(dest_centroids),
510+
.delta = dest_metadata.compression,
511+
.min = dest_metadata.minimum,
512+
.max = dest_metadata.maximum,
513+
.total_weight = static_cast<double>(dest_metadata.merged_weight),
514+
});
515+
total_observations += dest_metadata.total_observations;
516+
}
517+
// Redis behavior: when merging into existing dest without OVERRIDE,
518+
// keep dest's compression value (ignore sources' compression)
519+
compression = dest_metadata.compression;
520+
}
521+
522+
/*
523+
* refer to: https://redis.io/docs/latest/commands/tdigest.merge/
524+
* When COMPRESSION is not specified:
525+
* - If destination-key does not exist or if OVERRIDE is specified, the compression is set to the maximum value among
526+
* all source sketches.
527+
* - If destination-key already exists and OVERRIDE is not specified, its compression is not changed.
528+
*/
509529
if (options.compression != 0) {
510530
compression = options.compression;
511531
}
@@ -744,6 +764,26 @@ rocksdb::Status TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
744764
return rocksdb::Status::OK();
745765
}
746766

767+
rocksdb::Status TDigest::getCentroidsForMerge(engine::Context& ctx, const std::string& ns_key,
768+
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
769+
TDigestMetadata* metadata, std::vector<Centroid>* centroids) {
770+
if (metadata->unmerged_nodes > 0) {
771+
if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, metadata, nullptr, centroids); !status.ok()) {
772+
return status;
773+
}
774+
std::string metadata_bytes;
775+
metadata->Encode(&metadata_bytes);
776+
if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); !status.ok()) {
777+
return status;
778+
}
779+
} else if (metadata->merged_nodes > 0) {
780+
if (auto status = dumpCentroids(ctx, ns_key, *metadata, centroids); !status.ok()) {
781+
return status;
782+
}
783+
}
784+
return rocksdb::Status::OK();
785+
}
786+
747787
rocksdb::Status TDigest::applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
748788
const std::string& ns_key, const TDigestMetadata& metadata,
749789
const std::vector<Centroid>& centroids) {

src/types/redis_tdigest.h

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,22 @@ class TDigest : public SubKeyScanner {
124124
const TDigestMetadata& metadata, std::vector<Centroid>* centroids,
125125
std::vector<double>* buffer,
126126
ObserverOrUniquePtr<rocksdb::WriteBatchBase>* clean_after_dump_batch);
127+
128+
/**
129+
* @brief Get centroids for merge operation.
130+
*
131+
* If the tdigest has unmerged buffer, merge it first and update metadata to batch.
132+
* Otherwise, just dump existing centroids.
133+
* @param ctx The context of the operation.
134+
* @param ns_key The namespace key of the t-digest.
135+
* @param batch The write batch to store metadata updates.
136+
* @param metadata The metadata of the t-digest (may be updated if buffer is merged).
137+
* @param centroids The output vector to store the centroids.
138+
*/
139+
rocksdb::Status getCentroidsForMerge(engine::Context& ctx, const std::string& ns_key,
140+
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
141+
std::vector<Centroid>* centroids);
142+
127143
rocksdb::Status applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, const std::string& ns_key,
128144
const TDigestMetadata& metadata, const std::vector<Centroid>& centroids);
129145

src/types/tdigest.cc

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -362,7 +362,7 @@ class TDigest {
362362
TDigestImpl impl_;
363363
};
364364

365-
TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset({}); }
365+
TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset(); }
366366

367367
void TDigest::Merge(const std::vector<TDigest>& others) {
368368
if (others.empty()) {

0 commit comments

Comments
 (0)