Skip to content

Commit 2da64c8

Browse files
committed
Add pure t-digest merge-order reproducer
1 parent 73f569b commit 2da64c8

3 files changed

Lines changed: 409 additions & 0 deletions

File tree

pinot-segment-local/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,34 @@
141141
<artifactId>clp-ffi</artifactId>
142142
</dependency>
143143
</dependencies>
144+
145+
<build>
146+
<plugins>
147+
<plugin>
148+
<groupId>org.apache.maven.plugins</groupId>
149+
<artifactId>maven-dependency-plugin</artifactId>
150+
<version>3.8.1</version>
151+
<executions>
152+
<execution>
153+
<id>copy-tdigest-32-for-compat-tests</id>
154+
<phase>process-test-resources</phase>
155+
<goals>
156+
<goal>copy</goal>
157+
</goals>
158+
<configuration>
159+
<artifactItems>
160+
<artifactItem>
161+
<groupId>com.tdunning</groupId>
162+
<artifactId>t-digest</artifactId>
163+
<version>3.2</version>
164+
<destFileName>t-digest-3.2.jar</destFileName>
165+
<outputDirectory>${project.build.directory}/tdigest-compat</outputDirectory>
166+
</artifactItem>
167+
</artifactItems>
168+
</configuration>
169+
</execution>
170+
</executions>
171+
</plugin>
172+
</plugins>
173+
</build>
144174
</project>
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.aggregator;
20+
21+
import com.tdunning.math.stats.MergingDigest;
22+
import com.tdunning.math.stats.TDigest;
23+
import java.nio.ByteBuffer;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Random;
27+
import org.testng.annotations.Test;
28+
29+
import static org.testng.Assert.assertTrue;
30+
31+
32+
/**
33+
* Pure t-digest reproducer for the merge-order sensitivity observed while upgrading Pinot from
34+
* t-digest 3.2 to 3.3 in PR 18103.
35+
*
36+
* <p>The sample data intentionally mirrors Pinot's pre-aggregated percentileTDigest star-tree
37+
* path: many tiny leaf digests (two points each), most values near zero, sparse large spikes near
38+
* the upper tail, and repeated serialize/deserialize round-trips between hierarchical merge levels.
39+
*
40+
* <p>This test intentionally exercises the 3.3 behavior only. See {@link TDigestVersionComparisonTest}
41+
* for the direct 3.2 vs 3.3 exact-quantile comparison on a fixed dataset.
42+
*
43+
* <p>On t-digest 3.3 this deterministic generator produces a large divergence between sequential
44+
* merging and hierarchical merging at low compression, while compression 500 restores stable
45+
* results. The generator depends only on t-digest APIs so it can be copied directly into upstream
46+
* t-digest tests for further investigation.
47+
*/
48+
public class TDigestMergeOrderReproducerTest {
49+
private static final int SCALE = 10_000;
50+
private static final int NUM_DIGESTS = 1_024;
51+
private static final int VALUES_PER_DIGEST = 2;
52+
private static final int BATCH_SIZE = 16;
53+
private static final int DATA_SEED = 5;
54+
55+
@Test
56+
public void testTailSpikesScenarioRequiresHighCompressionForStableHierarchicalMerges() {
57+
double divergenceAt100 = maxMergeOrderDivergence(100);
58+
double divergenceAt150 = maxMergeOrderDivergence(150);
59+
double divergenceAt200 = maxMergeOrderDivergence(200);
60+
double divergenceAt500 = maxMergeOrderDivergence(500);
61+
62+
assertTrue(divergenceAt100 > 0.02,
63+
String.format("Expected large merge-order divergence at compression 100 but saw %.6f", divergenceAt100));
64+
assertTrue(divergenceAt150 > 0.02,
65+
String.format("Expected large merge-order divergence at compression 150 but saw %.6f", divergenceAt150));
66+
assertTrue(divergenceAt200 > 0.02,
67+
String.format("Expected large merge-order divergence at compression 200 but saw %.6f", divergenceAt200));
68+
assertTrue(divergenceAt500 < 0.001,
69+
String.format("Expected stable merge-order behavior at compression 500 but saw %.6f", divergenceAt500));
70+
}
71+
72+
private double maxMergeOrderDivergence(int compression) {
73+
List<TDigest> leafDigests = createLeafDigests(compression);
74+
TDigest sequential = roundTrip(mergeSequential(leafDigests));
75+
TDigest hierarchical = roundTrip(mergeHierarchical(leafDigests, BATCH_SIZE));
76+
77+
double maxNormalizedDivergence = 0d;
78+
for (int percentile = 0; percentile <= 100; percentile++) {
79+
double quantile = percentile / 100d;
80+
double delta = Math.abs(sequential.quantile(quantile) - hierarchical.quantile(quantile)) / SCALE;
81+
maxNormalizedDivergence = Math.max(maxNormalizedDivergence, delta);
82+
}
83+
return maxNormalizedDivergence;
84+
}
85+
86+
private List<TDigest> createLeafDigests(int compression) {
87+
Random random = new Random(DATA_SEED);
88+
List<TDigest> digests = new ArrayList<>(NUM_DIGESTS);
89+
for (int i = 0; i < NUM_DIGESTS; i++) {
90+
TDigest digest = TDigest.createMergingDigest(compression);
91+
for (int j = 0; j < VALUES_PER_DIGEST; j++) {
92+
digest.add(nextTailSpikeValue(random));
93+
}
94+
digests.add(roundTrip(digest));
95+
}
96+
return digests;
97+
}
98+
99+
private TDigest mergeSequential(List<TDigest> digests) {
100+
TDigest accumulator = roundTrip(digests.get(0));
101+
for (int i = 1; i < digests.size(); i++) {
102+
accumulator.add(digests.get(i));
103+
}
104+
return accumulator;
105+
}
106+
107+
private TDigest mergeHierarchical(List<TDigest> digests, int batchSize) {
108+
List<TDigest> currentLevel = digests;
109+
while (currentLevel.size() > 1) {
110+
List<TDigest> nextLevel = new ArrayList<>((currentLevel.size() + batchSize - 1) / batchSize);
111+
for (int start = 0; start < currentLevel.size(); start += batchSize) {
112+
int end = Math.min(start + batchSize, currentLevel.size());
113+
TDigest accumulator = roundTrip(currentLevel.get(start));
114+
for (int i = start + 1; i < end; i++) {
115+
accumulator.add(currentLevel.get(i));
116+
}
117+
nextLevel.add(roundTrip(accumulator));
118+
}
119+
currentLevel = nextLevel;
120+
}
121+
return currentLevel.get(0);
122+
}
123+
124+
private double nextTailSpikeValue(Random random) {
125+
double roll = random.nextDouble();
126+
if (roll < 0.97d) {
127+
return random.nextDouble() * 100d;
128+
}
129+
if (roll < 0.995d) {
130+
return 9_900d + random.nextDouble() * 50d;
131+
}
132+
return random.nextDouble() * SCALE;
133+
}
134+
135+
private TDigest roundTrip(TDigest digest) {
136+
ByteBuffer buffer = ByteBuffer.allocate(digest.smallByteSize());
137+
digest.asSmallBytes(buffer);
138+
buffer.flip();
139+
return MergingDigest.fromBytes(buffer);
140+
}
141+
}

0 commit comments

Comments
 (0)