Skip to content

Commit 0b130ef

Browse files
xiangfu0 and claude committed
Add production-behavior accuracy test for TDigest aggregator
Verifies that PercentileTDigestValueAggregator with default compression=100 produces accurate quantile estimates under production-like conditions:
- Single-path raw value ingestion: < 0.5% error vs exact quantiles
- Pre-aggregated byte[] merge: < 0.5% error
- Multi-level star-tree merge with ser/deser cycles: < 1.0% error
- Stability across 10 random seeds: < 0.5% error
- Serialization round-trip (5 cycles): no accuracy degradation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8c59246 commit 0b130ef

1 file changed

Lines changed: 285 additions & 0 deletions

File tree

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.aggregator;
20+
21+
import com.tdunning.math.stats.TDigest;
22+
import java.util.Arrays;
23+
import java.util.Collections;
24+
import java.util.Random;
25+
import org.testng.annotations.Test;
26+
27+
import static org.testng.Assert.assertEquals;
28+
import static org.testng.Assert.assertTrue;
29+
30+
31+
/**
32+
* Tests that the {@link PercentileTDigestValueAggregator} produces accurate quantile estimates
33+
* under production-like conditions using the default compression (100).
34+
*
35+
* <p>These tests simulate single-path production behavior — where raw values are ingested into
36+
* a TDigest, serialized/deserialized through segment and star-tree building, and then queried.
37+
* The results are compared against exact quantiles computed from the raw data.
38+
*
39+
* <p>This is different from the star-tree comparison tests which compare two merge paths against
40+
* each other. Here we verify absolute accuracy against ground truth.
41+
*/
42+
public class PercentileTDigestValueAggregatorTest {
43+
private static final int MAX_VALUE = 10000;
44+
// Single-path accuracy: raw values merged into one TDigest, compared against exact quantiles.
45+
// With default compression=100 and uniform data, error is well below 0.5%.
46+
private static final double SINGLE_PATH_MAX_ERROR = 0.005; // 0.5%
47+
// Multi-level merge accuracy: TDigests merged in batches with intermediate serialization,
48+
// simulating star-tree building. The extra ser/deser cycles increase merge-order sensitivity
49+
// in t-digest 3.3, leading to slightly higher error vs ground truth.
50+
private static final double MULTI_LEVEL_MAX_ERROR = 0.01; // 1.0%
51+
52+
/**
53+
* Simulates production ingestion: raw double values are added one at a time via applyRawValue,
54+
* then quantile estimates are compared against exact values.
55+
*/
56+
@Test
57+
public void testSinglePathAccuracyWithRawValues() {
58+
PercentileTDigestValueAggregator aggregator =
59+
new PercentileTDigestValueAggregator(Collections.emptyList());
60+
61+
int numValues = 100_000;
62+
double[] rawValues = new double[numValues];
63+
Random random = new Random(42);
64+
65+
// Ingest first value
66+
rawValues[0] = random.nextInt(MAX_VALUE);
67+
TDigest digest = aggregator.getInitialAggregatedValue(rawValues[0]);
68+
69+
// Ingest remaining values
70+
for (int i = 1; i < numValues; i++) {
71+
rawValues[i] = random.nextInt(MAX_VALUE);
72+
digest = aggregator.applyRawValue(digest, rawValues[i]);
73+
}
74+
75+
// Serialize/deserialize to simulate segment storage
76+
byte[] serialized = aggregator.serializeAggregatedValue(digest);
77+
TDigest result = aggregator.deserializeAggregatedValue(serialized);
78+
79+
// Compute exact quantiles and compare
80+
Arrays.sort(rawValues);
81+
assertQuantilesWithinTolerance(result, rawValues, SINGLE_PATH_MAX_ERROR);
82+
}
83+
84+
/**
85+
* Simulates production with pre-aggregated TDigest bytes (e.g., from a previous segment).
86+
* Multiple TDigests are merged via applyRawValue(byte[]), serialized, and verified.
87+
*/
88+
@Test
89+
public void testSinglePathAccuracyWithPreAggregatedBytes() {
90+
PercentileTDigestValueAggregator aggregator =
91+
new PercentileTDigestValueAggregator(Collections.emptyList());
92+
93+
int numDigests = 1000;
94+
int valuesPerDigest = 100;
95+
double[] allValues = new double[numDigests * valuesPerDigest];
96+
Random random = new Random(42);
97+
98+
// Create first TDigest and serialize
99+
TDigest first = TDigest.createMergingDigest(PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION);
100+
for (int j = 0; j < valuesPerDigest; j++) {
101+
double val = random.nextInt(MAX_VALUE);
102+
first.add(val);
103+
allValues[j] = val;
104+
}
105+
byte[] firstBytes = aggregator.serializeAggregatedValue(first);
106+
107+
// Initialize aggregated value from bytes
108+
TDigest merged = aggregator.getInitialAggregatedValue(firstBytes);
109+
110+
// Merge remaining TDigests via applyRawValue(byte[])
111+
for (int i = 1; i < numDigests; i++) {
112+
TDigest td = TDigest.createMergingDigest(PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION);
113+
for (int j = 0; j < valuesPerDigest; j++) {
114+
double val = random.nextInt(MAX_VALUE);
115+
td.add(val);
116+
allValues[i * valuesPerDigest + j] = val;
117+
}
118+
byte[] bytes = aggregator.serializeAggregatedValue(td);
119+
merged = aggregator.applyRawValue(merged, bytes);
120+
}
121+
122+
// Serialize/deserialize round-trip
123+
byte[] serialized = aggregator.serializeAggregatedValue(merged);
124+
TDigest result = aggregator.deserializeAggregatedValue(serialized);
125+
126+
Arrays.sort(allValues);
127+
assertQuantilesWithinTolerance(result, allValues, SINGLE_PATH_MAX_ERROR);
128+
}
129+
130+
/**
131+
* Simulates star-tree multi-level merge: TDigests are merged via applyAggregatedValue,
132+
* with intermediate serialization/deserialization between levels.
133+
*/
134+
@Test
135+
public void testMultiLevelMergeAccuracy() {
136+
PercentileTDigestValueAggregator aggregator =
137+
new PercentileTDigestValueAggregator(Collections.emptyList());
138+
139+
int numDigests = 1000;
140+
int valuesPerDigest = 100;
141+
int batchSize = 50; // Simulates leaf node size in star-tree
142+
double[] allValues = new double[numDigests * valuesPerDigest];
143+
Random random = new Random(42);
144+
145+
// Create all TDigests
146+
TDigest[] digests = new TDigest[numDigests];
147+
for (int i = 0; i < numDigests; i++) {
148+
digests[i] = TDigest.createMergingDigest(PercentileTDigestValueAggregator.DEFAULT_TDIGEST_COMPRESSION);
149+
for (int j = 0; j < valuesPerDigest; j++) {
150+
double val = random.nextInt(MAX_VALUE);
151+
digests[i].add(val);
152+
allValues[i * valuesPerDigest + j] = val;
153+
}
154+
}
155+
156+
// Level 1: merge into batches with serialize/deserialize between levels
157+
int numBatches = (numDigests + batchSize - 1) / batchSize;
158+
TDigest[] batchResults = new TDigest[numBatches];
159+
for (int b = 0; b < numBatches; b++) {
160+
int start = b * batchSize;
161+
int end = Math.min(start + batchSize, numDigests);
162+
TDigest batchAcc = aggregator.cloneAggregatedValue(digests[start]);
163+
for (int i = start + 1; i < end; i++) {
164+
batchAcc = aggregator.applyAggregatedValue(batchAcc, digests[i]);
165+
}
166+
// Serialize/deserialize between levels (as star-tree builder does)
167+
batchResults[b] = aggregator.deserializeAggregatedValue(aggregator.serializeAggregatedValue(batchAcc));
168+
}
169+
170+
// Level 2: merge batch results
171+
TDigest finalResult = aggregator.cloneAggregatedValue(batchResults[0]);
172+
for (int i = 1; i < numBatches; i++) {
173+
finalResult = aggregator.applyAggregatedValue(finalResult, batchResults[i]);
174+
}
175+
176+
// Final serialize/deserialize (as stored in star-tree forward index)
177+
byte[] serialized = aggregator.serializeAggregatedValue(finalResult);
178+
TDigest result = aggregator.deserializeAggregatedValue(serialized);
179+
180+
Arrays.sort(allValues);
181+
assertQuantilesWithinTolerance(result, allValues, MULTI_LEVEL_MAX_ERROR);
182+
}
183+
184+
/**
185+
* Tests accuracy across multiple random seeds to ensure stability.
186+
*/
187+
@Test
188+
public void testAccuracyStability() {
189+
for (int seed = 0; seed < 10; seed++) {
190+
PercentileTDigestValueAggregator aggregator =
191+
new PercentileTDigestValueAggregator(Collections.emptyList());
192+
193+
int numValues = 50_000;
194+
double[] rawValues = new double[numValues];
195+
Random random = new Random(seed);
196+
197+
rawValues[0] = random.nextInt(MAX_VALUE);
198+
TDigest digest = aggregator.getInitialAggregatedValue(rawValues[0]);
199+
200+
for (int i = 1; i < numValues; i++) {
201+
rawValues[i] = random.nextInt(MAX_VALUE);
202+
digest = aggregator.applyRawValue(digest, rawValues[i]);
203+
}
204+
205+
byte[] serialized = aggregator.serializeAggregatedValue(digest);
206+
TDigest result = aggregator.deserializeAggregatedValue(serialized);
207+
208+
Arrays.sort(rawValues);
209+
assertQuantilesWithinTolerance(result, rawValues, SINGLE_PATH_MAX_ERROR);
210+
}
211+
}
212+
213+
/**
214+
* Tests that serialization round-trip preserves TDigest accuracy.
215+
*/
216+
@Test
217+
public void testSerializationRoundTripPreservesAccuracy() {
218+
PercentileTDigestValueAggregator aggregator =
219+
new PercentileTDigestValueAggregator(Collections.emptyList());
220+
221+
int numValues = 10_000;
222+
double[] rawValues = new double[numValues];
223+
Random random = new Random(42);
224+
225+
rawValues[0] = random.nextInt(MAX_VALUE);
226+
TDigest digest = aggregator.getInitialAggregatedValue(rawValues[0]);
227+
for (int i = 1; i < numValues; i++) {
228+
rawValues[i] = random.nextInt(MAX_VALUE);
229+
digest = aggregator.applyRawValue(digest, rawValues[i]);
230+
}
231+
232+
// Multiple round-trips should not degrade accuracy
233+
TDigest current = digest;
234+
for (int round = 0; round < 5; round++) {
235+
byte[] bytes = aggregator.serializeAggregatedValue(current);
236+
current = aggregator.deserializeAggregatedValue(bytes);
237+
}
238+
239+
Arrays.sort(rawValues);
240+
assertQuantilesWithinTolerance(current, rawValues, SINGLE_PATH_MAX_ERROR);
241+
}
242+
243+
/**
244+
* Asserts that TDigest quantile estimates are within the given error tolerance of exact values.
245+
*
246+
* @param digest the TDigest to verify
247+
* @param sortedValues the raw data sorted in ascending order (ground truth)
248+
* @param maxError maximum allowed error as a fraction of MAX_VALUE (e.g., 0.005 = 0.5%)
249+
*/
250+
private void assertQuantilesWithinTolerance(TDigest digest, double[] sortedValues, double maxError) {
251+
int n = sortedValues.length;
252+
double delta = MAX_VALUE * maxError;
253+
254+
for (int q = 0; q <= 100; q++) {
255+
double p = q / 100.0;
256+
double estimated = digest.quantile(p);
257+
258+
// Compute exact quantile using linear interpolation
259+
double exactIndex = p * (n - 1);
260+
int lower = (int) Math.floor(exactIndex);
261+
int upper = Math.min(lower + 1, n - 1);
262+
double fraction = exactIndex - lower;
263+
double exact = sortedValues[lower] * (1 - fraction) + sortedValues[upper] * fraction;
264+
265+
assertEquals(estimated, exact, delta,
266+
String.format("Quantile %d: estimated=%.2f, exact=%.2f, diff=%.2f, tolerance=%.2f",
267+
q, estimated, exact, Math.abs(estimated - exact), delta));
268+
}
269+
270+
// Also verify the max absolute error across all percentiles
271+
double maxAbsError = 0;
272+
for (int q = 0; q <= 100; q++) {
273+
double p = q / 100.0;
274+
double exactIndex = p * (n - 1);
275+
int lower = (int) Math.floor(exactIndex);
276+
int upper = Math.min(lower + 1, n - 1);
277+
double fraction = exactIndex - lower;
278+
double exact = sortedValues[lower] * (1 - fraction) + sortedValues[upper] * fraction;
279+
maxAbsError = Math.max(maxAbsError, Math.abs(digest.quantile(p) - exact));
280+
}
281+
double maxErrorPct = maxAbsError / MAX_VALUE;
282+
assertTrue(maxErrorPct < maxError,
283+
String.format("Max error %.4f%% exceeds tolerance %.4f%%", maxErrorPct * 100, maxError * 100));
284+
}
285+
}

0 commit comments

Comments
 (0)