Skip to content

Commit a5f65e7

Browse files
fix(ft.aggregate): Fix Compress() zombie-parent bug in Quantile reducer
Signed-off-by: Riley Des <riley.desserre@improving.com>
1 parent 2e371f4 commit a5f65e7

2 files changed

Lines changed: 50 additions & 4 deletions

File tree

src/commands/ft_aggregate_exec.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -418,25 +418,33 @@ class Quantile : public GroupBy::ReducerInstance {
418418
// Compress samples to maintain space bounds.
419419
// Walks backward from the second-to-last sample so that merges
420420
// propagate toward the tail where error bounds are largest.
421+
// parent_idx tracks the nearest live (non-merged) sample to the right of
422+
// the current position, preventing merged (g==0) zombie samples from being
423+
// used as merge targets and silently absorbing subsequent merges.
421424
void Compress() const {
422425
if (samples_.size() < 2) return;
423426

424427
// Compute rank of the last sample (n - 1 - last.g)
425428
double r =
426429
static_cast<double>(n_) - 1.0 - static_cast<double>(samples_.back().g);
427430

428-
// Walk backward from second-to-last to first
431+
// Walk backward from second-to-last to first, always merging into the
432+
// nearest live parent to the right.
433+
size_t parent_idx = samples_.size() - 1;
429434
for (int i = static_cast<int>(samples_.size()) - 2; i >= 0; --i) {
430435
Sample& curr = samples_[i];
431-
Sample& parent = samples_[i + 1];
436+
Sample& parent = samples_[parent_idx];
432437
double g_curr = curr.g;
433438

434439
if (curr.g + parent.g + parent.delta <=
435440
static_cast<size_t>(GetMaxVal(r))) {
436-
// Merge curr into parent
441+
// Merge curr into parent; parent_idx stays — keep accumulating into
442+
// the same live sample.
437443
parent.g += curr.g;
438-
// Mark for removal by setting g to 0
439444
curr.g = 0;
445+
} else {
446+
// curr survives; it becomes the new live parent for i-1.
447+
parent_idx = static_cast<size_t>(i);
440448
}
441449
r -= g_curr;
442450
}

testing/ft_aggregate_exec_test.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,5 +1029,43 @@ TEST_F(AggregateExecTest, QuantileStringNumericValueTest) {
10291029
EXPECT_NEAR(result, 2.0, 1.0);
10301030
}
10311031

1032+
// Regression test for the Compress() zombie-parent bug.
1033+
// Before the fix, each Compress() pass deleted at most one sample because
1034+
// merged (g==0) samples were used as merge targets, reviving them. After the
1035+
// fix, a single pass must delete all consecutively-mergeable samples.
1036+
TEST_F(AggregateExecTest, CompressBoundedSampleCount) {
1037+
std::cerr << "CompressBoundedSampleCount\n";
1038+
1039+
// Insert enough values to trigger many Flush()+Compress() cycles.
1040+
// With EPSILON=0.01 the GK bound is O((1/ε)·log(ε·N)) ≈ 700 samples for
1041+
// N=100 000. The buggy implementation retained ~N samples; the fixed one
1042+
// must stay well below N.
1043+
const size_t kN = 100000;
1044+
// kDefaultBufferSize is 500; each flush triggers a compress.
1045+
// We drive inserts through the public ProcessRecord path via Execute().
1046+
// To keep the test self-contained we use a large single group.
1047+
RecordSet records(nullptr);
1048+
for (size_t i = 0; i < kN; ++i) {
1049+
auto rec = std::make_unique<Record>(2);
1050+
rec->fields_[0] = expr::Value(static_cast<double>(i));
1051+
rec->fields_[1] = expr::Value(1.0); // single group key
1052+
records.emplace_back(std::move(rec));
1053+
}
1054+
1055+
auto param = MakeStages("groupby 1 @n2 reduce quantile 2 @n1 0.5");
1056+
EXPECT_TRUE((param->stages_[0]->Execute(records)).ok());
1057+
EXPECT_EQ(records.size(), 1);
1058+
auto record = records.pop_front();
1059+
1060+
// The median of [0, N) is N/2. Allow 1% relative error (EPSILON).
1061+
ASSERT_TRUE(record->fields_.at(2).IsDouble());
1062+
double result = *(record->fields_.at(2).AsDouble());
1063+
double expected = static_cast<double>(kN) / 2.0;
1064+
EXPECT_NEAR(result, expected, expected * 0.01)
1065+
<< "Median of [0," << kN << ") should be within 1% of " << expected;
1066+
1067+
std::cerr << "CompressBoundedSampleCount passed, median=" << result << "\n";
1068+
}
1069+
10321070
} // namespace aggregate
10331071
} // namespace valkey_search

0 commit comments

Comments
 (0)